PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Yuval.Itzchakov
Hi,
I've been playing with the experimental PairDStreamFunctions.mapWithState
feature and I seem to have stumbled across a bug, and was wondering whether
anyone else has been seeing this behavior.

I've opened an issue in the Spark JIRA (SPARK-13195). I'm passing it along
here in case anyone else is experiencing the same failure, or has insight
into whether this is actually a bug.

Using the new Spark mapWithState API, I've encountered a bug when setting a
timeout for mapWithState without ever explicitly updating the state.

h1. Steps to reproduce:

1. Create a method that conforms to the StateSpec function signature, and make
sure it never updates the state via *state.update*. A simple "pass-through"
method is enough; its body may even be empty.
2. Create a StateSpec object from the method in step 1, and explicitly set a
timeout using the *StateSpec.timeout* method.
3. Create a DStream pipeline that calls mapWithState with that StateSpec.
4. Run the code using spark-submit. The job ends up throwing the following
exception:

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
    at org.apache.spark.streaming.StateImpl.get(State.scala:150)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
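The trace shows StateImpl.get being called from MapWithStateRDDRecord.updateRecordWithData (MapWithStateRDD.scala:61). One plausible reading is that when a timeout is configured, the record's state is read back with get() even if the mapping function never set it. The sketch below is a simplified, hypothetical pure-Scala model of that bookkeeping. It is not Spark's code, and ModelState, processBatch, passThrough and withUpdate are illustrative names only. It reproduces the "State is not set" failure for a pass-through function and shows that always calling update avoids it.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of mapWithState's per-record bookkeeping.
// NOT Spark's implementation; all names here are illustrative.
object MapWithStateModel {

  // Minimal stand-in for State[S]: get() throws when no state has been set.
  final class ModelState[S] {
    private var value: Option[S] = None
    private var updated = false

    def wrap(current: Option[S]): Unit = { value = current; updated = false }
    def getOption: Option[S] = value
    def get(): S =
      value.getOrElse(throw new NoSuchElementException("State is not set"))
    def update(newValue: S): Unit = { value = Some(newValue); updated = true }
    def isUpdated: Boolean = updated
  }

  // Processes one batch; `timeoutSet` stands in for StateSpec.timeout(...) being used.
  def processBatch(
      records: Seq[(String, Int)],
      store: mutable.Map[String, Long],
      timeoutSet: Boolean,
      fn: (String, Option[Int], ModelState[Long]) => Option[(String, Long)]
  ): Seq[(String, Long)] = {
    val state = new ModelState[Long]
    records.flatMap { case (key, value) =>
      state.wrap(store.get(key))
      val out = fn(key, Some(value), state)
      // Modeled failing condition: with a timeout configured, the state is read
      // back and stored even though the function may never have set it.
      if (state.isUpdated || timeoutSet) store(key) = state.get()
      out
    }
  }

  // Mirrors the report's trackStateFunc: reads the state but never updates it.
  def passThrough(key: String, value: Option[Int], state: ModelState[Long]): Option[(String, Long)] =
    Some((key, value.getOrElse(0).toLong + state.getOption.getOrElse(0L)))

  // Workaround: always store the running sum, so the state is set before it is read.
  def withUpdate(key: String, value: Option[Int], state: ModelState[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    state.update(sum)
    Some((key, sum))
  }
}
```

In real Spark code the equivalent workaround would call `state.update(sum)` inside trackStateFunc. That call should be guarded by `!state.isTimingOut()`, because updating a state that is timing out is not allowed.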

h1. Sample code to reproduce the issue:

{code:Title=MainObject}
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by yuvali on 04/02/2016.
  */
object Program {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapWithState bug reproduce")
    val sparkContext = new SparkContext(conf)

    val ssc = new StreamingContext(sparkContext, Seconds(4))
    // A timeout is set, but trackStateFunc never calls state.update
    val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))

    // Create a stream from the dummy receiver (ratePerSec = 10)
    val stream = ssc.receiverStream(new DummySource(10))

    // Split the lines into words, and create a paired (key-value) dstream
    val wordStream = stream.flatMap {
      _.split(" ")
    }.map(word => (word, 1))

    // This represents the emitted stream from the trackStateFunc. Since we
    // emit every input record with the updated value,
    // this stream will contain the same # of records as the input dstream.
    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    wordCountStateStream.print()

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    // Don't forget to set checkpoint directory
    ssc.checkpoint("")
    ssc.start()
    ssc.awaitTermination()
  }

  // Reads the previous state (if any) but never calls state.update
  def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
      state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val output = (key, sum)
    Some(output)
  }
}
{code}

{code:Title=DummySource}

/**
  * Created by yuvali on 04/02/2016.
  */

import org.apache.spark.storage.StorageLevel
import scala.util.Random
import org.apache.spark.streaming.receiver._

class DummySource(ratePerSec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Dummy Source") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Yuval Itzchakov
Let me know if you need a pull request for this; I can make that happen
(given someone does a vast PR to make sure I'm understanding this problem
right).

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Tathagata Das
Shixiong has already opened the PR -
https://github.com/apache/spark/pull/11081
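For reference: as far as I can tell, current Spark sources guard that write-back in MapWithStateRDD.scala roughly as `wrappedState.isUpdated || (wrappedState.exists && timeoutThresholdTime.isDefined)`. Under that guard, a key whose state was never set is simply skipped. The sketch below models that guard, assuming it is the substance of the fix in the PR above. GuardedStoreModel and storeAfterMapping are hypothetical names, not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical model of the guarded write-back step (not Spark's code).
object GuardedStoreModel {
  // current:    state visible after the mapping function ran (None if never set)
  // updated:    whether the function called update() for this record
  // timeoutSet: whether StateSpec.timeout(...) was configured
  def storeAfterMapping(
      store: mutable.Map[String, Long],
      key: String,
      current: Option[Long],
      updated: Boolean,
      timeoutSet: Boolean
  ): Unit =
    if (updated || (current.isDefined && timeoutSet)) {
      // update() always sets a value, so `current` is defined on both paths;
      // foreach avoids an unchecked get() on an empty state.
      current.foreach(v => store.update(key, v))
    }
}
```

With the guard, a timeout alone no longer forces a read of a state that does not exist. Existing states are still written back so that their timeout can be tracked.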

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Yuval Itzchakov
Awesome. Thanks for the super fast reply.

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Shixiong(Ryan) Zhu
Thanks for reporting it. I will take a look.

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Sachin Aggarwal
I think we need to open the port:
http://serverfault.com/questions/317903/aws-ec2-open-port-8080

Do you remember doing anything like this earlier for aws 1?

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Sachin Aggarwal
http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/

The default Tomcat server uses port 8080. You need to open that port on
your instance to make sure your Tomcat server is available on the Web (you
could also change the default port). In the AWS Management Console, select
Security Groups (left navigation bar), select the quick-start group, the
Inbound tab and add port 8080. Make sure you click “Add Rule” and then
“Apply Rule Changes”.

On Fri, Feb 5, 2016 at 1:14 AM, Sachin Aggarwal 
wrote:

> i think we need to add port
> http://serverfault.com/questions/317903/aws-ec2-open-port-8080
>
>
> do u remember doing anything like this earlier for aws 1
>
> On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov  wrote:
>
>> Awesome. Thanks for the super fast reply.
>>
>>
>> On Thu, Feb 4, 2016, 21:16 Tathagata Das 
>> wrote:
>>
>>> Shixiong has already opened the PR -
>>> https://github.com/apache/spark/pull/11081
>>>
>>> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov 
>>> wrote:
>>>
 Let me know if you do need a pull request for this, I can make that
 happen (given someone does a vast PR to make sure I'm understanding this
 problem right).

 On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
 shixi...@databricks.com> wrote:

> Thanks for reporting it. I will take a look.
>
> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov 
> wrote:
>
>> Hi,
>> I've been playing with the expiramental
>> PairDStreamFunctions.mapWithState
>> feature and I've seem to have stumbled across a bug, and was
>> wondering if
>> anyone else has been seeing this behavior.
>>
>> I've opened up an issue in the Spark JIRA, I simply want to pass this
>> along
>> in case anyone else is experiencing such a failure or perhaps someone
>> has
>> insightful information if this is actually a bug:  SPARK-13195
>> 
>>
>> Using the new spark mapWithState API, I've encountered a bug when
>> setting a
>> timeout for mapWithState but no explicit state handling.
>>
>> h1. Steps to reproduce:
>>
>> 1. Create a method which conforms to the StateSpec signature, make
>> sure to
>> not update any state inside it using *state.update*. Simply create a
>> "pass
>> through" method, may even be empty.
>> 2. Create a StateSpec object with method from step 1, which
>> explicitly sets
>> a timeout using *StateSpec.timeout* method.
>> 3. Create a DStream pipeline that uses mapWithState with the given
>> StateSpec.
>> 4. Run code using spark-submit. You'll see that the method ends up
>> throwing
>> the following exception:
>>
>> {code}
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
>> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at 

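The reproduction steps in the report can be modelled without a cluster. The following is a simplified, hypothetical sketch: `ModelState`, `MapWithStateModel`, `updateRecords` and `passThrough` are invented for illustration and are not Spark classes. It assumes, based only on the stack trace (`StateImpl.get` called from `updateRecordWithData`), that the per-record update loop reads the state via `get()` whenever a timeout is configured, even if the mapping function never set it. The `guarded` flag shows one plausible fix (persist on timeout only when the state exists); it is not claimed to be the change made in the PR mentioned in this thread.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-key state passed to a mapping function.
// NOT Spark's StateImpl: it only mimics what the stack trace shows, namely
// that get() on a state that was never set throws NoSuchElementException.
final class ModelState[S] {
  private var value: Option[S] = None
  private var updated = false

  // Reset the wrapper for the next key, loading any previously stored state.
  def wrap(existing: Option[S]): Unit = { value = existing; updated = false }
  def exists: Boolean = value.isDefined
  def isUpdated: Boolean = updated
  def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
  def update(s: S): Unit = { value = Some(s); updated = true }
}

object MapWithStateModel {
  // Applies `mappingFn` to each (key, value) record, loosely mirroring the
  // loop in the trace. `timeoutSet` stands in for StateSpec.timeout(...).
  // `guarded = false`: persist if updated OR a timeout is set (fails when
  //   the state was never set, because get() throws).
  // `guarded = true`: on timeout, persist only state that actually exists.
  def updateRecords[K, V, S, M](
      records: Seq[(K, V)],
      stateMap: mutable.Map[K, S],
      timeoutSet: Boolean,
      guarded: Boolean
  )(mappingFn: (K, Option[V], ModelState[S]) => M): Seq[M] = {
    val state = new ModelState[S]
    records.map { case (key, value) =>
      state.wrap(stateMap.get(key))
      val mapped = mappingFn(key, Some(value), state)
      val keepState =
        if (guarded) state.isUpdated || (state.exists && timeoutSet)
        else state.isUpdated || timeoutSet
      if (keepState) stateMap.put(key, state.get())
      mapped
    }
  }

  // Step 1 of the report: a "pass-through" function that never calls update.
  val passThrough: (String, Option[Int], ModelState[Int]) => Option[Int] =
    (_, value, _) => value
}
```

With `timeoutSet = true` and `guarded = false`, running `passThrough` over a single record throws `NoSuchElementException("State is not set")`, which matches the exception in the report; with `guarded = true` the same call returns the mapped values and leaves the state map empty.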
Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Sachin Aggarwal
I am sorry for the spam, I replied in the wrong thread. Sleepy head :-(

On Fri, Feb 5, 2016 at 1:15 AM, Sachin Aggarwal 
wrote:

>
> http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/
>
> The default Tomcat server uses port 8080. You need to open that port on
> your instance to make sure your Tomcat server is available on the Web (you
> could also change the default port). In the AWS Management Console, select
> Security Groups (left navigation bar), select the quick-start group, the
> Inbound tab and add port 8080. Make sure you click “Add Rule” and then
> “Apply Rule Changes”.
>
> On Fri, Feb 5, 2016 at 1:14 AM, Sachin Aggarwal <
> different.sac...@gmail.com> wrote:
>
>> I think we need to add the port
>> http://serverfault.com/questions/317903/aws-ec2-open-port-8080
>>
>>
>> Do you remember doing anything like this earlier for AWS 1?
>>
>> On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov 
>> wrote:
>>
>>> Awesome. Thanks for the super fast reply.
>>>
>>>
>>> On Thu, Feb 4, 2016, 21:16 Tathagata Das 
>>> wrote:
>>>
 Shixiong has already opened the PR -
 https://github.com/apache/spark/pull/11081

 On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov 
 wrote:

> Let me know if you need a pull request for this; I can make that
> happen (given someone does a fast review of the PR to make sure I'm
> understanding this problem correctly).
>
> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Thanks for reporting it. I will take a look.
>>
>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov 
>> wrote:
>>
>>> Hi,
>>> I've been playing with the experimental PairDStreamFunctions.mapWithState
>>> feature and I seem to have stumbled across a bug, and was wondering if
>>> anyone else has been seeing this behavior.
>>>
>>> I've opened an issue in the Spark JIRA. I simply want to pass this along
>>> in case anyone else is experiencing such a failure, or in case someone has
>>> insight into whether this is actually a bug: SPARK-13195
>>> 
>>>
>>> Using the new Spark mapWithState API, I've encountered a bug when setting a
>>> timeout for mapWithState without explicit state handling.
>>>
>>> h1. Steps to reproduce:
>>>
>>> 1. Create a method which conforms to the StateSpec signature, making sure
>>> not to update any state inside it using *state.update*. Simply create a
>>> "pass-through" method; it may even be empty.
>>> 2. Create a StateSpec object with the method from step 1, which explicitly
>>> sets a timeout using the *StateSpec.timeout* method.
>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>> StateSpec.
>>> 4. Run the code using spark-submit. You'll see that the method ends up
>>> throwing the following exception:
>>>
>>> {code}
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
>>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
>>> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>> at
>>>