This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b5a9baa  [SPARK-28247][SS] Fix flaky test "query without test harness" on ContinuousSuite
b5a9baa is described below

commit b5a9baa19cbc2f4d2f7a7f6218bddda3efef2ba4
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Sat Jul 13 12:11:06 2019 -0500

    [SPARK-28247][SS] Fix flaky test "query without test harness" on ContinuousSuite
    
    ## What changes were proposed in this pull request?
    
    This patch fixes the flaky test "query without test harness" on ContinuousSuite by adding more slack when waiting for the query to commit the epoch that writes the output rows.
    
    The observations for this issue are below (debug logs were injected to obtain them):
    
    ```
    reader creation time                                   1562225320210
    epoch 1 launched                                       1562225320593 (+380ms from reader creation time)
    epoch 13 launched                                      1562225321702 (+1.5s from reader creation time)
    partition reader creation time                         1562225321715 (+1.5s from reader creation time)
    
    next read time for first next call                     1562225321210 (+1s from reader creation time)
    first next called in partition reader                  1562225321746 (immediately after creation of partition reader)
    wait finished in next called in partition reader       1562225321746 (no wait)
    
    second next called in partition reader                 1562225321747 (immediately after first next())
    
    epoch 0 commit started                                 1562225321861
    
    writing rows (0, 1) (belong to epoch 13)               1562225321866 (+100ms after first next())
    
    wait start in waitForRateSourceTriggers(2)             1562225322059
    
    next read time for second next call                    1562225322210 (+1s from previous "next read time")
    wait finished in next called in partition reader       1562225322211 (+450ms wait)
    
    writing rows (2, 3) (belong to epoch 13)               1562225322211 (immediately after next())
    
    epoch 14 launched                                      1562225322246
    
    desired wait time in waitForRateSourceTriggers(2)      1562225322510 (+2.3s from reader creation time)
    
    epoch 12 committed                                     1562225323034
    ```
    
    These rows were written within the desired wait time, but epoch 13 could not be committed within it. Interestingly, epoch 12 happened to be committed in the gap between the end of the wait in waitForRateSourceTriggers and query.stop() - but even if the rows had been written in epoch 12, that would only have been luck, and the epoch should be committed within the desired wait time.
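    
    To make the race concrete, here is a rough arithmetic check of the old wait logic against the logged timestamps (values copied from the log above; the variable names are illustrative only):
    
    ```scala
    val readerCreationTime = 1562225320210L     // from the log above
    val deltaMs = 2 * 1000 + 300                // old wait: numTriggers * 1000 + 300
    val deadline = readerCreationTime + deltaMs // 1562225322510, the desired wait time
    val rowsWritten = 1562225322211L            // rows (2, 3) written, belonging to epoch 13
    val epoch12Committed = 1562225323034L       // first epoch commit observed after the rows
    assert(rowsWritten < deadline)              // the rows land before the deadline...
    assert(epoch12Committed > deadline)         // ...but no epoch commits before it
    ```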
    
    This patch modifies the Rate continuous stream to track the highest committed value, so that the test can wait until the desired value is reported to the stream as committed.
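    
    A minimal sketch of that polling idea, with `readHighestCommitted` standing in as a placeholder for reading the highest value from the query's committed offsets (the actual test helper added by this patch is in the diff below):
    
    ```scala
    // Poll until the committed value reaches the target or the timeout expires,
    // instead of sleeping for a guessed fixed period.
    def waitUntilCommitted(
        readHighestCommitted: () => Option[Long],
        desiredValue: Long,
        maxWaitTimeMs: Long): Boolean = {
      val deadline = System.currentTimeMillis() + maxWaitTimeMs
      while (System.currentTimeMillis() < deadline &&
          readHighestCommitted().getOrElse(Long.MinValue) < desiredValue) {
        Thread.sleep(100)
      }
      readHighestCommitted().getOrElse(Long.MinValue) >= desiredValue
    }
    ```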
    
    This patch also modifies the Rate continuous stream to track the timestamp at which the stream gets its first committed offset, and lets `waitForRateSourceTriggers` use that timestamp. This still relies on waiting for a specific period, but it is a safer approach than the current one, based on the observation above. With this change, the patch also saves a couple of seconds of test time.
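    
    A rough sketch of measuring the fixed wait from the first commit instead of from reader creation time (the caller is assumed to have already awaited epoch 0; the actual helper is in the diff below):
    
    ```scala
    // Sleep until `numTriggers` seconds plus a 300 ms buffer have elapsed,
    // measured from the moment the first epoch was observed as committed.
    def waitFromFirstCommit(numTriggers: Int): Unit = {
      val firstCommittedTimeNs = System.nanoTime()
      val deltaNs = (numTriggers * 1000 + 300) * 1000000L
      var remainingNs = firstCommittedTimeNs + deltaNs - System.nanoTime()
      while (remainingNs > 0) {
        Thread.sleep(remainingNs / 1000000)
        remainingNs = firstCommittedTimeNs + deltaNs - System.nanoTime()
      }
    }
    ```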
    
    ## How was this patch tested?
    
    10 sequential test runs succeeded locally.
    
    Closes #25048 from HeartSaVioR/SPARK-28247.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
    Signed-off-by: Sean Owen <[email protected]>
---
 .../sql/streaming/continuous/ContinuousSuite.scala | 63 ++++++++++++++++------
 1 file changed, 47 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index dca4520..c692101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -37,18 +37,43 @@ class ContinuousSuiteBase extends StreamTest {
       "continuous-stream-test-sql-context",
       sparkConf.set("spark.sql.testkey", "true")))
 
-  protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
-    query match {
-      case s: ContinuousExecution =>
-        assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
-        val reader = s.lastExecution.executedPlan.collectFirst {
-          case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
-        }.get
-
-        val deltaMs = numTriggers * 1000 + 300
-        while (System.currentTimeMillis < reader.creationTime + deltaMs) {
-          Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
+  protected def waitForRateSourceTriggers(query: ContinuousExecution, numTriggers: Int): Unit = {
+    query.awaitEpoch(0)
+
+    // This is called after waiting first epoch to be committed, so we can just treat
+    // it as partition readers for rate source are already initialized.
+    val firstCommittedTime = System.nanoTime()
+    val deltaNs = (numTriggers * 1000 + 300) * 1000000L
+    var toWaitNs = firstCommittedTime + deltaNs - System.nanoTime()
+    while (toWaitNs > 0) {
+      Thread.sleep(toWaitNs / 1000000)
+      toWaitNs = firstCommittedTime + deltaNs - System.nanoTime()
+    }
+  }
+
+  protected def waitForRateSourceCommittedValue(
+      query: ContinuousExecution,
+      desiredValue: Long,
+      maxWaitTimeMs: Long): Unit = {
+    def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = {
+      c.committedOffsets.lastOption.map { case (_, offset) =>
+        offset match {
+          case o: RateStreamOffset =>
+            o.partitionToValueAndRunTimeMs.map {
+              case (_, ValueRunTimeMsPair(value, _)) => value
+            }.max
         }
+      }
+    }
+
+    val maxWait = System.currentTimeMillis() + maxWaitTimeMs
+    while (System.currentTimeMillis() < maxWait &&
+      readHighestCommittedValue(query).getOrElse(Long.MinValue) < desiredValue) {
+      Thread.sleep(100)
+    }
+    if (System.currentTimeMillis() > maxWait) {
+      logWarning(s"Couldn't reach desired value in $maxWaitTimeMs 
milliseconds!" +
+        s"Current highest committed value is 
${readHighestCommittedValue(query)}")
     }
   }
 
@@ -216,14 +241,16 @@ class ContinuousSuite extends ContinuousSuiteBase {
       .queryName("noharness")
       .trigger(Trigger.Continuous(100))
       .start()
+
+    val expected = Set(0, 1, 2, 3)
    val continuousExecution =
      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution]
-    continuousExecution.awaitEpoch(0)
-    waitForRateSourceTriggers(continuousExecution, 2)
+    waitForRateSourceCommittedValue(continuousExecution, expected.max, 20 * 1000)
     query.stop()
 
     val results = spark.read.table("noharness").collect()
-    assert(Set(0, 1, 2, 3).map(Row(_)).subsetOf(results.toSet))
+    assert(expected.map(Row(_)).subsetOf(results.toSet),
+      s"Result set ${results.toSet} are not a superset of $expected!")
   }
 }
 
@@ -241,7 +268,9 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
     testStream(df)(
       StartStream(longContinuousTrigger),
       AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 10)),
+      Execute { exec =>
+        waitForRateSourceTriggers(exec.asInstanceOf[ContinuousExecution], 5)
+      },
       IncrementEpoch(),
       StopStream,
       CheckAnswerRowsContains(scala.Range(0, 2500).map(Row(_)))
@@ -259,7 +288,9 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
     testStream(df)(
       StartStream(Trigger.Continuous(2012)),
       AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 10)),
+      Execute { exec =>
+        waitForRateSourceTriggers(exec.asInstanceOf[ContinuousExecution], 5)
+      },
       IncrementEpoch(),
       StopStream,
       CheckAnswerRowsContains(scala.Range(0, 2500).map(Row(_))))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
