Repository: spark
Updated Branches:
  refs/heads/master 1c5a7d7f6 -> 7d878cf2d


[SQL][STREAMING][TEST] Fix flaky tests in StreamingQueryListenerSuite

This work has largely been done by lw-lin in his PR #15497. This is a slight 
refactoring of it.

## What changes were proposed in this pull request?
There were two sources of flakiness in the StreamingQueryListenerSuite tests.

- When testing with a manual clock, consecutive attempts to advance the clock
can occur without the stream execution thread being unblocked and doing any
work between the two attempts. Hence the following can happen with the current
ManualClock.
```
+-----------------------------------+--------------------------------+
|      StreamExecution thread       |         testing thread         |
+-----------------------------------+--------------------------------+
|  ManualClock.waitTillTime(100) {  |                                |
|        _isWaiting = true          |                                |
|            wait(10)               |                                |
|        still in wait(10)          |  if (_isWaiting) advance(100)  |
|        still in wait(10)          |  if (_isWaiting) advance(200)  | <- this should be disallowed!
|        still in wait(10)          |  if (_isWaiting) advance(300)  | <- this should be disallowed!
|      wake up from wait(10)        |                                |
|       current time is 600         |                                |
|       _isWaiting = false          |                                |
|  }                                |                                |
+-----------------------------------+--------------------------------+
```

- The second source of flakiness is that data added to the memory stream may
get processed in any trigger, not just the first one.
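
Presumably for this reason, the assertions in StreamingQueryListenerSuite below
no longer pin `triggerId` to a specific value but only check that the key is
present, e.g. (excerpted from the diff below):

```scala
// Relaxed checks: the batch that first processes the data is no longer
// assumed to be batch 0, so only the presence of the key is asserted.
assert(status.triggerDetails.containsKey("triggerId"))
assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
```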

My fix is to make the manual clock wait for the stream execution thread to
start waiting for the clock at the right wait start time. That is,
`advance(200)` (see above) will wait for the stream execution thread to
complete the wait that started at time 0 and to start a new wait at the new
clock time (i.e. the timestamp reached by the previous `advance(100)`).
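
Concretely, the testing thread tracks the timestamp at which the next wait
should start and refuses to advance until the stream thread is actually
blocked at that timestamp. A minimal sketch of the protocol, assuming the
`StreamManualClock` added below (the real code in StreamTest uses its
`eventually` helper rather than the polling loop shown here):

```scala
// Hypothetical standalone helper illustrating the new advance protocol.
// `clock` is the StreamManualClock shared with the stream execution thread.
def advanceSafely(clock: StreamManualClock, expectedWaitStart: Long, timeToAdd: Long): Long = {
  // Block until the stream thread is inside waitTillTime() for a wait that
  // started exactly at expectedWaitStart; only then is it safe to advance.
  while (!clock.isStreamWaitingAt(expectedWaitStart)) {
    Thread.sleep(10)
  }
  clock.advance(timeToAdd)
  expectedWaitStart + timeToAdd  // wait-start timestamp to expect next time
}
```

With this, the `advance(100)` / `advance(200)` / `advance(300)` calls from the
table above can no longer pile up inside a single `wait(10)` window.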

In addition, since this is a feature used solely by StreamExecution, I removed
all the non-generic code from ManualClock and put it in StreamManualClock
inside StreamTest.
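
For test code, the only visible change is the clock class passed to
StartStream. An illustrative use of the DSL with the new clock (the data and
expected answer are made up; the actions are the ones already used in
StreamSuite below, with `inputData` being a MemoryStream[Int]):

```scala
testStream(inputData.toDS())(
  // StreamManualClock now lives in StreamTest; a plain ManualClock is rejected.
  StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
  AddData(inputData, 1, 2, 3),
  // The harness waits until the stream is blocked on the clock before advancing.
  AdvanceManualClock(10 * 1000),
  CheckAnswer(1, 2, 3)
)
```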

## How was this patch tested?
Ran the existing unit tests MANY TIMES in Jenkins.

Author: Tathagata Das <tathagata.das1...@gmail.com>
Author: Liwei Lin <lwl...@gmail.com>

Closes #15519 from tdas/metrics-flaky-test-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d878cf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d878cf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d878cf2

Branch: refs/heads/master
Commit: 7d878cf2da04800bc4147b05610170865b148c64
Parents: 1c5a7d7
Author: Liwei Lin <lwl...@gmail.com>
Authored: Tue Oct 18 00:49:57 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Oct 18 00:49:57 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ManualClock.scala     | 18 ++--------
 .../spark/sql/streaming/StreamSuite.scala       |  4 +--
 .../apache/spark/sql/streaming/StreamTest.scala | 38 ++++++++++++++++----
 .../streaming/StreamingQueryListenerSuite.scala |  8 ++---
 4 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/core/src/main/scala/org/apache/spark/util/ManualClock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
index 91a9587..e7a65d7 100644
--- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -26,8 +26,6 @@ package org.apache.spark.util
  */
 private[spark] class ManualClock(private var time: Long) extends Clock {
 
-  private var _isWaiting = false
-
   /**
    * @return `ManualClock` with initial time 0
    */
@@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
    * @return current time reported by the clock when waiting finishes
    */
   def waitTillTime(targetTime: Long): Long = synchronized {
-    _isWaiting = true
-    try {
-      while (time < targetTime) {
-        wait(10)
-      }
-      getTimeMillis()
-    } finally {
-      _isWaiting = false
+    while (time < targetTime) {
+      wait(10)
     }
+    getTimeMillis()
   }
-
-  /**
-   * Returns whether there is any thread being blocked in `waitTillTime`.
-   */
-  def isWaiting: Boolean = synchronized { _isWaiting }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index cdbad90..6bdf479 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -161,7 +161,7 @@ class StreamSuite extends StreamTest {
 
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
 
       /* -- batch 0 ----------------------- */
       // Add some data in batch 0
@@ -199,7 +199,7 @@ class StreamSuite extends StreamTest {
 
       /* Stop then restart the Stream  */
       StopStream,
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
 
       /* -- batch 1 rerun ----------------- */
      // this batch 1 would re-run because the latest batch id logged in offset log is 1

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 3b9d378..254f823 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -204,6 +204,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
     extends StreamAction
 
+  class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+    private var waitStartTime: Option[Long] = None
+
+    override def waitTillTime(targetTime: Long): Long = synchronized {
+      try {
+        waitStartTime = Some(getTimeMillis())
+        super.waitTillTime(targetTime)
+      } finally {
+        waitStartTime = None
+      }
+    }
+
+    def isStreamWaitingAt(time: Long): Boolean = synchronized { waitStartTime.contains(time) }
+  }
+
 
   /**
    * Executes the specified actions on the given streaming DataFrame and provides helpful
@@ -307,7 +322,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     val testThread = Thread.currentThread()
    val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     val statusCollector = new QueryStatusCollector
-
+    var manualClockExpectedTime = -1L
     try {
       spark.streams.addListener(statusCollector)
       startedTest.foreach { action =>
@@ -315,6 +330,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         action match {
           case StartStream(trigger, triggerClock) =>
             verify(currentStream == null, "stream already running")
+            verify(triggerClock.isInstanceOf[SystemClock]
+              || triggerClock.isInstanceOf[StreamManualClock],
+              "Use either SystemClock or StreamManualClock to start the stream")
+            if (triggerClock.isInstanceOf[StreamManualClock]) {
+              manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
+            }
             lastStream = currentStream
             currentStream =
               spark
@@ -338,14 +359,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
           case AdvanceManualClock(timeToAdd) =>
             verify(currentStream != null,
                    "can not advance manual clock when a stream is not running")
-            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+            verify(currentStream.triggerClock.isInstanceOf[StreamManualClock],
                    s"can not advance clock of type 
${currentStream.triggerClock.getClass}")
-            val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
+            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+            assert(manualClockExpectedTime >= 0)
            // Make sure we don't advance ManualClock too early. See SPARK-16002.
-            eventually("ManualClock has not yet entered the waiting state") {
-              assert(clock.isWaiting)
+            eventually("StreamManualClock has not yet entered the waiting 
state") {
+              assert(clock.isStreamWaitingAt(manualClockExpectedTime))
             }
-            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+            clock.advance(timeToAdd)
+            manualClockExpectedTime += timeToAdd
+            verify(clock.getTimeMillis() === manualClockExpectedTime,
+              s"Unexpected clock time after updating: " +
+                s"expecting $manualClockExpectedTime, current ${clock.getTimeMillis()}")
 
           case StopStream =>
             verify(currentStream != null, "can not stop a stream that is not 
running")

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 9e0eefb..623f66a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -43,9 +43,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // Make sure we don't leak any events to the next test
   }
 
-  ignore("single listener, check trigger statuses") {
+  test("single listener, check trigger statuses") {
     import StreamingQueryListenerSuite._
-    clock = new ManualClock()
+    clock = new StreamManualClock
 
     /** Custom MemoryStream that waits for manual clock to reach a time */
     val inputData = new MemoryStream[Int](0, sqlContext) {
@@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       AssertOnLastQueryStatus { status: StreamingQueryStatus =>
        // Check the correctness of the trigger info of the last completed batch reported by
         // onQueryProgress
-        assert(status.triggerDetails.get("triggerId") == "0")
+        assert(status.triggerDetails.containsKey("triggerId"))
         assert(status.triggerDetails.get("isTriggerActive") === "false")
         assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
 
@@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(status.triggerDetails.get("numRows.state.aggregation1.updated") 
=== "1")
 
         assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0")
+        assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
         assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")

