Repository: spark
Updated Branches:
  refs/heads/master 80813e198 -> 83e19d5b8


[SPARK-25700][SQL] Create ReadSupport only in Append mode in Data Source V2 
write path

## What changes were proposed in this pull request?

This PR avoids creating a ReadSupport and reading the schema when writing in 
save modes other than Append.

https://github.com/apache/spark/commit/5fef6e3513d6023a837c427d183006d153c7102b 
introduced the creation of a ReadSupport in the write path, which caused the 
schema to be read from that ReadSupport during writes.

This breaks the `spark.range(1).write.format("source").save("non-existent-path")` 
case, since there is no way to read the schema from "non-existent-path".

See also https://github.com/apache/spark/pull/22009#discussion_r223982672
See also https://github.com/apache/spark/pull/22697
See also 
http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html

## How was this patch tested?

Unit test and manual tests.

Closes #22688 from HyukjinKwon/append-revert-2.

Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83e19d5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83e19d5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83e19d5b

Branch: refs/heads/master
Commit: 83e19d5b80fac6ea4b29d8eb561a5ad06835171b
Parents: 80813e1
Author: hyukjinkwon <[email protected]>
Authored: Thu Oct 11 09:35:49 2018 -0700
Committer: Dongjoon Hyun <[email protected]>
Committed: Thu Oct 11 09:35:49 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  2 +-
 .../sql/sources/v2/DataSourceV2Suite.scala      | 29 ++++++++++++++++++++
 .../sources/v2/SimpleWritableDataSource.scala   |  5 ++--
 3 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83e19d5b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 188fce7..55e538f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -246,8 +246,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
 
-          val relation = DataSourceV2Relation.create(source, options)
           if (mode == SaveMode.Append) {
+            val relation = DataSourceV2Relation.create(source, options)
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/83e19d5b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 7cc8abc..e8f291a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -351,6 +351,24 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing in other modes except 
append mode") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", 
path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was 
attempted.", e)
+      }
+      intercept[SchemaReadAttemptException] {
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      }
+    }
+  }
 }
 
 
@@ -640,3 +658,14 @@ object SpecificReaderFactory extends 
PartitionReaderFactory {
     }
   }
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky since this source implements read support but throws
+    // during schema retrieval. Might have to rewrite but it's done
+    // such so for minimised changes.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/83e19d5b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a0f4404..a7dfc2d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2
   with BatchWriteSupportProvider
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema(): StructType = new StructType().add("i", 
"long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class ReadSupport(path: String, conf: Configuration) extends 
SimpleReadSupport {
 
-    override def fullSchema(): StructType = schema
+    override def fullSchema(): StructType = 
SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(config: ScanConfig): 
Array[InputPartition] = {
       val dataPath = new Path(path)
@@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[BatchWriteSupport] = {
-    assert(DataType.equalsStructurally(schema.asNullable, 
this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", 
false))
 
     val path = new Path(options.get("path").get())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to