Repository: spark
Updated Branches:
  refs/heads/branch-2.4 3e776d73b -> b6e4aca0b


[SPARK-25700][SQL][BRANCH-2.4] Partially revert append mode support in Data 
Source V2

## What changes were proposed in this pull request?

This PR proposes to partially revert 
https://github.com/apache/spark/commit/5fef6e3513d6023a837c427d183006d153c7102b 
so that branch-2.4 does not create a ReadSupport and read the schema on the 
write path, since that is too breaking a change.

https://github.com/apache/spark/commit/5fef6e3513d6023a837c427d183006d153c7102b 
created a ReadSupport on the write path, which resulted in the schema being 
read from that ReadSupport while writing.

For instance, this breaks the 
`spark.range(1).write.format("source").save("non-existent-path")` case, since 
there is no way to read the schema from "non-existent-path".

See also https://github.com/apache/spark/pull/22009#discussion_r223982672
See also https://github.com/apache/spark/pull/22688
See also 
http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html

## How was this patch tested?

Unit test and manual tests.

Closes #22697 from HyukjinKwon/append-revert-2.4.

Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6e4aca0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6e4aca0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6e4aca0

Branch: refs/heads/branch-2.4
Commit: b6e4aca0be7f3b863c326063a3c02aa8a1c266a3
Parents: 3e776d7
Author: hyukjinkwon <[email protected]>
Authored: Mon Oct 15 10:46:10 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Oct 15 10:46:10 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 20 +++++----------
 .../sql/sources/v2/DataSourceV2Suite.scala      | 27 ++++++++++++++++++++
 .../sources/v2/SimpleWritableDataSource.scala   |  5 ++--
 3 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6e4aca0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index c1e2f49..b77dfd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -250,22 +250,14 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
             source,
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
-          val relation = DataSourceV2Relation.create(source, options)
 
-          if (mode == SaveMode.Append) {
-            runCommand(df.sparkSession, "save") {
-              AppendData.byName(relation, df.logicalPlan)
-            }
+          val writer = ws.createWriter(
+            UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
+            new DataSourceOptions(options.asJava))
 
-          } else {
-            val writer = ws.createWriter(
-              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, 
mode,
-              new DataSourceOptions(options.asJava))
-
-            if (writer.isPresent) {
-              runCommand(df.sparkSession, "save") {
-                WriteToDataSourceV2(writer.get, df.logicalPlan)
-              }
+          if (writer.isPresent) {
+            runCommand(df.sparkSession, "save") {
+              WriteToDataSourceV2(writer.get, df.logicalPlan)
             }
           }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b6e4aca0/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index bafde50..2367bdd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -355,6 +355,22 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", 
path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was 
attempted.", e)
+      }
+    }
+  }
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
@@ -594,3 +610,14 @@ class SpecificInputPartitionReader(i: Array[Int], j: 
Array[Int])
 
   override def close(): Unit = {}
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky since this source implements read support but throws
+    // during schema retrieval. Might have to rewrite but it's done
+    // such so for minimised changes.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b6e4aca0/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 654c62d..4cf0259 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -43,12 +43,12 @@ class SimpleWritableDataSource extends DataSourceV2
   with WriteSupport
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema() = new StructType().add("i", "long").add("j", 
"long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
-    override def readSchema(): StructType = schema
+    override def readSchema(): StructType = 
SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
       val dataPath = new Path(path)
@@ -113,7 +113,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[DataSourceWriter] = {
-    assert(DataType.equalsStructurally(schema.asNullable, 
this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", 
false))
 
     val path = new Path(options.get("path").get())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to