[ https://issues.apache.org/jira/browse/SPARK-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145625#comment-15145625 ]
Bryan Cutler commented on SPARK-10086:
--------------------------------------

I was able to track down the cause of these failures, so here is an update with what I found.

The test {{StreamingKMeansTest.test_trainOn_predictOn}} has 2 {{DStream.foreachRDD}} output operations: 1 in the call to {{StreamingKMeans.trainOn}}, and 1 with {{collect}} whose parent {{DStream}} is a {{PythonTransformedDStream}} returned from {{StreamingKMeans.predictOn}}. As a result, 2 jobs are generated for each batch.

When the {{DStream}} jobs are generated, there is nothing to compute for the first job, which updates the model. To generate the second job, {{PythonTransformedDStream.compute}} is called, which then makes a {{PythonTransformFunction}} callback that creates a {{PythonRDD}} and serializes the mapped predict function to a command containing the current model.

Next, the 2 jobs are scheduled in order: first the model update, then the collect of the predicted result. At this point there is a race condition between completing the model update and generating the next set of jobs, which runs in a different thread. If there is enough of a delay in the update, the next set of jobs will be generated and the old model will be serialized into the {{PythonRDD}} command again. The predict will then run against this stale model, causing the test failure.

To sum it up, the underlying issue is that a function can be serialized with a value before the job that updates this value has run. This doesn't appear to be an issue in the Scala code, since the closure cleaner runs just before the job is executed, so it picks up the updated values.

So far, the best solution I can think of would be to somehow delay the serialization of the model until it is needed, but I believe this would involve some big changes in {{PythonRDD}}, as would any other solution I could think of. Is this something that would be worth doing to correct this, or might there be an easier fix that I am not seeing?
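To illustrate the early-binding problem outside of Spark, here is a minimal sketch using plain {{pickle}} (PySpark actually serializes the command with cloudpickle, but the effect is the same). The {{Model}} class below is a hypothetical stand-in for {{StreamingKMeansModel}}: serializing the predict function bakes in the model state at that moment, so an update that runs afterwards is invisible to the deserialized copy.

{code}
import pickle

# Hypothetical stand-in for StreamingKMeansModel: a mutable model whose
# centers are updated in place by the training job.
class Model(object):
    def __init__(self, centers):
        self.centers = centers

    def predict(self, x):
        # Return the index of the nearest center.
        return min(range(len(self.centers)),
                   key=lambda i: abs(self.centers[i] - x))

model = Model([0.0, 1.0])

# Analogous to building the PythonRDD command at job-generation time:
# pickling the bound method snapshots the model's state right now.
command = pickle.dumps(model.predict)

# The "trainOn" job runs afterwards and updates the live model...
model.centers = [0.45, -0.1]

# ...but the already-serialized command still holds the old centers.
stale_predict = pickle.loads(command)
print(stale_predict(0.8))   # 1 -- nearest of the old centers [0.0, 1.0]
print(model.predict(0.8))   # 0 -- nearest of the updated centers
{code}

This is the same ordering as in the failing test: the command is serialized at job-generation time, and whether the model update lands before the command is serialized again for the next batch is a race.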
It's not just a {{StreamingKMeans}} issue, so it would affect any PySpark streaming application with a similar structure. I am attaching some simplified code used to reproduce the issue. I also have a similar Scala version that produces the expected results.

> Flaky StreamingKMeans test in PySpark
> -------------------------------------
>
>                 Key: SPARK-10086
>                 URL: https://issues.apache.org/jira/browse/SPARK-10086
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib, PySpark, Streaming, Tests
>    Affects Versions: 1.5.0
>            Reporter: Joseph K. Bradley
>            Priority: Critical
>
> Here's a report on investigating test failures in StreamingKMeans in PySpark.
> (See Jenkins links below.)
> It is a StreamingKMeans test which trains on a DStream with 2 batches and
> then tests on those same 2 batches. It fails here:
> [https://github.com/apache/spark/blob/1968276af0f681fe51328b7dd795bd21724a5441/python/pyspark/mllib/tests.py#L1144]
> I recreated the same test, with variants training on: (1) the original 2
> batches, (2) just the first batch, (3) just the second batch, and (4) neither
> batch. Here is code which avoids Streaming altogether to identify what
> batches were processed.
> {code}
> from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
> batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
> batches = [sc.parallelize(batch) for batch in batches]
> stkm = StreamingKMeans(decayFactor=0.0, k=2)
> stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
> # Train
> def update(rdd):
>     stkm._model.update(rdd, stkm._decayFactor, stkm._timeUnit)
> # Remove one or both of these lines to test skipping batches.
> update(batches[0])
> update(batches[1])
> # Test
> def predict(rdd):
>     return stkm._model.predict(rdd)
> predict(batches[0]).collect()
> predict(batches[1]).collect()
> {code}
> *Results*:
> {code}
> ####################### EXPECTED
> [0, 1, 1]
>
> [1, 0, 1]
> ####################### Skip batch 0
> [1, 0, 0]
> [0, 1, 0]
> ####################### Skip batch 1
> [0, 1, 1]
> [1, 0, 1]
> ####################### Skip both batches (This is what we see in the test failures.)
> [0, 1, 1]
> [0, 0, 0]
> {code}
> Skipping both batches reproduces the failure. There is no randomness in the
> StreamingKMeans algorithm (since initial centers are fixed, not randomized).
> CC: [~tdas] [~freeman-lab] [~mengxr]
> Failure message:
> {code}
> ======================================================================
> FAIL: test_trainOn_predictOn (__main__.StreamingKMeansTest)
> Test that prediction happens on the updated model.
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
> line 1147, in test_trainOn_predictOn
>     self._eventually(condition, catch_assertions=True)
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
> line 123, in _eventually
>     raise lastValue
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
> line 114, in _eventually
>     lastValue = condition()
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
> line 1144, in condition
>     self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
> AssertionError: Lists differ: [[0, 1, 1], [0, 0, 0]] != [[0, 1, 1], [1, 0, 1]]
> First differing element 1:
> [0, 0, 0]
> [1, 0, 1]
> - [[0, 1, 1], [0, 0, 0]]
> ?              ^^^^
> + [[0, 1, 1], [1, 0, 1]]
> ?              +++ ^
> ----------------------------------------------------------------------
> Ran 62 tests in 164.188s
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)