Repository: spark Updated Branches: refs/heads/branch-1.6 2f390d306 -> a907c7c64
[SPARK-13195][STREAMING] Fix NoSuchElementException when a state is not set but timeoutThreshold is defined. Check that the state exists before calling get. Author: Shixiong Zhu <[email protected]> Closes #11081 from zsxwing/SPARK-13195. (cherry picked from commit 8e2f296306131e6c7c2f06d6672995d3ff8ab021) Signed-off-by: Shixiong Zhu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a907c7c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a907c7c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a907c7c6 Branch: refs/heads/branch-1.6 Commit: a907c7c64887833770cd593eecccf53620de59b7 Parents: 2f390d3 Author: Shixiong Zhu <[email protected]> Authored: Thu Feb 4 12:43:16 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Thu Feb 4 12:43:25 2016 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala | 3 ++- .../org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a907c7c6/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala index fdf6167..a301ba0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala @@ -57,7 +57,8 @@ private[streaming] object MapWithStateRDDRecord { val returned = mappingFunction(batchTime, key, Some(value), wrappedState) if (wrappedState.isRemoved) { newStateMap.remove(key) - } else if 
(wrappedState.isUpdated || timeoutThresholdTime.isDefined) { + } else if (wrappedState.isUpdated + || (wrappedState.exists && timeoutThresholdTime.isDefined)) { newStateMap.put(key, wrappedState.get(), batchTime.milliseconds) } mappedData ++= returned http://git-wip-us.apache.org/repos/asf/spark/blob/a907c7c6/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala index aa95bd3..2b71a1b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala @@ -185,6 +185,11 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true, expectedStates = Nil, expectedTimingOutStates = Nil, expectedRemovedStates = Seq(123)) + // If a state is not set but timeoutThreshold is defined, we should ignore this state. + // Previously it threw NoSuchElementException (SPARK-13195). + assertRecordUpdate(initStates = Seq(), data = Seq("noop"), + timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true, + expectedStates = Nil, expectedTimingOutStates = Nil) } test("states generated by MapWithStateRDD") { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
