[ https://issues.apache.org/jira/browse/SPARK-12591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119907#comment-15119907 ]

Lin Zhao edited comment on SPARK-12591 at 1/27/16 6:11 PM:
-----------------------------------------------------------

I'm not sure of the timing of the checkpointing. The error seems to affect 
several early batches and then go away; I'm not sure what to make of it. 
A screenshot of the streaming page is attached.
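
For reference, the checkpoint interval of the state stream can be set 
explicitly; if left unset, Spark Streaming derives a default from the batch 
interval. A minimal sketch, assuming stateDstream is the DStream returned by 
mapWithState (the interval value is illustrative):
{code}
// checkpoint the state stream every 20 seconds instead of relying on
// Spark Streaming's default checkpoint interval
stateDstream.checkpoint(Seconds(20))
{code}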


was (Author: l...@exabeam.com):
I'm not sure of the timing of the checkpointing. The error seems to affect 
several early batches then go away. Not sure what to make of it.

> NullPointerException using checkpointed mapWithState with KryoSerializer
> ------------------------------------------------------------------------
>
>                 Key: SPARK-12591
>                 URL: https://issues.apache.org/jira/browse/SPARK-12591
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: MacOSX
> Java(TM) SE Runtime Environment (build 1.8.0_20-ea-b17)
>            Reporter: Jan Uyttenhove
>            Assignee: Shixiong Zhu
>             Fix For: 1.6.1, 2.0.0
>
>         Attachments: Screen Shot 2016-01-27 at 10.09.18 AM.png
>
>
> The issue occurred after upgrading to Spark (Streaming) 1.6.0 RC4 to 
> (re)test the new mapWithState API, having previously reported issue 
> SPARK-11932 (https://issues.apache.org/jira/browse/SPARK-11932). 
> For the initial report, see 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-streaming-1-6-0-RC4-NullPointerException-using-mapWithState-tt15830.html
> I narrowed it down to an issue unrelated to the Kafka direct stream; after 
> observing very unpredictable behavior as a result of changes to the Kafka 
> message format, it seems to be related specifically to Kryo serialization.
> For a test case, see my modified version of the StatefulNetworkWordCount 
> example: https://gist.github.com/juyttenh/9b4a4103699a7d5f698f 
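> This is not the gist itself, but a minimal sketch of the pattern it 
> exercises, based on the standard StatefulNetworkWordCount example (the 
> batch interval, app name, and exact wiring are illustrative; the relevant 
> parts are the Kryo setting, the checkpoint, and mapWithState):
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
> 
> object StatefulNetworkWordCountKryo {
>   def createContext(checkpointDir: String): StreamingContext = {
>     val conf = new SparkConf()
>       .setAppName("StatefulNetworkWordCountKryo")
>       // Kryo serialization is what appears to trigger the NPE on recovery
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     val ssc = new StreamingContext(conf, Seconds(10))
>     ssc.checkpoint(checkpointDir)
> 
>     // running count per word, kept in mapWithState's per-key state
>     val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
>       val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
>       state.update(sum)
>       (word, sum)
>     }
> 
>     val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
>     val stateDstream = words.map(word => (word, 1))
>       .mapWithState(StateSpec.function(mappingFunc))
>     stateDstream.print()
>     ssc
>   }
> }
> {code}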
> To reproduce, use Spark 1.6.0 RC4 and:
> - start nc:
> {code}nc -lk 9999{code}
> - execute the supplied test case: 
> {code}bin/spark-submit --class 
> org.apache.spark.examples.streaming.StatefulNetworkWordCount --master 
> local[2] file:///some-assembly-jar localhost 9999{code}
> Error scenario:
> - put some text in the nc console with the job running, and observe the 
> word count working correctly
> - kill the Spark job
> - add some more text in the nc console (with the job not running)
> - restart the Spark job (see the recovery sketch below) and observe the NPE
> (you might need to repeat this a couple of times to trigger the exception)
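> The restart in the last step goes through the standard checkpoint-recovery 
> path, roughly as follows (the checkpoint directory is illustrative); it is 
> during recovery that the checkpointed mapWithState data is deserialized 
> with Kryo:
> {code}
> // second launch: getOrCreate finds the existing checkpoint and rebuilds the
> // StreamingContext (including the mapWithState state) from it, instead of
> // calling the creating function; deserializing that state is where the NPE hits
> val checkpointDir = "/tmp/stateful-wc-checkpoint"  // illustrative
> val ssc = StreamingContext.getOrCreate(checkpointDir,
>   () => StatefulNetworkWordCountKryo.createContext(checkpointDir))
> ssc.start()
> ssc.awaitTermination()
> {code}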
> Here's the stack trace: 
> {code}
> 15/12/31 11:43:47 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 5)
> java.lang.NullPointerException
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 15/12/31 11:43:47 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 6, 
> localhost, partition 1,NODE_LOCAL, 2239 bytes)
> 15/12/31 11:43:47 INFO Executor: Running task 1.0 in stage 4.0 (TID 6)
> 15/12/31 11:43:47 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 5, 
> localhost): java.lang.NullPointerException
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 15/12/31 11:43:47 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; 
> aborting job
> 15/12/31 11:43:47 INFO TaskSchedulerImpl: Cancelling stage 4
> 15/12/31 11:43:47 INFO Executor: Executor is trying to kill task 1.0 in stage 
> 4.0 (TID 6)
> 15/12/31 11:43:47 INFO TaskSchedulerImpl: Stage 4 was cancelled
> 15/12/31 11:43:47 INFO DAGScheduler: ResultStage 4 (foreachPartition at 
> StatefulNetworkCountKryo.scala:72) failed in 0.100 s
> 15/12/31 11:43:47 INFO DAGScheduler: Job 1 failed: foreachPartition at 
> StatefulNetworkCountKryo.scala:72, took 0.823996 s
> 15/12/31 11:43:47 INFO JobScheduler: Finished job streaming job 1451558610000 
> ms.0 from job set of time 1451558610000 ms
> 15/12/31 11:43:47 INFO JobScheduler: Total delay: 17.994 s for time 
> 1451558610000 ms (execution: 0.857 s)
> 15/12/31 11:43:47 INFO JobScheduler: Starting job streaming job 1451558620000 
> ms.0 from job set of time 1451558620000 ms
> 15/12/31 11:43:47 ERROR JobScheduler: Error running job streaming job 
> 1451558610000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 
> (TID 5, localhost): java.lang.NullPointerException
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>       at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>       at 
> org.apache.spark.examples.streaming.StatefulNetworkWordCountKryo$$anonfun$createContext$2.apply(StatefulNetworkCountKryo.scala:72)
>       at 
> org.apache.spark.examples.streaming.StatefulNetworkWordCountKryo$$anonfun$createContext$2.apply(StatefulNetworkCountKryo.scala:71)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>       at scala.util.Try$.apply(Try.scala:161)
>       at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       ... 3 more
> {code}


