[
https://issues.apache.org/jira/browse/SPARK-703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13981581#comment-13981581
]
Tathagata Das commented on SPARK-703:
-------------------------------------
Yes, that observation is correct. This is something we need to make clearer
in the documentation. Closing this JIRA for now.
> KafkaWordCount example crashes with java.lang.ArrayIndexOutOfBoundsException
> in CheckpointRDD.scala
> ---------------------------------------------------------------------------------------------------
>
> Key: SPARK-703
> URL: https://issues.apache.org/jira/browse/SPARK-703
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 0.7.0
> Reporter: Craig A. Vanderborgh
>
> This is a bad Spark Streaming bug. The KafkaWordCount example can be used to
> demonstrate the problem. After a few iterations (batches), the test crashes
> with this stack trace during the checkpointing attempt:
> 13/02/22 15:26:54 INFO streaming.JobManager: Total delay: 0.02100 s for job 12 (execution: 0.01300 s)
> 13/02/22 15:26:54 INFO rdd.CoGroupedRDD: Adding one-to-one dependency with MappedValuesRDD[87] at apply at TraversableLike.scala:239
> 13/02/22 15:26:54 INFO rdd.CoGroupedRDD: Adding one-to-one dependency with MapPartitionsRDD[56] at apply at TraversableLike.scala:239
> 13/02/22 15:26:54 INFO rdd.CoGroupedRDD: Adding one-to-one dependency with MapPartitionsRDD[99] at apply at TraversableLike.scala:239
> 13/02/22 15:26:54 ERROR streaming.JobManager: Running streaming job 13 @ 1361572014000 ms failed
> java.lang.ArrayIndexOutOfBoundsException: 0
> at spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:27)
> at spark.RDD.partitions(RDD.scala:166)
> at spark.RDD.partitions(RDD.scala:166)
> at spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:71)
> at spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:65)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:47)
> at spark.rdd.CoGroupedRDD.getPartitions(CoGroupedRDD.scala:63)
> at spark.RDD.partitions(RDD.scala:166)
> at spark.MappedValuesRDD.getPartitions(PairRDDFunctions.scala:655)
> at spark.RDD.partitions(RDD.scala:166)
> at spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:71)
> at spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:65)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:47)
> at spark.rdd.CoGroupedRDD.getPartitions(CoGroupedRDD.scala:63)
> at spark.RDD.partitions(RDD.scala:166)
> at spark.MappedValuesRDD.getPartitions(PairRDDFunctions.scala:655)
> at spark.RDD.partitions(RDD.scala:166)
> at spark.RDD.take(RDD.scala:550)
> at spark.streaming.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:522)
> at spark.streaming.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:521)
> at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:22)
> at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:21)
> at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:21)
> at spark.streaming.Job.run(Job.scala:10)
> at spark.streaming.JobManager$JobHandler.run(JobManager.scala:15)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> The only way I can get this test to work on a cluster is to disable
> checkpointing and to use reduceByKey() instead of reduceByKeyAndWindow().
> Also, the test works when run with "local" as the master.
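> For reference, the workaround described above amounts to something like the
> following sketch, modeled on the stock KafkaWordCount example from the
> 0.7-era Spark Streaming API. Exact constructor and kafkaStream signatures
> may differ in your build, and the argument handling here is illustrative
> only; the point is the two marked changes.

```scala
// Sketch of the workaround: no checkpointing, no windowed reduce.
// Based loosely on the Spark 0.7 KafkaWordCount example; API details
// (StreamingContext constructor, kafkaStream signature) are assumptions.
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object KafkaWordCountNoWindow {
  def main(args: Array[String]) {
    val Array(master, zkQuorum, group, topics, numThreads) = args
    val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2))
    // Workaround 1: do NOT call ssc.checkpoint(...) here.

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = ssc.kafkaStream[String](zkQuorum, group, topicMap)
    val words = lines.flatMap(_.split(" "))

    // Workaround 2: plain reduceByKey instead of reduceByKeyAndWindow,
    // since the windowed variant requires checkpointing to be enabled.
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
  }
}
```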
--
This message was sent by Atlassian JIRA
(v6.2#6252)