Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8bf56cc46 -> b020ce408


[SPARK-18811] StreamSource resolution should happen in stream execution thread

## What changes were proposed in this pull request?

When a stream is started, resolving its source (for example, resolving partition 
columns) can take a long time. This work should not block the main thread on 
which `query.start()` was called; it should instead happen on the stream 
execution thread, before any triggers run.
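A minimal, self-contained sketch of the pattern (the names `SlowQuery` and `resolveSource` below are illustrative only, not Spark APIs): the expensive resolution is wrapped in a `lazy val` and only forced from the thread that runs the query, so `start()` returns immediately.

```scala
// Illustrative sketch only: `SlowQuery` and `resolveSource` are made-up names,
// not Spark APIs. The point is that resolution is deferred into a lazy val and
// forced on the execution thread, so the caller of start() is never blocked.
object DeferredResolutionSketch {

  def resolveSource(): String = {   // stand-in for an expensive createSource call
    Thread.sleep(2000)              // e.g. slow partition-column discovery
    "resolved-source"
  }

  class SlowQuery {
    // Cheap to construct: nothing is resolved yet.
    private lazy val source: String = resolveSource()

    def start(): Thread = {
      val runner = new Thread(new Runnable {
        override def run(): Unit = {
          val s = source            // resolution happens here, off the caller's thread
          println(s"running triggers against $s")
        }
      })
      runner.start()                // start() does not wait for resolution
      runner
    }
  }

  def main(args: Array[String]): Unit = {
    val t0 = System.nanoTime()
    val runner = new SlowQuery().start()
    println(f"start() returned after ${(System.nanoTime() - t0) / 1e6}%.1f ms")
    runner.join()
  }
}
```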

## How was this patch tested?

Unit test added. Verified that the test fails without the code changes.

Author: Burak Yavuz <brk...@gmail.com>

Closes #16238 from brkyvz/SPARK-18811.

(cherry picked from commit 63c9159870ee274c68e24360594ca01d476b9ace)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b020ce40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b020ce40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b020ce40

Branch: refs/heads/branch-2.1
Commit: b020ce408507d7fd57f6d357054a2b3530a5b95e
Parents: 8bf56cc
Author: Burak Yavuz <brk...@gmail.com>
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Dec 9 22:50:10 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 24 ++++++-
 .../sql/streaming/StreamingQueryManager.scala   | 14 +----
 .../streaming/StreamingQueryManagerSuite.scala  | 28 +++++++++
 .../sql/streaming/util/DefaultSource.scala      | 66 ++++++++++++++++++++
 4 files changed, 116 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
     checkpointRoot: String,
-    val logicalPlan: LogicalPlan,
+    analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
     val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
   private val prettyIdString =
     Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  override lazy val logicalPlan: LogicalPlan = {
+    var nextSourceId = 0L
+    analyzedPlan.transform {
+      case StreamingRelation(dataSource, _, output) =>
+        // Materialize source to avoid creating it in every batch
+        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+        val source = dataSource.createSource(metadataPath)
+        nextSourceId += 1
+        // We still need to use the previous `output` instead of `source.schema` as attributes in
+        // "df.logicalPlan" has already used attributes of the previous `output`.
+        StreamingExecutionRelation(source, output)
+    }
+  }
+
   /** All stream sources present in the query plan. */
-  protected val sources =
+  protected lazy val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
-  private val uniqueSources = sources.distinct
+  private lazy val uniqueSources = sources.distinct
 
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
+      updateStatusMessage("Initializing sources")
+      // force initialization of the logical plan so that the sources can be created
+      logicalPlan
+
       triggerExecutor.execute(() => {
         startTrigger()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
         UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
       }
 
-      var nextSourceId = 0L
-
-      val logicalPlan = analyzedPlan.transform {
-        case StreamingRelation(dataSource, _, output) =>
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
-          val source = dataSource.createSource(metadataPath)
-          nextSourceId += 1
-          // We still need to use the previous `output` instead of `source.schema` as attributes in
-          // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
-      }
       val query = new StreamExecution(
         sparkSession,
         name,
         checkpointLocation,
-        logicalPlan,
+        analyzedPlan,
         sink,
         trigger,
         triggerClock,

http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 268b8ff..d188319 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.concurrent.CountDownLatch
+
 import scala.concurrent.Future
 import scala.util.Random
 import scala.util.control.NonFatal
@@ -213,6 +215,28 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("SPARK-18811: Source resolution should not block main thread") {
+    failAfter(streamingTimeout) {
+      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      withTempDir { tempDir =>
+        // if source resolution was happening on the main thread, it would block the start call,
+        // now it should only be blocking the stream execution thread
+        val sq = spark.readStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .load()
+          .writeStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .option("checkpointLocation", tempDir.toString)
+          .start()
+        eventually(Timeout(streamingTimeout)) {
+          assert(sq.status.message.contains("Initializing sources"))
+        }
+        StreamingQueryManagerSuite.latch.countDown()
+        sq.stop()
+      }
+    }
+  }
+
 
   /** Run a body of code by defining a query on each dataset */
  private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
@@ -297,3 +321,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
+
+object StreamingQueryManagerSuite {
+  var latch: CountDownLatch = null
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
new file mode 100644
index 0000000..b0adf76
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    StreamingQueryManagerSuite.latch.await()
+    new Source {
+      override def schema: StructType = fakeSchema
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+        Seq[Int]().toDS().toDF()
+      }
+      override def stop() {}
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}

