Repository: spark Updated Branches: refs/heads/master 80813e198 -> 83e19d5b8
[SPARK-25700][SQL] Creates ReadSupport in only Append Mode in Data Source V2 write path ## What changes were proposed in this pull request? This PR proposes to avoid creating a ReadSupport and reading the schema when writing in save modes other than append. https://github.com/apache/spark/commit/5fef6e3513d6023a837c427d183006d153c7102b happened to create a ReadSupport in the write path, which ended up reading the schema from the ReadSupport in the write path. This breaks the `spark.range(1).write.format("source").save("non-existent-path")` case, since there is no way to read the schema from "non-existent-path". See also https://github.com/apache/spark/pull/22009#discussion_r223982672 See also https://github.com/apache/spark/pull/22697 See also http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html ## How was this patch tested? Unit test and manual tests. Closes #22688 from HyukjinKwon/append-revert-2. Authored-by: hyukjinkwon <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83e19d5b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83e19d5b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83e19d5b Branch: refs/heads/master Commit: 83e19d5b80fac6ea4b29d8eb561a5ad06835171b Parents: 80813e1 Author: hyukjinkwon <[email protected]> Authored: Thu Oct 11 09:35:49 2018 -0700 Committer: Dongjoon Hyun <[email protected]> Committed: Thu Oct 11 09:35:49 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../sql/sources/v2/DataSourceV2Suite.scala | 29 ++++++++++++++++++++ .../sources/v2/SimpleWritableDataSource.scala | 5 ++-- 3 files changed, 32 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/83e19d5b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 188fce7..55e538f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -246,8 +246,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { df.sparkSession.sessionState.conf) val options = sessionOptions ++ extraOptions - val relation = DataSourceV2Relation.create(source, options) if (mode == SaveMode.Append) { + val relation = DataSourceV2Relation.create(source, options) runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) } http://git-wip-us.apache.org/repos/asf/spark/blob/83e19d5b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 7cc8abc..e8f291a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -351,6 +351,24 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-25700: do not read schema when writing in other modes except append mode") { + withTempPath { file => + val cls = classOf[SimpleWriteOnlyDataSource] + val path = file.getCanonicalPath + val df = spark.range(5).select('id as 'i, -'id as 'j) + try { + df.write.format(cls.getName).option("path", path).mode("error").save() + df.write.format(cls.getName).option("path", 
path).mode("overwrite").save() + df.write.format(cls.getName).option("path", path).mode("ignore").save() + } catch { + case e: SchemaReadAttemptException => fail("Schema read was attempted.", e) + } + intercept[SchemaReadAttemptException] { + df.write.format(cls.getName).option("path", path).mode("append").save() + } + } + } } @@ -640,3 +658,14 @@ object SpecificReaderFactory extends PartitionReaderFactory { } } } + +class SchemaReadAttemptException(m: String) extends RuntimeException(m) + +class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { + override def fullSchema(): StructType = { + // This is a bit hacky since this source implements read support but throws + // during schema retrieval. Might have to rewrite but it's done + // such so for minimised changes. + throw new SchemaReadAttemptException("read is not supported") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/83e19d5b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index a0f4404..a7dfc2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2 with BatchWriteSupportProvider with SessionConfigSupport { - private val schema = new StructType().add("i", "long").add("j", "long") + protected def fullSchema(): StructType = new StructType().add("i", "long").add("j", "long") override def keyPrefix: String = "simpleWritableDataSource" class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport { - override def fullSchema(): StructType = schema + override def fullSchema(): 
StructType = SimpleWritableDataSource.this.fullSchema() override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { val dataPath = new Path(path) @@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2 schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[BatchWriteSupport] = { - assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable)) assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false)) val path = new Path(options.get("path").get()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
