[ 
https://issues.apache.org/jira/browse/SPARK-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuval Itzchakov updated SPARK-13195:
------------------------------------
    Description: 
Using the new Spark mapWithState API, I've encountered a bug that occurs when a 
timeout is set on mapWithState but the mapping function never explicitly updates 
the state.

Steps to reproduce:

1. Create a method that conforms to the StateSpec signature, making sure not to 
update any state inside it via `state.update`. A simple "pass-through" method is 
enough; it may even be empty.
2. Create a StateSpec object from the method in step 1, explicitly setting a 
timeout via the `StateSpec.timeout` method.
3. Create a DStream pipeline that uses mapWithState with the given StateSpec.
4. Run the code using spark-submit. The job ends up throwing the following 
exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 
(TID 176, ip-172-31-2-11.us-west-2.compute.internal): 
java.util.NoSuchElementException: State is not set
        at org.apache.spark.streaming.StateImpl.get(State.scala:150)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
        at 
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Sample code to reproduce the issue:

{code:Title=BugReproduce}
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by yuvali on 04/02/2016.
  */
object Program {

  def main(args: Array[String]): Unit = {
    
    val sc = new SparkConf().setAppName("mapWithState bug reproduce")
    val sparkContext = new SparkContext(sc)

    val ssc = new StreamingContext(sparkContext, Seconds(4))
    val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))

    // Create a dummy input stream (DummySource is a custom receiver, not shown here)
    val stream = ssc.receiverStream(new DummySource(10))

    // Split the lines into words, and create a paired (key-value) dstream
    val wordStream = stream.flatMap {
      _.split(" ")
    }.map(word => (word, 1))

    // This represents the emitted stream from the trackStateFunc. Since we emit
    // every input record with the updated value, this stream will contain the
    // same # of records as the input dstream.
    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    wordCountStateStream.print()

    ssc.remember(Minutes(1)) // Make sure data is not deleted by the time we query it interactively

    // Don't forget to set checkpoint directory
    ssc.checkpoint("")
    ssc.start()
    ssc.awaitTermination()
  }

  def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: 
State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val output = (key, sum)
    Some(output)
  }
}
{code}
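
As a possible workaround (a sketch, not verified against all code paths), explicitly 
updating the state inside the mapping function avoids the empty-state branch. The 
`isTimingOut` guard is needed because `state.update` cannot be called while the state 
is timing out:

{code:Title=WorkaroundSketch}
  // Hypothetical variant of trackStateFunc that always updates the state,
  // so the framework never persists an empty wrappedState for this key.
  def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
                     state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    if (!state.isTimingOut) {
      state.update(sum) // the explicit update sidesteps the bug described below
    }
    Some((key, sum))
  }
{code}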

The issue resides in MapWithStateRDDRecord.updateRecordWithData, in the 
following code block:

{code}
    dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) {
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }
{code}

When the stream has a timeout set but the state was never updated for a given 
key, the "else if" branch is still entered, because the timeout is defined even 
though "wrappedState" is empty; `wrappedState.get()` then throws.

If it is mandatory to update the state for each entry of mapWithState, then this 
code should throw a more descriptive exception than "NoSuchElementException", 
which doesn't really say anything to the developer.

I haven't provided a fix myself because I'm not familiar with the Spark 
implementation, but it seems there needs to be either an extra check for whether 
the state is set, or, as stated above, a better exception message.
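
For illustration only (I'm not claiming this is the right fix), one possible 
shape of that extra check in updateRecordWithData would be to persist the state 
only when it actually exists, using the `State.exists` predicate:

{code:Title=PossibleCheckSketch}
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated
          || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
        // Only touch the state map when the state was actually set, so
        // wrappedState.get() cannot throw NoSuchElementException here.
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
{code}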



> mapWithState fails in case of timeout set without state update
> --------------------------------------------------------------
>
>                 Key: SPARK-13195
>                 URL: https://issues.apache.org/jira/browse/SPARK-13195
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.0
>            Reporter: Yuval Itzchakov
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
