This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b5a9baa  [SPARK-28247][SS] Fix flaky test "query without test harness" on ContinuousSuite
b5a9baa is described below
commit b5a9baa19cbc2f4d2f7a7f6218bddda3efef2ba4
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Sat Jul 13 12:11:06 2019 -0500
[SPARK-28247][SS] Fix flaky test "query without test harness" on ContinuousSuite
## What changes were proposed in this pull request?
This patch fixes the flaky test "query without test harness" on ContinuousSuite by giving the query more room to commit the epoch that writes the output rows.
The observations below illustrate the issue (debug logs were injected to capture them):
```
reader creation time                                1562225320210
epoch 1 launched                                    1562225320593 (+380ms from reader creation time)
epoch 13 launched                                   1562225321702 (+1.5s from reader creation time)
partition reader creation time                      1562225321715 (+1.5s from reader creation time)
next read time for first next call                  1562225321210 (+1s from reader creation time)
first next called in partition reader               1562225321746 (immediately after creation of partition reader)
wait finished in next called in partition reader    1562225321746 (no wait)
second next called in partition reader              1562225321747 (immediately after first next())
epoch 0 commit started                              1562225321861
writing rows (0, 1) (belong to epoch 13)            1562225321866 (+100ms after first next())
wait start in waitForRateSourceTriggers(2)          1562225322059
next read time for second next call                 1562225322210 (+1s from previous "next read time")
wait finished in next called in partition reader    1562225322211 (+450ms wait)
writing rows (2, 3) (belong to epoch 13)            1562225322211 (immediately after next())
epoch 14 launched                                   1562225322246
desired wait time in waitForRateSourceTriggers(2)   1562225322510 (+2.3s from reader creation time)
epoch 12 committed                                  1562225323034
```
These rows were written within the desired wait time, but epoch 13 could not be committed within it. Interestingly, epoch 12 happened to be committed in the gap between the end of the wait in waitForRateSourceTriggers and query.stop() - but even if the rows had been written in epoch 12, that would have been sheer luck; the epoch should be committed within the desired wait time.
This patch modifies the Rate continuous stream to track the highest committed value, so that the test can wait until the desired value is reported to the stream as committed.
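
As a minimal sketch of that polling idiom (the `readHighestCommitted` callback and the 100 ms poll interval are illustrative stand-ins, not the actual Spark API):

```scala
// Sketch only: poll until the stream reports a committed value >= desiredValue,
// or give up once maxWaitTimeMs has elapsed. `readHighestCommitted` is a
// hypothetical callback standing in for inspecting the stream's committed offsets.
def waitForCommittedValue(
    readHighestCommitted: () => Option[Long],
    desiredValue: Long,
    maxWaitTimeMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + maxWaitTimeMs
  while (System.currentTimeMillis() < deadline &&
      readHighestCommitted().getOrElse(Long.MinValue) < desiredValue) {
    Thread.sleep(100) // re-check roughly every 100 ms
  }
}
```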
This patch also modifies the Rate continuous stream to track the timestamp at which the stream gets its first committed offset, and lets `waitForRateSourceTriggers` use that timestamp. This still relies on waiting for a fixed period, but given the observation above it is a safer approach than the current one. With this change, the patch also shaves a couple of seconds off the test time.
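
A sketch of that timestamp-anchored wait follows; it sleeps relative to a recorded start time and re-checks after waking, so an early wake-up cannot cut the wait short (the names here are illustrative):

```scala
// Sketch only: wait until deltaMs milliseconds have passed since startNs,
// a System.nanoTime() reading taken when the first offset was committed.
def waitFrom(startNs: Long, deltaMs: Long): Unit = {
  val deltaNs = deltaMs * 1000000L
  var remainingNs = startNs + deltaNs - System.nanoTime()
  while (remainingNs > 0) {
    Thread.sleep(remainingNs / 1000000) // may wake early; the loop re-checks
    remainingNs = startNs + deltaNs - System.nanoTime()
  }
}
```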
## How was this patch tested?
10 sequential test runs succeeded locally.
Closes #25048 from HeartSaVioR/SPARK-28247.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../sql/streaming/continuous/ContinuousSuite.scala | 63 ++++++++++++++++------
1 file changed, 47 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index dca4520..c692101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -37,18 +37,43 @@ class ContinuousSuiteBase extends StreamTest {
     "continuous-stream-test-sql-context",
     sparkConf.set("spark.sql.testkey", "true")))
 
-  protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
-    query match {
-      case s: ContinuousExecution =>
-        assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
-        val reader = s.lastExecution.executedPlan.collectFirst {
-          case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
-        }.get
-
-        val deltaMs = numTriggers * 1000 + 300
-        while (System.currentTimeMillis < reader.creationTime + deltaMs) {
-          Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
+  protected def waitForRateSourceTriggers(query: ContinuousExecution, numTriggers: Int): Unit = {
+    query.awaitEpoch(0)
+
+    // This is called after waiting first epoch to be committed, so we can just treat
+    // it as partition readers for rate source are already initialized.
+    val firstCommittedTime = System.nanoTime()
+    val deltaNs = (numTriggers * 1000 + 300) * 1000000L
+    var toWaitNs = firstCommittedTime + deltaNs - System.nanoTime()
+    while (toWaitNs > 0) {
+      Thread.sleep(toWaitNs / 1000000)
+      toWaitNs = firstCommittedTime + deltaNs - System.nanoTime()
+    }
+  }
+
+  protected def waitForRateSourceCommittedValue(
+      query: ContinuousExecution,
+      desiredValue: Long,
+      maxWaitTimeMs: Long): Unit = {
+    def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = {
+      c.committedOffsets.lastOption.map { case (_, offset) =>
+        offset match {
+          case o: RateStreamOffset =>
+            o.partitionToValueAndRunTimeMs.map {
+              case (_, ValueRunTimeMsPair(value, _)) => value
+            }.max
         }
+      }
+    }
+
+    val maxWait = System.currentTimeMillis() + maxWaitTimeMs
+    while (System.currentTimeMillis() < maxWait &&
+      readHighestCommittedValue(query).getOrElse(Long.MinValue) < desiredValue) {
+      Thread.sleep(100)
+    }
+    if (System.currentTimeMillis() > maxWait) {
+      logWarning(s"Couldn't reach desired value in $maxWaitTimeMs milliseconds!" +
+        s"Current highest committed value is ${readHighestCommittedValue(query)}")
     }
   }
@@ -216,14 +241,16 @@ class ContinuousSuite extends ContinuousSuiteBase {
       .queryName("noharness")
       .trigger(Trigger.Continuous(100))
       .start()
+
+    val expected = Set(0, 1, 2, 3)
     val continuousExecution =
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution]
-    continuousExecution.awaitEpoch(0)
-    waitForRateSourceTriggers(continuousExecution, 2)
+    waitForRateSourceCommittedValue(continuousExecution, expected.max, 20 * 1000)
     query.stop()
 
     val results = spark.read.table("noharness").collect()
-    assert(Set(0, 1, 2, 3).map(Row(_)).subsetOf(results.toSet))
+    assert(expected.map(Row(_)).subsetOf(results.toSet),
+      s"Result set ${results.toSet} are not a superset of $expected!")
   }
 }
@@ -241,7 +268,9 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
     testStream(df)(
       StartStream(longContinuousTrigger),
       AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 10)),
+      Execute { exec =>
+        waitForRateSourceTriggers(exec.asInstanceOf[ContinuousExecution], 5)
+      },
       IncrementEpoch(),
       StopStream,
       CheckAnswerRowsContains(scala.Range(0, 2500).map(Row(_)))
@@ -259,7 +288,9 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
     testStream(df)(
       StartStream(Trigger.Continuous(2012)),
      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 10)),
+      Execute { exec =>
+        waitForRateSourceTriggers(exec.asInstanceOf[ContinuousExecution], 5)
+      },
       IncrementEpoch(),
       StopStream,
       CheckAnswerRowsContains(scala.Range(0, 2500).map(Row(_))))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]