Repository: spark
Updated Branches:
  refs/heads/branch-2.3 9674d083e -> cbb228e48


[SPARK-25425][SQL][BACKPORT-2.3] Extra options should override session options 
in DataSource V2

## What changes were proposed in this pull request?

In this PR, I propose that extra options override session options in DataSource 
V2. Extra options are more specific, since they are set explicitly via `.option()`, 
and should therefore take precedence over the more generic session options.

## How was this patch tested?

Added tests for read and write paths.

Closes #22489 from MaxGekk/session-options-2.3.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbb228e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbb228e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbb228e4

Branch: refs/heads/branch-2.3
Commit: cbb228e48bb046e7d88d6bf1c9b9e3b252241552
Parents: 9674d08
Author: Maxim Gekk <[email protected]>
Authored: Tue Sep 25 23:35:57 2018 -0700
Committer: Dongjoon Hyun <[email protected]>
Committed: Tue Sep 25 23:35:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala  |  8 ++--
 .../org/apache/spark/sql/DataFrameWriter.scala  |  8 ++--
 .../sql/sources/v2/DataSourceV2Suite.scala      | 50 ++++++++++++++++----
 .../sources/v2/SimpleWritableDataSource.scala   |  7 ++-
 4 files changed, 56 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cbb228e4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 395e1c9..1d74b35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -190,10 +190,10 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, 
sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance()
-      val options = new DataSourceOptions((extraOptions ++
-        DataSourceV2Utils.extractSessionConfigs(
-          ds = ds.asInstanceOf[DataSourceV2],
-          conf = sparkSession.sessionState.conf)).asJava)
+      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+        ds = ds.asInstanceOf[DataSourceV2],
+        conf = sparkSession.sessionState.conf)
+      val options = new DataSourceOptions((sessionOptions ++ 
extraOptions).asJava)
 
       // Streaming also uses the data source V2 API. So it may be that the 
data source implements
       // v2, but has no v2 implementation for batch reads. In that case, we 
fall back to loading

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb228e4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6c9fb52..3fcefb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -243,10 +243,10 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
       val ds = cls.newInstance()
       ds match {
         case ws: WriteSupport =>
-          val options = new DataSourceOptions((extraOptions ++
-            DataSourceV2Utils.extractSessionConfigs(
-              ds = ds.asInstanceOf[DataSourceV2],
-              conf = df.sparkSession.sessionState.conf)).asJava)
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            ds = ds.asInstanceOf[DataSourceV2],
+            conf = df.sparkSession.sessionState.conf)
+          val options = new DataSourceOptions((sessionOptions ++ 
extraOptions).asJava)
           // Using a timestamp and a random UUID to distinguish different 
writing jobs. This is good
           // enough as there won't be tons of writing jobs created at the same 
second.
           val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb228e4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 6ad0e5f..ec81e89 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources.v2
 
+import java.io.File
 import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._
@@ -315,19 +316,52 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
     checkCanonicalizedOutput(df, 2)
     checkCanonicalizedOutput(df.select('i), 1)
   }
-}
-
-class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader {
-    override def readSchema(): StructType = new StructType().add("i", 
"int").add("j", "int")
+  test("SPARK-25425: extra options should override sessions options during 
reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case DataSourceV2Relation(_, SimpleDataSourceV2Reader(options)) => 
options
+      }
+      assert(options.get.getBoolean(optionName, true) == false)
+    }
+  }
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new 
SimpleDataReaderFactory(5, 10))
+  test("SPARK-25425: extra options should override sessions options during 
writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> 
sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", 
optionPath).load(), df)
+        }
+      }
     }
   }
+}
 
-  override def createReader(options: DataSourceOptions): DataSourceReader = 
new Reader
+case class SimpleDataSourceV2Reader(options: DataSourceOptions) extends 
DataSourceReader {
+  override def readSchema(): StructType = new StructType().add("i", 
"int").add("j", "int")
+
+  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new 
SimpleDataReaderFactory(5, 10))
+  }
+}
+
+class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    SimpleDataSourceV2Reader(options)
+  }
 }
 
 class SimpleDataReaderFactory(start: Int, end: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb228e4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a131b16..ea93fb4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,10 +38,15 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/jobId/` to `target`.
  */
-class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+class SimpleWritableDataSource extends DataSourceV2
+  with ReadSupport
+  with WriteSupport
+  with SessionConfigSupport {
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to