[
https://issues.apache.org/jira/browse/SPARK-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yuval Itzchakov updated SPARK-13195:
------------------------------------
Summary: PairDStreamFunctions.mapWithState fails in case of timeout set
without state update (was: mapWithState fails in case of timeout set without
state update)
> PairDStreamFunctions.mapWithState fails in case of timeout set without state
> update
> -----------------------------------------------------------------------------------
>
> Key: SPARK-13195
> URL: https://issues.apache.org/jira/browse/SPARK-13195
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.0
> Reporter: Yuval Itzchakov
>
> Using the new Spark mapWithState API, I've encountered a bug when setting a
> timeout for mapWithState without explicitly updating the state.
> h1. Steps to reproduce:
> 1. Create a method that conforms to the signature expected by
> `StateSpec.function`, and make sure it does not update any state using
> `state.update`. A simple "pass-through" method is enough; it may even be empty.
> 2. Create a StateSpec object from the method in step 1 and explicitly set a
> timeout using the `StateSpec.timeout` method.
> 3. Create a DStream pipeline that uses mapWithState with the given StateSpec.
> 4. Run the code using spark-submit. The job fails with the following
> exception:
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0
> (TID 176, ****): java.util.NoSuchElementException: State is not set
> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> h1. Sample code to reproduce the issue:
> {code:Title=BugReproduce}
> import org.apache.spark.streaming._
> import org.apache.spark.{SparkConf, SparkContext}
>
> /**
>  * Created by yuvali on 04/02/2016.
>  */
> object Program {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>     val sparkContext = new SparkContext(sc)
>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>
>     // Create a stream from a custom receiver; DummySource is not defined in
>     // this snippet
>     val stream = ssc.receiverStream(new DummySource(10))
>
>     // Split the lines into words, and create a paired (key-value) dstream
>     val wordStream = stream.flatMap {
>       _.split(" ")
>     }.map(word => (word, 1))
>
>     // This represents the emitted stream from the trackStateFunc. Since we
>     // emit every input record with the updated value, this stream will
>     // contain the same # of records as the input dstream.
>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>     wordCountStateStream.print()
>
>     // To make sure data is not deleted by the time we query it interactively
>     ssc.remember(Minutes(1))
>
>     // Don't forget to set checkpoint directory
>     ssc.checkpoint("")
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>   // Reads the state but never calls state.update, which triggers the bug
>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>       state: State[Long]): Option[(String, Long)] = {
>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>     val output = (key, sum)
>     Some(output)
>   }
> }
> {code}
> The issue resides in `MapWithStateRDDRecord.updateRecordWithData`, starting
> at line 55, in the following code block:
> {code}
> dataIterator.foreach { case (key, value) =>
>   wrappedState.wrap(newStateMap.get(key))
>   val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>   if (wrappedState.isRemoved) {
>     newStateMap.remove(key)
>   } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) { // <--- problem is here
>     newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>   }
>   mappedData ++= returned
> }
> {code}
> If the stream has a timeout set but the state was never set, the "else-if"
> branch is still taken because the timeout is defined. `wrappedState.get()`
> then throws, because "wrappedState" is empty.
> If it is mandatory to update the state for each entry of mapWithState, then
> this code should throw a better exception than "NoSuchElementException", which
> doesn't really say anything to the developer.
> I haven't provided a fix myself because I'm not familiar with the Spark
> implementation, but it seems there needs to be either an extra check that the
> state is set or, as previously stated, a better exception message.
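
The extra check suggested in the report can be sketched in plain Scala, without a Spark dependency. This is only an illustration under stated assumptions: `SketchState`, `storeUnguarded`, and `storeGuarded` are hypothetical names rather than Spark APIs, and the guarded condition is one possible shape of the check, not a confirmed fix.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's internal state wrapper; NOT the real StateImpl.
final class SketchState[S] {
  private var value: Option[S] = None
  private var updated = false

  // Reset the wrapper with whatever was previously stored for the key.
  def wrap(initial: Option[S]): Unit = { value = initial; updated = false }
  def exists: Boolean = value.isDefined
  def isUpdated: Boolean = updated
  def update(s: S): Unit = { value = Some(s); updated = true }
  def get(): S =
    value.getOrElse(throw new NoSuchElementException("State is not set"))
}

object TimeoutGuardSketch {
  // Mirrors the reported condition: a configured timeout alone leads to get().
  def storeUnguarded[S](key: String, state: SketchState[S], timeoutSet: Boolean,
      store: mutable.Map[String, S]): Unit =
    if (state.isUpdated || timeoutSet) store(key) = state.get()

  // Guarded variant (assumption): the timeout case only applies if a state exists.
  def storeGuarded[S](key: String, state: SketchState[S], timeoutSet: Boolean,
      store: mutable.Map[String, S]): Unit =
    if (state.isUpdated || (timeoutSet && state.exists)) store(key) = state.get()

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, Long]
    val state = new SketchState[Long]
    // No previous state for the key, and the mapping function never updates it.
    state.wrap(store.get("word"))

    val threw =
      try { storeUnguarded("word", state, timeoutSet = true, store); false }
      catch { case _: NoSuchElementException => true }
    println(s"unguarded threw: $threw") // prints: unguarded threw: true

    storeGuarded("word", state, timeoutSet = true, store)
    println(s"store empty after guarded call: ${store.isEmpty}") // prints: store empty after guarded call: true
  }
}
```

On the user side, calling `state.update(sum)` inside `trackStateFunc` should sidestep this path, because the state is then defined by the time it is read.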
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)