This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c37a8a  [SPARK-30143][SS] Add a timeout on stopping a streaming query
4c37a8a is described below

commit 4c37a8a3f4a489b52f1919d2db84f6e32c6a05cd
Author: Burak Yavuz <brk...@gmail.com>
AuthorDate: Fri Dec 13 15:16:00 2019 -0800

    [SPARK-30143][SS] Add a timeout on stopping a streaming query
    
    ### What changes were proposed in this pull request?
    
    Add a timeout configuration for StreamingQuery.stop()
    
    ### Why are the changes needed?
    
    The stop() method on a Streaming Query awaits the termination of the stream
    execution thread. However, the stream execution thread may block forever,
    depending on the streaming source implementation (e.g. Kafka, which runs
    UninterruptibleThreads).

    This causes control-flow applications to hang indefinitely as well. We'd like
    to introduce a timeout for stopping the execution thread, so that the
    control-flow thread can decide what action to take if the timeout is hit.
    
    ### Does this PR introduce any user-facing change?
    
    By default, no. If the timeout configuration is set, then a TimeoutException
    will be thrown if a stream cannot be stopped within the given timeout.
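    
    As an illustrative sketch (not part of this patch), a caller could opt in roughly as
    follows. The config key and the thrown `TimeoutException` come from this change; the
    `spark` session, the "rate"/"console" formats and the 5000 ms value are placeholders:
    
    ```scala
    import java.util.concurrent.TimeoutException
    
    // Bound stop() to 5 seconds instead of waiting indefinitely (default 0 = wait forever).
    spark.conf.set("spark.sql.streaming.stopTimeout", "5000")
    
    val query = spark.readStream
      .format("rate")      // placeholder source, used only for illustration
      .load()
      .writeStream
      .format("console")
      .start()
    
    try {
      query.stop()         // throws TimeoutException if the execution thread does not stop in time
    } catch {
      case e: TimeoutException =>
        // The control-flow thread regains control and can decide what to do,
        // e.g. retry stop() or fail fast instead of hanging.
        println(s"Query ${query.id} did not stop within the timeout: ${e.getMessage}")
    }
    ```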
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #26771 from brkyvz/stopTimeout.
    
    Lead-authored-by: Burak Yavuz <brk...@gmail.com>
    Co-authored-by: Burak Yavuz <bu...@databricks.com>
    Signed-off-by: Burak Yavuz <brk...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   7 ++
 .../execution/streaming/MicroBatchExecution.scala  |   3 +-
 .../sql/execution/streaming/StreamExecution.scala  |  26 +++-
 .../streaming/continuous/ContinuousExecution.scala |   3 +-
 .../spark/sql/streaming/DataStreamWriter.scala     |  11 +-
 .../spark/sql/streaming/StreamingQuery.scala       |  12 +-
 .../sql/streaming/StreamingQueryManager.scala      |   3 +-
 .../streaming/JavaDataStreamReaderWriterSuite.java |   5 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  35 +++++-
 .../sql/streaming/util/BlockOnStopSource.scala     | 132 +++++++++++++++++++++
 10 files changed, 224 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c54008c..91347cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1298,6 +1298,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)
 
+  val STREAMING_STOP_TIMEOUT =
+    buildConf("spark.sql.streaming.stopTimeout")
+      .doc("How long to wait for the streaming execution thread to stop when 
calling the " +
+        "streaming query's stop() method in milliseconds. 0 or negative values 
wait indefinitely.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(0L)
+
   val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
     buildConf("spark.sql.streaming.noDataProgressEventInterval")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 5fe1f92..872c367 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -150,8 +150,7 @@ class MicroBatchExecution(
     state.set(TERMINATED)
     if (queryExecutionThread.isAlive) {
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
-      queryExecutionThread.interrupt()
-      queryExecutionThread.join()
+      interruptAndAwaitExecutionThreadTermination()
       // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f470ad3..1cb3955 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.io.{InterruptedIOException, IOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
 import java.util.UUID
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
@@ -435,6 +435,30 @@ abstract class StreamExecution(
   }
 
   /**
+   * Interrupts the query execution thread and awaits its termination until it exceeds the
+   * timeout. The timeout can be set on "spark.sql.streaming.stopTimeout".
+   *
+   * @throws TimeoutException If the thread cannot be stopped within the timeout
+   */
+  @throws[TimeoutException]
+  protected def interruptAndAwaitExecutionThreadTermination(): Unit = {
+    val timeout = math.max(
+      sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT), 0)
+    queryExecutionThread.interrupt()
+    queryExecutionThread.join(timeout)
+    if (queryExecutionThread.isAlive) {
+      val stackTraceException = new SparkException("The stream thread was last executing:")
+      stackTraceException.setStackTrace(queryExecutionThread.getStackTrace)
+      val timeoutException = new TimeoutException(
+        s"Stream Execution thread failed to stop within $timeout milliseconds (specified by " +
+        s"${SQLConf.STREAMING_STOP_TIMEOUT.key}). See the cause on what was " +
+        "being executed in the streaming query thread.")
+      timeoutException.initCause(stackTraceException)
+      throw timeoutException
+    }
+  }
+
+  /**
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 8c7371e..481552a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -427,8 +427,7 @@ class ContinuousExecution(
     if (queryExecutionThread.isAlive) {
       // The query execution thread will clean itself up in the finally clause of runContinuous.
       // We just need to interrupt the long running job.
-      queryExecutionThread.interrupt()
-      queryExecutionThread.join()
+      interruptAndAwaitExecutionThreadTermination()
     }
     logInfo(s"Query $prettyIdString was stopped")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 74170b1..62a1add 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.util.Locale
+import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
@@ -238,10 +239,18 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Starts the execution of the streaming query, which will continually output results to the given
    * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
-   * the stream.
+   * the stream. Throws a `TimeoutException` if the following conditions are met:
+   *  - Another run of the same streaming query, that is a streaming query
+   *    sharing the same checkpoint location, is already active on the same
+   *    Spark Driver
+   *  - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart`
+   *    is enabled
+   *  - The active run cannot be stopped within the timeout controlled by
+   *    the SQL configuration `spark.sql.streaming.stopTimeout`
    *
    * @since 2.0.0
    */
+  @throws[TimeoutException]
   def start(): StreamingQuery = {
     if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Hive data source can only be used with tables, you can not " +
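
A minimal sketch of the restart behavior described in the Scaladoc above, assuming a SparkSession
`spark`, a streaming DataFrame `df`, and a placeholder checkpoint path; the two configuration keys
are the ones referenced by the new documentation:

```scala
// Allow start() to stop an already-active run of the same query (same checkpoint location),
// and bound that stop attempt to 2 seconds.
spark.conf.set("spark.sql.streaming.stopActiveRunOnRestart", "true")
spark.conf.set("spark.sql.streaming.stopTimeout", "2000")

val firstRun = df.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/q1")   // placeholder path
  .format("console")
  .start()

// Restarting the same query stops the active run first. If that run cannot be stopped
// within 2000 ms, start() now throws java.util.concurrent.TimeoutException.
val secondRun = df.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/q1")
  .format("console")
  .start()
```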
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 47ddc88..85d980e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.util.UUID
+import java.util.concurrent.TimeoutException
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.SparkSession
@@ -142,10 +143,17 @@ trait StreamingQuery {
   def processAllAvailable(): Unit
 
   /**
-   * Stops the execution of this query if it is running. This method blocks until the threads
-   * performing execution has stopped.
+   * Stops the execution of this query if it is running. This waits until the termination of the
+   * query execution threads or until a timeout is hit.
+   *
+   * By default stop will block indefinitely. You can configure a timeout by the configuration
+   * `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block
+   * indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the
+   * issue persists, it is advisable to kill the Spark application.
+   *
    * @since 2.0.0
    */
+  @throws[TimeoutException]
   def stop(): Unit
 
   /**
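
A sketch of the retry pattern suggested by the updated stop() documentation, under the assumption
that a bounded number of retries is acceptable before giving up and killing the application; the
helper name and retry count are illustrative, not part of this patch:

```scala
import java.util.concurrent.TimeoutException

import org.apache.spark.sql.streaming.StreamingQuery

// Retry stop() a few times; if it keeps timing out, let the final TimeoutException
// propagate so the caller can decide to kill the Spark application.
def stopWithRetries(query: StreamingQuery, maxAttempts: Int = 3): Unit = {
  var attempt = 0
  var stopped = false
  while (!stopped && attempt < maxAttempts) {
    attempt += 1
    try {
      query.stop()
      stopped = true
    } catch {
      case e: TimeoutException if attempt < maxAttempts =>
        // Execution thread is still blocked; log and try again.
        println(s"stop() attempt $attempt timed out: ${e.getMessage}")
    }
  }
}
```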
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index e64f67c..810f4a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.util.{ConcurrentModificationException, UUID}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeoutException, TimeUnit}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
@@ -321,6 +321,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
    * @param trigger [[Trigger]] for the query.
    * @param triggerClock [[Clock]] to use for the triggering.
    */
+  @throws[TimeoutException]
   private[sql] def startQuery(
       userSpecifiedName: Option[String],
       userSpecifiedCheckpointLocation: Option[String],
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
index 48cdb26..5903623 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
@@ -18,6 +18,7 @@
 package test.org.apache.spark.sql.streaming;
 
 import java.io.File;
+import java.util.concurrent.TimeoutException;
 
 import org.junit.After;
 import org.junit.Before;
@@ -52,7 +53,7 @@ public class JavaDataStreamReaderWriterSuite {
   }
 
   @Test
-  public void testForeachBatchAPI() {
+  public void testForeachBatchAPI() throws TimeoutException {
     StreamingQuery query = spark
       .readStream()
       .textFile(input)
@@ -66,7 +67,7 @@ public class JavaDataStreamReaderWriterSuite {
   }
 
   @Test
-  public void testForeachAPI() {
+  public void testForeachAPI() throws TimeoutException {
     StreamingQuery query = spark
       .readStream()
       .textFile(input)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index a637b42..297d6c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.streaming
 
 import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
 import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 
+import scala.concurrent.TimeoutException
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
@@ -42,7 +43,7 @@ import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCon
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.streaming.util.{BlockOnStopSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
@@ -1125,6 +1126,36 @@ class StreamSuite extends StreamTest {
       }
     )
   }
+
+  // ProcessingTime trigger generates MicroBatchExecution, and ContinuousTrigger starts a
+  // ContinuousExecution
+  Seq(Trigger.ProcessingTime("1 second"), Trigger.Continuous("1 second")).foreach { trigger =>
+    test(s"SPARK-30143: stop waits until timeout if blocked - trigger: $trigger") {
+      BlockOnStopSourceProvider.enableBlocking()
+      val sq = spark.readStream.format(classOf[BlockOnStopSourceProvider].getName)
+        .load()
+        .writeStream
+        .format("console")
+        .trigger(trigger)
+        .start()
+      failAfter(60.seconds) {
+        val startTime = System.nanoTime()
+        withSQLConf(SQLConf.STREAMING_STOP_TIMEOUT.key -> "2000") {
+          intercept[TimeoutException] {
+            sq.stop()
+          }
+        }
+        val duration = (System.nanoTime() - startTime) / 1e6
+        assert(duration >= 2000,
+          s"Should have waited more than 2000 millis, but waited $duration 
millis")
+
+        BlockOnStopSourceProvider.disableBlocking()
+        withSQLConf(SQLConf.STREAMING_STOP_TIMEOUT.key -> "0") {
+          sq.stop()
+        }
+      }
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala
new file mode 100644
index 0000000..f25758c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import java.util
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.JavaConverters._
+
+import org.apache.zookeeper.KeeperException.UnimplementedException
+
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.catalog.TableCapability.CONTINUOUS_READ
+import org.apache.spark.sql.connector.read.{streaming, InputPartition, Scan, ScanBuilder}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, PartitionOffset}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
+import org.apache.spark.sql.sources.StreamSourceProvider
+import org.apache.spark.sql.types.{LongType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/** The V1 and V2 provider of a streaming source, which blocks indefinitely on the call of stop() */
+object BlockOnStopSourceProvider {
+  private var _latch: CountDownLatch = _
+  val schema: StructType = new StructType().add("id", LongType)
+
+  /** Set the latch that we will use to block the streaming query thread. */
+  def enableBlocking(): Unit = {
+    if (_latch == null || _latch.getCount == 0) {
+      _latch = new CountDownLatch(1)
+    }
+  }
+
+  def disableBlocking(): Unit = {
+    if (_latch != null) {
+      _latch.countDown()
+      _latch = null
+    }
+  }
+}
+
+class BlockOnStopSourceProvider extends StreamSourceProvider with TableProvider {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new BlockOnStopSourceTable(BlockOnStopSourceProvider._latch)
+  }
+
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    "blockingSource" -> BlockOnStopSourceProvider.schema
+  }
+
+  override def createSource(
+      sqlContext: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    new BlockOnStopSource(sqlContext.sparkSession, BlockOnStopSourceProvider._latch)
+  }
+}
+
+/** A V1 Streaming Source which blocks on stop(). It does not produce any data. */
+class BlockOnStopSource(spark: SparkSession, latch: CountDownLatch) extends Source {
+  // Blocks until latch countdowns
+  override def stop(): Unit = latch.await()
+
+  // Boiler-plate
+  override val schema: StructType = BlockOnStopSourceProvider.schema
+  override def getOffset: Option[Offset] = Some(LongOffset(0))
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
+  }
+}
+
+/** A V2 Table, which can create a blocking streaming source for ContinuousExecution. */
+class BlockOnStopSourceTable(latch: CountDownLatch) extends Table with SupportsRead {
+  override def schema(): StructType = BlockOnStopSourceProvider.schema
+
+  override def name(): String = "blockingSource"
+
+  override def capabilities(): util.Set[TableCapability] = Set(CONTINUOUS_READ).asJava
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new ScanBuilder {
+      override def build(): Scan = new Scan {
+        override def readSchema(): StructType = schema()
+
+        override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+          new BlockOnStopContinuousStream(latch)
+        }
+      }
+    }
+  }
+}
+
+/**
+ * A V2 Streaming Source which blocks on stop(). It does not produce any data. We use this for
+ * testing stopping in ContinuousExecution.
+ */
+class BlockOnStopContinuousStream(latch: CountDownLatch) extends ContinuousStream {
+
+  // Blocks until latch countdowns
+  override def stop(): Unit = latch.await()
+
+  // Boiler-plate
+  override def planInputPartitions(start: streaming.Offset): Array[InputPartition] = Array.empty
+  override def mergeOffsets(offsets: Array[PartitionOffset]): streaming.Offset = LongOffset(0L)
+  override def deserializeOffset(json: String): streaming.Offset = LongOffset(0L)
+  override def initialOffset(): Offset = LongOffset(0)
+  override def commit(end: streaming.Offset): Unit = {}
+  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
+    throw new UnimplementedException
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
