Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8bf56cc46 -> b020ce408
[SPARK-18811] StreamSource resolution should happen in stream execution thread

## What changes were proposed in this pull request?

When you start a stream, resolving the source of the stream (for example, resolving partition columns) can take a long time. That work should not block the main thread on which `query.start()` was called; it should happen in the stream execution thread, possibly before starting any triggers.

## How was this patch tested?

Unit test added. Made sure the test fails with no code changes.

Author: Burak Yavuz <brk...@gmail.com>

Closes #16238 from brkyvz/SPARK-18811.

(cherry picked from commit 63c9159870ee274c68e24360594ca01d476b9ace)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b020ce40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b020ce40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b020ce40

Branch: refs/heads/branch-2.1
Commit: b020ce408507d7fd57f6d357054a2b3530a5b95e
Parents: 8bf56cc
Author: Burak Yavuz <brk...@gmail.com>
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Dec 9 22:50:10 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala  | 24 ++++++-
 .../sql/streaming/StreamingQueryManager.scala  | 14 +----
 .../streaming/StreamingQueryManagerSuite.scala | 28 +++++++++
 .../sql/streaming/util/DefaultSource.scala     | 66 ++++++++++++++++++++
 4 files changed, 116 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
     checkpointRoot: String,
-    val logicalPlan: LogicalPlan,
+    analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
     val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
   private val prettyIdString =
     Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  override lazy val logicalPlan: LogicalPlan = {
+    var nextSourceId = 0L
+    analyzedPlan.transform {
+      case StreamingRelation(dataSource, _, output) =>
+        // Materialize source to avoid creating it in every batch
+        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+        val source = dataSource.createSource(metadataPath)
+        nextSourceId += 1
+        // We still need to use the previous `output` instead of `source.schema` as attributes in
+        // "df.logicalPlan" has already used attributes of the previous `output`.
+        StreamingExecutionRelation(source, output)
+    }
+  }
+
   /** All stream sources present in the query plan. */
-  protected val sources =
+  protected lazy val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
-  private val uniqueSources = sources.distinct
+  private lazy val uniqueSources = sources.distinct
 
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
      SparkSession.setActiveSession(sparkSession)
 
+      updateStatusMessage("Initializing sources")
+      // force initialization of the logical plan so that the sources can be created
+      logicalPlan
+
       triggerExecutor.execute(() => {
         startTrigger()
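Editor's note: the heart of the StreamExecution change above is that source materialization moves out of the constructor path and into a `lazy val` that is only forced from the stream execution thread (right after the "Initializing sources" status is set), so `query.start()` no longer waits on slow `dataSource.createSource(...)` calls. Below is a minimal, Spark-free sketch of that pattern; everything in it (`slowCreateSource`, `SketchExecution`, the thread name) is invented for illustration and is not Spark code.

import java.util.concurrent.TimeUnit

object LazyResolutionSketch {
  // Hypothetical stand-in for an expensive dataSource.createSource(metadataPath) call,
  // e.g. listing files to resolve partition columns.
  def slowCreateSource(metadataPath: String): String = {
    TimeUnit.SECONDS.sleep(5)
    s"source@$metadataPath"
  }

  class SketchExecution(checkpointRoot: String) {
    // Deferred: nothing expensive runs while the query object is being constructed.
    lazy val resolvedSources: Seq[String] =
      Seq(slowCreateSource(s"$checkpointRoot/sources/0"))

    def start(): Thread = {
      val runner = new Thread(new Runnable {
        override def run(): Unit = {
          // Forcing the lazy val here mirrors StreamExecution forcing `logicalPlan`
          // right after it reports "Initializing sources".
          resolvedSources
          // ... the trigger loop would start here ...
        }
      }, "sketch-stream-execution-thread")
      runner.setDaemon(true)
      runner.start()
      runner // returns immediately; the slow resolution runs on the background thread
    }
  }

  def main(args: Array[String]): Unit = {
    val t0 = System.nanoTime()
    val runner = new SketchExecution("/tmp/sketch-checkpoint").start()
    println(s"start() returned after ${(System.nanoTime() - t0) / 1e6} ms") // well under 5000 ms
    runner.join() // wait for the background work so the example terminates cleanly
  }
}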
http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
     }
 
-    var nextSourceId = 0L
-
-    val logicalPlan = analyzedPlan.transform {
-      case StreamingRelation(dataSource, _, output) =>
-        // Materialize source to avoid creating it in every batch
-        val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
-        val source = dataSource.createSource(metadataPath)
-        nextSourceId += 1
-        // We still need to use the previous `output` instead of `source.schema` as attributes in
-        // "df.logicalPlan" has already used attributes of the previous `output`.
-        StreamingExecutionRelation(source, output)
-    }
     val query = new StreamExecution(
       sparkSession,
       name,
       checkpointLocation,
-      logicalPlan,
+      analyzedPlan,
       sink,
       trigger,
      triggerClock,
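Editor's note: from the caller's side, the manager now just hands the analyzed plan to StreamExecution, so `start()` returns promptly even when resolving the source is slow, and the slow phase is visible through the query status (the message set in the StreamExecution change above). A rough usage sketch follows; it assumes an existing SparkSession named `spark`, and the schema, paths, and query name are placeholders.

import org.apache.spark.sql.types.StructType

// Assumes a SparkSession `spark` (e.g. in spark-shell); the input path is hypothetical.
val sq = spark.readStream
  .schema(new StructType().add("a", "int"))
  .format("parquet")
  .load("/data/events")                       // partition discovery may be slow here
  .writeStream
  .format("memory")
  .queryName("events_sketch")
  .option("checkpointLocation", "/tmp/ckpt-sketch")
  .start()                                    // no longer blocks on source resolution

// While the stream execution thread materializes the sources, the status says so.
println(sq.status.message)                    // may print "Initializing sources"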
http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 268b8ff..d188319 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.concurrent.CountDownLatch
+
 import scala.concurrent.Future
 import scala.util.Random
 import scala.util.control.NonFatal
@@ -213,6 +215,28 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("SPARK-18811: Source resolution should not block main thread") {
+    failAfter(streamingTimeout) {
+      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      withTempDir { tempDir =>
+        // if source resolution was happening on the main thread, it would block the start call,
+        // now it should only be blocking the stream execution thread
+        val sq = spark.readStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .load()
+          .writeStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .option("checkpointLocation", tempDir.toString)
+          .start()
+        eventually(Timeout(streamingTimeout)) {
+          assert(sq.status.message.contains("Initializing sources"))
+        }
+        StreamingQueryManagerSuite.latch.countDown()
+        sq.stop()
+      }
+    }
+  }
+
   /** Run a body of code by defining a query on each dataset */
   private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
 
@@ -297,3 +321,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
+
+object StreamingQueryManagerSuite {
+  var latch: CountDownLatch = null
+}
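Editor's note: the test above hinges on a simple two-thread handshake. The fake source parks on a shared CountDownLatch inside createSource, the test thread asserts that the query reports "Initializing sources" while it is parked, and only then releases the latch so the query can be stopped. Below is a stripped-down, Spark-free sketch of that handshake; all names in it are invented for illustration.

import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandshakeSketch {
  // Shared latch, mirroring StreamingQueryManagerSuite.latch above.
  val latch = new CountDownLatch(1)
  @volatile var statusMessage: String = "Not started"

  def main(args: Array[String]): Unit = {
    val execution = new Thread(new Runnable {
      override def run(): Unit = {
        statusMessage = "Initializing sources"
        latch.await()                          // stands in for the blocking createSource call
        statusMessage = "Sources initialized"
      }
    }, "sketch-execution-thread")
    execution.start()

    // Poll until the worker reports the expected status, like eventually { assert(...) }.
    while (statusMessage != "Initializing sources") {
      TimeUnit.MILLISECONDS.sleep(10)
    }
    println(s"observed: $statusMessage")

    latch.countDown()                          // release the blocked "source"
    execution.join()
    println(s"final: $statusMessage")          // prints "final: Sources initialized"
  }
}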
http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
new file mode 100644
index 0000000..b0adf76
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    StreamingQueryManagerSuite.latch.await()
+    new Source {
+      override def schema: StructType = fakeSchema
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+        Seq[Int]().toDS().toDF()
+      }
+      override def stop() {}
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org