Repository: spark
Updated Branches:
refs/heads/master bb2f069cf -> e06da95cd
[SPARK-25425][SQL] Extra options should override session options in DataSource
V2
## What changes were proposed in this pull request?
In this PR, I propose that extra options override session options in DataSource
V2. Extra options are more specific (they are set via `.option()`) and should
override the more generic session options. When two maps are concatenated with
`++`, entries from the second map overwrite entries with the same key in the
first map, for example:
```Scala
scala> Map("option" -> false) ++ Map("option" -> true)
res0: scala.collection.immutable.Map[String,Boolean] = Map(option -> true)
```
## How was this patch tested?
Added tests checking which option is propagated to a data source, both when
reading via `load()` and when writing via `save()`.
Closes #22413 from MaxGekk/session-options.
Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e06da95c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e06da95c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e06da95c
Branch: refs/heads/master
Commit: e06da95cd9423f55cdb154a2778b0bddf7be984c
Parents: bb2f069
Author: Maxim Gekk <[email protected]>
Authored: Sat Sep 15 17:24:11 2018 -0700
Committer: Dongjoon Hyun <[email protected]>
Committed: Sat Sep 15 17:24:11 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameReader.scala | 2 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 8 +++--
.../sql/sources/v2/DataSourceV2Suite.scala | 35 +++++++++++++++++++-
.../sources/v2/SimpleWritableDataSource.scala | 6 +++-
4 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e6c2cba..fe69f25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
DataSourceOptions.PATHS_KEY ->
objectMapper.writeValueAsString(paths.toArray)
}
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
- ds, extraOptions.toMap ++ sessionOptions + pathsOption,
+ ds, sessionOptions ++ extraOptions.toMap + pathsOption,
userSpecifiedSchema = userSpecifiedSchema))
} else {
loadV1Source(paths: _*)
http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index dfb8c47..188fce7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds:
Dataset[T]) {
val source = cls.newInstance().asInstanceOf[DataSourceV2]
source match {
case provider: BatchWriteSupportProvider =>
- val options = extraOptions ++
- DataSourceV2Utils.extractSessionConfigs(source,
df.sparkSession.sessionState.conf)
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ source,
+ df.sparkSession.sessionState.conf)
+ val options = sessionOptions ++ extraOptions
- val relation = DataSourceV2Relation.create(source, options.toMap)
+ val relation = DataSourceV2Relation.create(source, options)
if (mode == SaveMode.Append) {
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan)
http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index f6c3e0c..7cc8abc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.sources.v2
+import java.io.File
+
import test.org.apache.spark.sql.sources.v2._
import org.apache.spark.SparkException
@@ -317,6 +319,38 @@ class DataSourceV2Suite extends QueryTest with
SharedSQLContext {
checkCanonicalizedOutput(df, 2, 2)
checkCanonicalizedOutput(df.select('i), 2, 1)
}
+
+ test("SPARK-25425: extra options should override sessions options during reading") {
+ val prefix = "spark.datasource.userDefinedDataSource."
+ val optionName = "optionA"
+ withSQLConf(prefix + optionName -> "true") {
+ val df = spark
+ .read
+ .option(optionName, false)
+ .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+ val options = df.queryExecution.optimizedPlan.collectFirst {
+ case d: DataSourceV2Relation => d.options
+ }
+ assert(options.get.get(optionName) == Some("false"))
+ }
+ }
+
+ test("SPARK-25425: extra options should override sessions options during writing") {
+ withTempPath { path =>
+ val sessionPath = path.getCanonicalPath
+ withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+ withTempPath { file =>
+ val optionPath = file.getCanonicalPath
+ val format = classOf[SimpleWritableDataSource].getName
+
+ val df = Seq((1L, 2L)).toDF("i", "j")
+ df.write.format(format).option("path", optionPath).save()
+ assert(!new File(sessionPath).exists)
+ checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+ }
+ }
+ }
+ }
}
@@ -385,7 +419,6 @@ class SimpleDataSourceV2 extends DataSourceV2 with
BatchReadSupportProvider {
}
}
-
class AdvancedDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider {
class ReadSupport extends SimpleReadSupport {
http://git-wip-us.apache.org/repos/asf/spark/blob/e06da95c/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 952241b..a0f4404 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -39,10 +39,14 @@ import org.apache.spark.util.SerializableConfiguration
* Each job moves files from `target/_temporary/queryId/` to `target`.
*/
class SimpleWritableDataSource extends DataSourceV2
- with BatchReadSupportProvider with BatchWriteSupportProvider {
+ with BatchReadSupportProvider
+ with BatchWriteSupportProvider
+ with SessionConfigSupport {
private val schema = new StructType().add("i", "long").add("j", "long")
+ override def keyPrefix: String = "simpleWritableDataSource"
+
class ReadSupport(path: String, conf: Configuration) extends
SimpleReadSupport {
override def fullSchema(): StructType = schema
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]