[ 
https://issues.apache.org/jira/browse/SPARK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14729024#comment-14729024
 ] 

Pete Robbins commented on SPARK-9869:
-------------------------------------

My pull request build for https://issues.apache.org/jira/browse/SPARK-10431 
failed with this error, so I took a look. I think it's another timing issue: 
the assert on the progress listener runs before the asynchronous 
notification thread has finished processing the posted events.

Something like this should fix it:

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index ec2852d..e27d315 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -76,6 +76,8 @@
           fail("Timeout: cannot finish all batches in 30 seconds")
         }
 
+        ssc.scheduler.listenerBus.waitUntilEmpty(500)
+
         // Verify all "InputInfo"s have been reported
         assert(ssc.progressListener.numTotalReceivedRecords === input.size)
         assert(ssc.progressListener.numTotalProcessedRecords === input.size)
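
To illustrate the race outside of Spark, here is a minimal self-contained sketch. ToyListenerBus and ListenerRaceDemo are hypothetical names made up for this example and are not Spark classes; the sketch only assumes a bus that delivers events on a background thread and offers a blocking waitUntilEmpty with a timeout, in the spirit of the call in the diff above.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an asynchronous listener bus; NOT Spark's implementation.
class ToyListenerBus(onEvent: Int => Unit) {
  private val queue = new LinkedBlockingQueue[Int]()
  // Counts events that were posted but not yet fully handled by the listener.
  private val pending = new AtomicInteger(0)

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          val event = queue.take()
          Thread.sleep(20) // simulate a slow listener
          onEvent(event)
          pending.decrementAndGet()
        }
      } catch {
        case _: InterruptedException => // stop() was called; leave the loop
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  def post(event: Int): Unit = {
    pending.incrementAndGet()
    queue.put(event)
  }

  // Block until every posted event has been handled, or fail after the timeout.
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (pending.get() > 0) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(s"Bus not drained within $timeoutMillis ms")
      }
      Thread.sleep(5)
    }
  }

  def stop(): Unit = worker.interrupt()
}

object ListenerRaceDemo {
  def run(): Int = {
    val received = new AtomicInteger(0)
    val bus = new ToyListenerBus(_ => received.incrementAndGet())
    (1 to 5).foreach(bus.post)
    // Without this wait, `received` may still be below 5 when it is read,
    // which is the same shape as the "4 did not equal 5" failure.
    bus.waitUntilEmpty(2000)
    bus.stop()
    received.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"received = ${run()}")
  }
}
```

Removing the waitUntilEmpty call in run() makes the returned count depend on thread scheduling, which is why I think draining the bus before the asserts is the right kind of fix.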


> InputStreamSuite.socket input stream is flaky in Jenkins
> --------------------------------------------------------
>
>                 Key: SPARK-9869
>                 URL: https://issues.apache.org/jira/browse/SPARK-9869
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Josh Rosen
>              Labels: flaky-test
>
> https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-1.5-SBT/68/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.3,label=centos/testReport/junit/org.apache.spark.streaming/InputStreamsSuite/socket_input_stream/
> {code}
> org.apache.spark.streaming.InputStreamsSuite.socket input stream
> sbt.ForkMain$ForkError: 4 did not equal 5
>       at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
>       at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
>       at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
>       at org.apache.spark.streaming.InputStreamsSuite$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$5.apply(InputStreamsSuite.scala:80)
>       at org.apache.spark.streaming.InputStreamsSuite$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$5.apply(InputStreamsSuite.scala:53)
>       at org.apache.spark.streaming.TestSuiteBase$class.withStreamingContext(TestSuiteBase.scala:272)
>       at org.apache.spark.streaming.InputStreamsSuite.withStreamingContext(InputStreamsSuite.scala:45)
>       at org.apache.spark.streaming.InputStreamsSuite$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(InputStreamsSuite.scala:53)
>       at org.apache.spark.streaming.InputStreamsSuite$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(InputStreamsSuite.scala:48)
>       at org.apache.spark.streaming.TestSuiteBase$class.withTestServer(TestSuiteBase.scala:289)
>       at org.apache.spark.streaming.InputStreamsSuite.withTestServer(InputStreamsSuite.scala:45)
>       at org.apache.spark.streaming.InputStreamsSuite$$anonfun$1.apply$mcV$sp(InputStreamsSuite.scala:48)
>       at org.apache.spark.streaming.InputStreamsSuite$$anonfun$1.apply(InputStreamsSuite.scala:48)
>       at org.apache.spark.streaming.InputStreamsSuite$$anonfun$1.apply(InputStreamsSuite.scala:48)
> {code}



