This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 637a6c2 [SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait
until slow receiver has been initialized, but with hard timeout
637a6c2 is described below
commit 637a6c2750be8d4f42b1fd11c4cca8d0067e80d8
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Wed Sep 11 13:31:43 2019 -0700
[SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow
receiver has been initialized, but with hard timeout
### What changes were proposed in this pull request?
This patch fixes the flaky StreamingContextSuite test "stop slow receiver
gracefully" by adding a flag that records whether the slow receiver has
finished initializing, and waiting (with a hard timeout) for that flag to
become true. Because the receiver is submitted as a job and initialized on an
executor, the fixed 500ms wait might not be enough to cover all cases.
### Why are the changes needed?
We have received some reports of this test failing. Please refer to
[SPARK-24663](https://issues.apache.org/jira/browse/SPARK-24663).
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Modified the existing unit test. I artificially delayed the handling of job
submission by adding the code below to `DAGScheduler.submitJob`:
```
if (rdd != null && rdd.name != null && rdd.name.startsWith("Receiver")) {
println(s"Receiver Job! rdd name: ${rdd.name}")
Thread.sleep(1000)
}
```
With this delay, the test "stop slow receiver gracefully" failed on the current
master and passed with the patch applied.
Closes #25725 from HeartSaVioR/SPARK-24663.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../org/apache/spark/streaming/StreamingContextSuite.scala | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 7e1b411..be39168 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -355,7 +355,6 @@ class StreamingContextSuite extends SparkFunSuite with
BeforeAndAfter with TimeL
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
- SlowTestReceiver.receivedAllRecords = false
// Create test receiver that sleeps in onStop()
val totalNumRecords = 15
val recordsPerSecond = 1
@@ -367,6 +366,9 @@ class StreamingContextSuite extends SparkFunSuite with
BeforeAndAfter with TimeL
}
ssc.start()
ssc.awaitTerminationOrTimeout(500)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ assert(SlowTestReceiver.initialized)
+ }
ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount)
assert(runningCount > 0)
@@ -958,6 +960,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond:
Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
var receivingThreadOption: Option[Thread] = None
+ @volatile var receivedAllRecords = false
def onStart() {
val thread = new Thread() {
@@ -967,17 +970,18 @@ class SlowTestReceiver(totalRecords: Int,
recordsPerSecond: Int)
Thread.sleep(1000 / recordsPerSecond)
store(i)
}
- SlowTestReceiver.receivedAllRecords = true
+ receivedAllRecords = true
logInfo(s"Received all $totalRecords records")
}
}
receivingThreadOption = Some(thread)
thread.start()
+ SlowTestReceiver.initialized = true
}
def onStop() {
// Simulate slow receiver by waiting for all records to be produced
- while (!SlowTestReceiver.receivedAllRecords) {
+ while (!receivedAllRecords) {
Thread.sleep(100)
}
// no clean to be done, the receiving thread should stop on it own
@@ -985,7 +989,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond:
Int)
}
object SlowTestReceiver {
- var receivedAllRecords = false
+ var initialized = false
}
/** Streaming application for testing DStream and RDD creation sites */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]