Repository: spark Updated Branches: refs/heads/branch-2.4 9031c7848 -> a9a8d3a4b
[SPARK-25425][SQL][BACKPORT-2.4] Extra options should override session options in DataSource V2 ## What changes were proposed in this pull request? In the PR, I propose overriding session options by extra options in DataSource V2. Extra options are more specific and set via `.option()`, and should overwrite more generic session options. ## How was this patch tested? Added tests for read and write paths. Closes #22474 from MaxGekk/session-options-2.4. Authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9a8d3a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9a8d3a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9a8d3a4 Branch: refs/heads/branch-2.4 Commit: a9a8d3a4b92be89defd82d5f2eeb3f9af45c687d Parents: 9031c78 Author: Maxim Gekk <maxim.g...@databricks.com> Authored: Wed Sep 19 16:53:26 2018 -0700 Committer: Dongjoon Hyun <dongj...@apache.org> Committed: Wed Sep 19 16:53:26 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../org/apache/spark/sql/DataFrameWriter.scala | 8 +++-- .../sql/sources/v2/DataSourceV2Suite.scala | 33 ++++++++++++++++++++ .../sources/v2/SimpleWritableDataSource.scala | 7 ++++- 4 files changed, 45 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 371ec70..27a1af2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) } Dataset.ofRows(sparkSession, DataSourceV2Relation.create( - ds, extraOptions.toMap ++ sessionOptions + pathsOption, + ds, sessionOptions ++ extraOptions.toMap + pathsOption, userSpecifiedSchema = userSpecifiedSchema)) } else { loadV1Source(paths: _*) http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 4aeddfd..80ade7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val source = cls.newInstance().asInstanceOf[DataSourceV2] source match { case ws: WriteSupport => - val options = extraOptions ++ - DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf) + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + source, + df.sparkSession.sessionState.conf) + val options = sessionOptions ++ extraOptions + val relation = DataSourceV2Relation.create(source, options) - val relation = DataSourceV2Relation.create(source, options.toMap) if (mode == SaveMode.Append) { runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 12beca2..bafde50 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.sources.v2 +import java.io.File import java.util.{ArrayList, List => JList} import test.org.apache.spark.sql.sources.v2._ @@ -322,6 +323,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { checkCanonicalizedOutput(df, 2, 2) checkCanonicalizedOutput(df.select('i), 2, 1) } + + test("SPARK-25425: extra options should override sessions options during reading") { + val prefix = "spark.datasource.userDefinedDataSource." + val optionName = "optionA" + withSQLConf(prefix + optionName -> "true") { + val df = spark + .read + .option(optionName, false) + .format(classOf[DataSourceV2WithSessionConfig].getName).load() + val options = df.queryExecution.optimizedPlan.collectFirst { + case d: DataSourceV2Relation => d.options + } + assert(options.get.get(optionName) == Some("false")) + } + } + + test("SPARK-25425: extra options should override sessions options during writing") { + withTempPath { path => + val sessionPath = path.getCanonicalPath + withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) { + withTempPath { file => + val optionPath = file.getCanonicalPath + val format = classOf[SimpleWritableDataSource].getName + + val df = Seq((1L, 2L)).toDF("i", "j") + df.write.format(format).option("path", optionPath).save() + assert(!new File(sessionPath).exists) + checkAnswer(spark.read.format(format).option("path", optionPath).load(), df) + } + } + } + } } class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport { http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index e1b8e9c..654c62d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -38,10 +38,15 @@ import org.apache.spark.util.SerializableConfiguration * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`. * Each job moves files from `target/_temporary/jobId/` to `target`. */ -class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport { +class SimpleWritableDataSource extends DataSourceV2 + with ReadSupport + with WriteSupport + with SessionConfigSupport { private val schema = new StructType().add("i", "long").add("j", "long") + override def keyPrefix: String = "simpleWritableDataSource" + class Reader(path: String, conf: Configuration) extends DataSourceReader { override def readSchema(): StructType = schema --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org