Repository: spark Updated Branches: refs/heads/branch-2.4 3e776d73b -> b6e4aca0b
[SPARK-25700][SQL][BRANCH-2.4] Partially revert append mode support in Data Source V2 ## What changes were proposed in this pull request? This PR proposes to partially revert https://github.com/apache/spark/commit/5fef6e3513d6023a837c427d183006d153c7102b so that it does not make a readsupport and read the schema when it writes in branch-2.4, since that is too breaking a change. https://github.com/apache/spark/commit/5fef6e3513d6023a837c427d183006d153c7102b happened to create a readsupport in the write path, which ended up reading the schema from the readsupport in the write path. For instance, this breaks the `spark.range(1).write.format("source").save("non-existent-path")` case since there's no way to read the schema from "non-existent-path". See also https://github.com/apache/spark/pull/22009#discussion_r223982672 See also https://github.com/apache/spark/pull/22688 See also http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html ## How was this patch tested? Unit test and manual tests. Closes #22697 from HyukjinKwon/append-revert-2.4. 
Authored-by: hyukjinkwon <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6e4aca0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6e4aca0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6e4aca0 Branch: refs/heads/branch-2.4 Commit: b6e4aca0be7f3b863c326063a3c02aa8a1c266a3 Parents: 3e776d7 Author: hyukjinkwon <[email protected]> Authored: Mon Oct 15 10:46:10 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Mon Oct 15 10:46:10 2018 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameWriter.scala | 20 +++++---------- .../sql/sources/v2/DataSourceV2Suite.scala | 27 ++++++++++++++++++++ .../sources/v2/SimpleWritableDataSource.scala | 5 ++-- 3 files changed, 35 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b6e4aca0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c1e2f49..b77dfd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -250,22 +250,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { source, df.sparkSession.sessionState.conf) val options = sessionOptions ++ extraOptions - val relation = DataSourceV2Relation.create(source, options) - if (mode == SaveMode.Append) { - runCommand(df.sparkSession, "save") { - AppendData.byName(relation, df.logicalPlan) - } + val writer = ws.createWriter( + UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode, + new 
DataSourceOptions(options.asJava)) - } else { - val writer = ws.createWriter( - UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode, - new DataSourceOptions(options.asJava)) - - if (writer.isPresent) { - runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(writer.get, df.logicalPlan) - } + if (writer.isPresent) { + runCommand(df.sparkSession, "save") { + WriteToDataSourceV2(writer.get, df.logicalPlan) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b6e4aca0/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index bafde50..2367bdd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -355,6 +355,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-25700: do not read schema when writing") { + withTempPath { file => + val cls = classOf[SimpleWriteOnlyDataSource] + val path = file.getCanonicalPath + val df = spark.range(5).select('id as 'i, -'id as 'j) + try { + df.write.format(cls.getName).option("path", path).mode("error").save() + df.write.format(cls.getName).option("path", path).mode("overwrite").save() + df.write.format(cls.getName).option("path", path).mode("ignore").save() + df.write.format(cls.getName).option("path", path).mode("append").save() + } catch { + case e: SchemaReadAttemptException => fail("Schema read was attempted.", e) + } + } + } } class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport { @@ -594,3 +610,14 @@ class SpecificInputPartitionReader(i: Array[Int], j: Array[Int]) override def close(): Unit = {} } + +class SchemaReadAttemptException(m: String) extends 
RuntimeException(m) + +class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { + override def fullSchema(): StructType = { + // This is a bit hacky since this source implements read support but throws + // during schema retrieval. Might have to rewrite but it's done + // such so for minimised changes. + throw new SchemaReadAttemptException("read is not supported") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/b6e4aca0/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 654c62d..4cf0259 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -43,12 +43,12 @@ class SimpleWritableDataSource extends DataSourceV2 with WriteSupport with SessionConfigSupport { - private val schema = new StructType().add("i", "long").add("j", "long") + protected def fullSchema() = new StructType().add("i", "long").add("j", "long") override def keyPrefix: String = "simpleWritableDataSource" class Reader(path: String, conf: Configuration) extends DataSourceReader { - override def readSchema(): StructType = schema + override def readSchema(): StructType = SimpleWritableDataSource.this.fullSchema() override def planInputPartitions(): JList[InputPartition[InternalRow]] = { val dataPath = new Path(path) @@ -113,7 +113,6 @@ class SimpleWritableDataSource extends DataSourceV2 schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = { - assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable)) assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false)) val path = new 
Path(options.get("path").get()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
