Repository: spark
Updated Branches:
  refs/heads/master 68ec4d641 -> 310632498


[SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState

## What changes were proposed in this pull request?

The race condition that caused the test failure is between two threads:
- The MicroBatchExecution thread, which processes inputs to produce answers and then generates progress events.
- The test thread, which generates some input data, checks the answer, and then verifies the progress events generated by the query.

The synchronization structure between these threads is as follows.
1. The MicroBatchExecution thread, in every batch, does the following in order:
   a. Processes the batch input to generate the answer.
   b. Signals `awaitProgressLockCondition` to wake up threads waiting for progress via `awaitOffset`.
   c. Generates the progress event.

2. The test execution thread:
   a. Calls `awaitOffset` to wait for progress, which waits on `awaitProgressLockCondition`.
   b. As soon as `awaitProgressLockCondition` is signaled, moves on in the test to check the answer.
   c. Finally, verifies the last generated progress event.

What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b -> 2c -> 1c. In other words, the progress event may be generated only after the test has already tried to verify it.
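
To make the interleaving concrete, here is a minimal, self-contained sketch of the race. This is a simplified model, not the actual Spark classes: `answerReady` and `progressEvent` are illustrative stand-ins for the committed offsets and the generated progress event.

```scala
import java.util.concurrent.locks.ReentrantLock

object ProgressRaceSketch {
  private val lock = new ReentrantLock()
  private val progressCondition = lock.newCondition()
  @volatile private var answerReady = false                   // stands in for committed offsets
  @volatile private var progressEvent: Option[String] = None  // stands in for the progress event

  def main(args: Array[String]): Unit = {
    val microBatchThread = new Thread(new Runnable {
      override def run(): Unit = {
        answerReady = true                                    // 1a: process batch input
        lock.lock()
        try progressCondition.signalAll()                     // 1b: wake up waiting threads
        finally lock.unlock()
        Thread.sleep(10)                                      // scheduling gap before 1c
        progressEvent = Some("progress")                      // 1c: generated too late
      }
    })
    microBatchThread.start()

    lock.lock()                                               // 2a: awaitOffset-style wait
    try { while (!answerReady) progressCondition.await() }
    finally lock.unlock()
    assert(answerReady)                                       // 2b: answer check passes
    // 2c: can fail because 1c may not have run yet -- this is the flaky assertion
    assert(progressEvent.nonEmpty, "progress event not generated yet")
    microBatchThread.join()
  }
}
```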

The solution has two steps:
1. Signal the waiting thread after the progress event has been generated, that is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a large value.

The latter ensures that the test thread keeps waiting on `awaitProgressLockCondition` until the MicroBatchExecution thread explicitly signals it. With the existing small timeout of 100 ms, the following sequence can occur:
 - The MicroBatchExecution thread updates the committed offsets.
 - The test thread waiting on `awaitProgressLockCondition` times out after 100 ms, finds that the committed offsets have been updated, and therefore returns from `awaitOffset` and moves on to the progress event checks.
 - The MicroBatchExecution thread then generates the progress event and signals, but the test thread has already attempted to verify the event and failed.

Increasing the timeout to a large value (e.g., `streamingTimeoutMs = 60 seconds`, similar to `awaitInitialization`) avoids this type of race condition as well.
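
As a compressed sketch of the repaired protocol (illustrative names only, e.g. `committed` and `lastProgress`; the real change is in the diff below), the signal now comes only after the progress event exists, and the waiter re-checks its predicate with a large timeout:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

class FixedOrderingSketch {
  private val awaitProgressLock = new ReentrantLock()
  private val awaitProgressLockCondition = awaitProgressLock.newCondition()
  @volatile private var committed = false                     // models committedOffsets
  @volatile private var lastProgress: Option[String] = None   // models the progress event

  // MicroBatchExecution side: commit, generate the progress event, and only then signal.
  def runBatch(): Unit = {
    committed = true                                          // offsets committed
    lastProgress = Some("progress")                           // models finishTrigger()
    awaitProgressLock.lock()
    try awaitProgressLockCondition.signalAll()                // signal after finishTrigger()
    finally awaitProgressLock.unlock()
  }

  // Test side: wait with a large timeout (e.g. 60 s) instead of 100 ms, so the
  // thread normally wakes up only on the explicit signal above.
  def awaitOffset(timeoutMs: Long): Unit = {
    awaitProgressLock.lock()
    try {
      while (!committed) {
        awaitProgressLockCondition.await(timeoutMs, TimeUnit.MILLISECONDS)
      }
    } finally awaitProgressLock.unlock()
  }
}
```

With this ordering, a thread that returns from `awaitOffset` because of the signal is guaranteed to observe the already-generated progress event.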

## How was this patch tested?
Ran locally many times.

Closes #22182 from tdas/SPARK-25184.

Authored-by: Tathagata Das <tathagata.das1...@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31063249
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31063249
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31063249

Branch: refs/heads/master
Commit: 3106324986612800240bc8c945be90c4cb368d79
Parents: 68ec4d6
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Aug 22 12:22:53 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Aug 22 12:22:53 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  3 +-
 .../streaming/MicroBatchExecution.scala         |  5 ++-
 .../execution/streaming/StreamExecution.scala   |  4 +-
 .../sql/streaming/StateStoreMetricsTest.scala   | 44 +++++++++++---------
 .../apache/spark/sql/streaming/StreamTest.scala |  2 +-
 5 files changed, 33 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 946b636..c9c5250 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -970,7 +970,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       makeSureGetOffsetCalled,
       Execute { q =>
         // wait to reach the last offset in every partition
-        q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)))
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)), streamingTimeout.toMillis)
       },
       CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
       StopStream,

http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index cf83ba7..b1cafd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -200,6 +200,10 @@ class MicroBatchExecution(
 
         finishTrigger(currentBatchHasNewData)  // Must be outside reportTimeTaken so it is recorded
 
+        // Signal waiting threads. Note this must be after finishTrigger() to ensure all
+        // activities (progress generation, etc.) have completed before signaling.
+        withProgressLocked { awaitProgressLockCondition.signalAll() }
+
         // If the current batch has been executed, then increment the batch id and reset flag.
         // Otherwise, there was no data to execute the batch and sleep for some time
         if (isCurrentBatchConstructed) {
@@ -538,7 +542,6 @@ class MicroBatchExecution(
       watermarkTracker.updateWatermark(lastExecution.executedPlan)
       commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
       committedOffsets ++= availableOffsets
-      awaitProgressLockCondition.signalAll()
     }
     logDebug(s"Completed batch ${currentBatchId}")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 290de87..a39bb71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -382,7 +382,7 @@ abstract class StreamExecution(
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
-  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset, timeoutMs: Long): Unit = {
     assertAwaitThread()
     def notDone = {
       val localCommittedOffsets = committedOffsets
@@ -398,7 +398,7 @@ abstract class StreamExecution(
     while (notDone) {
       awaitProgressLock.lock()
       try {
-        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
+        awaitProgressLockCondition.await(timeoutMs, TimeUnit.MILLISECONDS)
         if (streamDeathCause != null) {
           throw streamDeathCause
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index e45f9d3..fb5d13d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -31,33 +31,37 @@ trait StateStoreMetricsTest extends StreamTest {
 
   def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
     AssertOnQuery(s"Check total state rows = $total, updated state rows = 
$updated") { q =>
-      val recentProgress = q.recentProgress
-      require(recentProgress.nonEmpty, "No progress made, cannot check num 
state rows")
-      require(recentProgress.length < 
spark.sessionState.conf.streamingProgressRetention,
-        "This test assumes that all progresses are present in q.recentProgress 
but " +
-          "some may have been dropped due to retention limits")
+      // This assumes that the streaming query will not make any progress 
while the eventually
+      // is being executed.
+      eventually(timeout(streamingTimeout)) {
+        val recentProgress = q.recentProgress
+        require(recentProgress.nonEmpty, "No progress made, cannot check num 
state rows")
+        require(recentProgress.length < 
spark.sessionState.conf.streamingProgressRetention,
+          "This test assumes that all progresses are present in 
q.recentProgress but " +
+            "some may have been dropped due to retention limits")
 
-      if (q.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
-      lastQuery = q
+        if (q.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
+        lastQuery = q
 
-      val numStateOperators = recentProgress.last.stateOperators.length
-      val progressesSinceLastCheck = recentProgress
-        .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
-        .filter(_.stateOperators.length == numStateOperators)
+        val numStateOperators = recentProgress.last.stateOperators.length
+        val progressesSinceLastCheck = recentProgress
+          .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
+          .filter(_.stateOperators.length == numStateOperators)
 
-      val allNumUpdatedRowsSinceLastCheck =
-        progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
+        val allNumUpdatedRowsSinceLastCheck =
+          progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
 
-      lazy val debugString = "recent progresses:\n" +
-        progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
+        lazy val debugString = "recent progresses:\n" +
+          progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
 
-      val numTotalRows = recentProgress.last.stateOperators.map(_.numRowsTotal)
-      assert(numTotalRows === total, s"incorrect total rows, $debugString")
+        val numTotalRows = recentProgress.last.stateOperators.map(_.numRowsTotal)
+        assert(numTotalRows === total, s"incorrect total rows, $debugString")
 
-      val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
-      assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
+        val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
+        assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
 
-      lastCheckedRecentProgressIndex = recentProgress.length - 1
+        lastCheckedRecentProgressIndex = recentProgress.length - 1
+      }
       true
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index b528006..cd9b892 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -467,7 +467,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       // Block until all data added has been processed for all the source
       awaiting.foreach { case (sourceIndex, offset) =>
         failAfter(streamingTimeout) {
-          currentStream.awaitOffset(sourceIndex, offset)
+          currentStream.awaitOffset(sourceIndex, offset, streamingTimeout.toMillis)
           // Make sure all processing including no-data-batches have been executed
           if (!currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
             currentStream.processAllAvailable()

