Repository: spark Updated Branches: refs/heads/branch-2.4 51d5378f8 -> ec384284e
[SPARK-25460][BRANCH-2.4][SS] DataSourceV2: SS sources do not respect SessionConfigSupport ## What changes were proposed in this pull request? This PR backports SPARK-25460 to branch-2.4. It proposes to respect `SessionConfigSupport` in SS datasources as well. Currently it is only respected in batch sources: https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L198-L203 https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L244-L249 If a developer writes a datasource V2 that supports both structured streaming and batch jobs, batch jobs respect a session-level configuration, say, a URL to connect to and fetch data from (which end users might not be aware of); structured streaming, however, does not pick this configuration up, so it has to be set explicitly in the options. ## How was this patch tested? Unit tests were added. Closes #22529 from HyukjinKwon/SPARK-25460-backport. 
Authored-by: hyukjinkwon <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec384284 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec384284 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec384284 Branch: refs/heads/branch-2.4 Commit: ec384284eb427d7573bd94c707777e23e4137971 Parents: 51d5378 Author: hyukjinkwon <[email protected]> Authored: Mon Sep 24 08:49:19 2018 -0700 Committer: Dongjoon Hyun <[email protected]> Committed: Mon Sep 24 08:49:19 2018 -0700 ---------------------------------------------------------------------- .../spark/sql/streaming/DataStreamReader.scala | 18 ++- .../spark/sql/streaming/DataStreamWriter.scala | 16 ++- .../sources/StreamingDataSourceV2Suite.scala | 118 ++++++++++++++++--- 3 files changed, 125 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 7eb5db5..a9cb5e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -26,6 +26,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2} import 
org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport} @@ -158,7 +159,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo } val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance() - val options = new DataSourceOptions(extraOptions.asJava) // We need to generate the V1 data source so we can pass it to the V2 relation as a shim. // We can't be sure at this point whether we'll actually want to use V2, since we don't know the // writer or whether the query is continuous. @@ -173,12 +173,16 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo } ds match { case s: MicroBatchReadSupport => + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + ds = s, conf = sparkSession.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dataSourceOptions = new DataSourceOptions(options.asJava) var tempReader: MicroBatchReader = null val schema = try { tempReader = s.createMicroBatchReader( Optional.ofNullable(userSpecifiedSchema.orNull), Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath, - options) + dataSourceOptions) tempReader.readSchema() } finally { // Stop tempReader to avoid side-effect thing @@ -190,17 +194,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo Dataset.ofRows( sparkSession, StreamingRelationV2( - s, source, extraOptions.toMap, + s, source, options, schema.toAttributes, v1Relation)(sparkSession)) case s: ContinuousReadSupport => + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + ds = s, conf = sparkSession.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dataSourceOptions = new DataSourceOptions(options.asJava) val tempReader = s.createContinuousReader( Optional.ofNullable(userSpecifiedSchema.orNull), Utils.createTempDir(namePrefix = 
s"temporaryReader").getCanonicalPath, - options) + dataSourceOptions) Dataset.ofRows( sparkSession, StreamingRelationV2( - s, source, extraOptions.toMap, + s, source, options, tempReader.readSchema().toAttributes, v1Relation)(sparkSession)) case _ => // Code path for data source v1. http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 3b9a56f..735fd17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.execution.streaming.sources._ @@ -298,22 +299,27 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { } else { val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",") + var options = extraOptions.toMap val sink = ds.newInstance() match { - case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w + case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + w, df.sparkSession.sessionState.conf) + options = sessionOptions 
++ extraOptions + w case _ => val ds = DataSource( df.sparkSession, className = source, - options = extraOptions.toMap, + options = options, partitionColumns = normalizedParCols.getOrElse(Nil)) ds.createSink(outputMode) } df.sparkSession.sessionState.streamingQueryManager.startQuery( - extraOptions.get("queryName"), - extraOptions.get("checkpointLocation"), + options.get("queryName"), + options.get("checkpointLocation"), df, - extraOptions.toMap, + options, sink, outputMode, useTempCheckpointLocation = source == "console", http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 52b833a..2565cd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, Streami import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} -import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport} +import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.InputPartition import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset} import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter -import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} +import 
org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger} import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -54,14 +54,20 @@ trait FakeMicroBatchReadSupport extends MicroBatchReadSupport { override def createMicroBatchReader( schema: Optional[StructType], checkpointLocation: String, - options: DataSourceOptions): MicroBatchReader = FakeReader() + options: DataSourceOptions): MicroBatchReader = { + LastReadOptions.options = options + FakeReader() + } } trait FakeContinuousReadSupport extends ContinuousReadSupport { override def createContinuousReader( schema: Optional[StructType], checkpointLocation: String, - options: DataSourceOptions): ContinuousReader = FakeReader() + options: DataSourceOptions): ContinuousReader = { + LastReadOptions.options = options + FakeReader() + } } trait FakeStreamWriteSupport extends StreamWriteSupport { @@ -70,16 +76,27 @@ trait FakeStreamWriteSupport extends StreamWriteSupport { schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = { + LastWriteOptions.options = options throw new IllegalStateException("fake sink - cannot actually write") } } -class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport { +class FakeReadMicroBatchOnly + extends DataSourceRegister + with FakeMicroBatchReadSupport + with SessionConfigSupport { override def shortName(): String = "fake-read-microbatch-only" + + override def keyPrefix: String = shortName() } -class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport { +class FakeReadContinuousOnly + extends DataSourceRegister + with FakeContinuousReadSupport + with SessionConfigSupport { override def shortName(): String = "fake-read-continuous-only" + + override def keyPrefix: String = shortName() } class FakeReadBothModes extends DataSourceRegister @@ -91,8 +108,13 @@ class FakeReadNeitherMode extends DataSourceRegister { override def shortName(): String = 
"fake-read-neither-mode" } -class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport { +class FakeWrite + extends DataSourceRegister + with FakeStreamWriteSupport + with SessionConfigSupport { override def shortName(): String = "fake-write-microbatch-continuous" + + override def keyPrefix: String = shortName() } class FakeNoWrite extends DataSourceRegister { @@ -120,6 +142,21 @@ class FakeWriteV1Fallback extends DataSourceRegister override def shortName(): String = "fake-write-v1-fallback" } +object LastReadOptions { + var options: DataSourceOptions = _ + + def clear(): Unit = { + options = null + } +} + +object LastWriteOptions { + var options: DataSourceOptions = _ + + def clear(): Unit = { + options = null + } +} class StreamingDataSourceV2Suite extends StreamTest { @@ -129,6 +166,11 @@ class StreamingDataSourceV2Suite extends StreamTest { spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath) } + override def afterEach(): Unit = { + LastReadOptions.clear() + LastWriteOptions.clear() + } + val readFormats = Seq( "fake-read-microbatch-only", "fake-read-continuous-only", @@ -142,7 +184,14 @@ class StreamingDataSourceV2Suite extends StreamTest { Trigger.ProcessingTime(1000), Trigger.Continuous(1000)) - private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger) = { + private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger): Unit = { + testPositiveCaseWithQuery(readFormat, writeFormat, trigger)(() => _) + } + + private def testPositiveCaseWithQuery( + readFormat: String, + writeFormat: String, + trigger: Trigger)(check: StreamingQuery => Unit): Unit = { val query = spark.readStream .format(readFormat) .load() @@ -150,8 +199,8 @@ class StreamingDataSourceV2Suite extends StreamTest { .format(writeFormat) .trigger(trigger) .start() + check(query) query.stop() - query } private def testNegativeCase( @@ -187,19 +236,54 @@ class StreamingDataSourceV2Suite extends 
StreamTest { test("disabled v2 write") { // Ensure the V2 path works normally and generates a V2 sink.. - val v2Query = testPositiveCase( - "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) - assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink - .isInstanceOf[FakeWriteV1Fallback]) + testPositiveCaseWithQuery( + "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query => + assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink + .isInstanceOf[FakeWriteV1Fallback]) + } // Ensure we create a V1 sink with the config. Note the config is a comma separated // list, including other fake entries. val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback" withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") { - val v1Query = testPositiveCase( - "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) - assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink - .isInstanceOf[FakeSink]) + testPositiveCaseWithQuery( + "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v1Query => + assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink + .isInstanceOf[FakeSink]) + } + } + } + + Seq( + Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.Once()), + Tuple2(classOf[FakeReadContinuousOnly], Trigger.Continuous(1000)) + ).foreach { case (source, trigger) => + test(s"SPARK-25460: session options are respected in structured streaming sources - $source") { + // `keyPrefix` and `shortName` are the same in this test case + val readSource = source.newInstance().shortName() + val writeSource = "fake-write-microbatch-continuous" + + val readOptionName = "optionA" + withSQLConf(s"spark.datasource.$readSource.$readOptionName" -> "true") { + testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ => + eventually(timeout(streamingTimeout)) { + // Write options should 
not be set. + assert(LastWriteOptions.options.getBoolean(readOptionName, false) == false) + assert(LastReadOptions.options.getBoolean(readOptionName, false) == true) + } + } + } + + val writeOptionName = "optionB" + withSQLConf(s"spark.datasource.$writeSource.$writeOptionName" -> "true") { + testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ => + eventually(timeout(streamingTimeout)) { + // Read options should not be set. + assert(LastReadOptions.options.getBoolean(writeOptionName, false) == false) + assert(LastWriteOptions.options.getBoolean(writeOptionName, false) == true) + } + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
