[ https://issues.apache.org/jira/browse/SPARK-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
NITESH VERMA reopened SPARK-13488:
----------------------------------
Hi, I closed this issue by mistake, so I am reopening it to add more details and
context. Thanks.
> PairDStreamFunctions.mapWithState fails when a timeout is set:
> java.util.NoSuchElementException: None.get
> ---------------------------------------------------------------------------------------------------------
>
> Key: SPARK-13488
> URL: https://issues.apache.org/jira/browse/SPARK-13488
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.6.0
> Reporter: NITESH VERMA
>
> Using the new Spark mapWithState API, I've encountered an issue when setting a
> timeout for mapWithState.
> I am using the mapWithState API with the timeout functionality, and I get the
> exception below whenever the timeout interval fires for idle keys (keys that
> received no new data).
> My code is based on the example at
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
> with two changes:
> 1. org.apache.spark.api.java.Optional is not available in 1.6, so I am using the
> Guava library for Optional.
> 2. I added the timeout functionality.
> Below is the relevant part of the code:
>
> JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
>     new PairFunction<String, String, Integer>() {
>       @Override
>       public Tuple2<String, Integer> call(String s) {
>         return new Tuple2<>(s, 1);
>       }
>     });
>
> // Update function for the cumulative count
> Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
>     new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
>       @Override
>       public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
>         int sum = one.or(0) + (state.exists() ? state.get() : 0);
>         Tuple2<String, Integer> output = new Tuple2<>(word, sum);
>         state.update(sum);
>         return output;
>       }
>     };
>
> // DStream of cumulative counts that are updated in every batch
> JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
>     wordsDstream.mapWithState(
>         StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000)));
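>
> As a side note: per the State API docs, once a timeout is set the mapping function is
> also invoked once for each idle key, with the value absent and state.isTimingOut()
> returning true, and calling state.update() during that final call is not allowed. A
> guarded variant of the mapping function above (a sketch under that assumption, with
> the same Guava Optional signature) would be:
>
> Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> guardedFunc =
>     new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
>       @Override
>       public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
>         if (state.isTimingOut()) {
>           // Final call for an idle key: the state is being removed, so do not update it.
>           return new Tuple2<>(word, state.exists() ? state.get() : 0);
>         }
>         int sum = one.or(0) + (state.exists() ? state.get() : 0);
>         state.update(sum);
>         return new Tuple2<>(word, sum);
>       }
>     };
>
> The exception below, however, appears to be raised before the mapping function is
> entered (the failing frames are in StateSpec's wrapper), so this guard by itself may
> not avoid it.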
> When I ran the code above, I got the following exception once the timeout fired:
> 16/02/25 11:41:33 ERROR Executor: Exception in task 0.0 in stage 157.0 (TID 22)
> java.util.NoSuchElementException: None.get
>     at scala.None$.get(Option.scala:313)
>     at scala.None$.get(Option.scala:311)
>     at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
>     at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
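>
> Reading the trace: None.get is raised at StateSpec.scala:221-222, i.e. inside the
> Scala wrapper that StateSpec.function creates around the Java Function3, reached from
> MapWithStateRDDRecord.updateRecordWithData. Since a timed-out key is passed to the
> mapping function with no value, it looks like this wrapper unwraps the absent value
> unconditionally before my function is even called. If that reading is right, any Java
> mapping function combined with timeout() would hit this, regardless of what the
> function body does.
>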
> 16/02/25 11:41:33 WARN TaskSetManager: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
>     [same stack trace as above]
>
> 16/02/25 11:41:33 ERROR TaskSetManager: Task 0 in stage 157.0 failed 1 times; aborting job
> 16/02/25 11:41:33 ERROR JobScheduler: Error running job streaming job 1456380693000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 157.0 failed 1 times, most recent failure: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
>     [same stack trace as above]
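>
> For completeness, here is a minimal self-contained driver that should reproduce this.
> It is a sketch: the local[2] master, the socket source on localhost:9999 (fed with
> `nc -lk 9999`), and the /tmp checkpoint directory are assumptions, and the
> initial-state RDD is omitted for brevity. Send a few words, then send nothing; once
> the 1-second timeout passes for an idle key, the exception above should appear.
>
> import java.util.Arrays;
>
> import com.google.common.base.Optional;
> import scala.Tuple2;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function3;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.State;
> import org.apache.spark.streaming.StateSpec;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> public class MapWithStateTimeoutRepro {
>   public static void main(String[] args) throws Exception {
>     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("MapWithStateTimeoutRepro");
>     JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
>     ssc.checkpoint("/tmp/mapwithstate-repro");  // mapWithState requires a checkpoint dir
>
>     // Words from a socket source (assumed: nc -lk 9999 on localhost).
>     JavaDStream<String> words = ssc.socketTextStream("localhost", 9999).flatMap(
>         new FlatMapFunction<String, String>() {
>           @Override
>           public Iterable<String> call(String line) {
>             return Arrays.asList(line.split(" "));
>           }
>         });
>
>     JavaPairDStream<String, Integer> pairs = words.mapToPair(
>         new PairFunction<String, String, Integer>() {
>           @Override
>           public Tuple2<String, Integer> call(String s) {
>             return new Tuple2<>(s, 1);
>           }
>         });
>
>     // Same shape as the mapping function above: running count per word.
>     Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> func =
>         new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
>           @Override
>           public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
>             int sum = one.or(0) + (state.exists() ? state.get() : 0);
>             state.update(sum);
>             return new Tuple2<>(word, sum);
>           }
>         };
>
>     JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> counts =
>         pairs.mapWithState(StateSpec.function(func).timeout(Durations.seconds(1)));
>     counts.print();
>
>     ssc.start();
>     ssc.awaitTermination();
>   }
> }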