I am not sure what DStream operations you are using, but some operation is
internally creating CoalescedRDDs, and that is what is causing the race
condition. I might be able to help if you can tell me which DStream
operations you are using.
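
For reference, here is a minimal Scala sketch of the kind of pipeline that
introduces CoalescedRDDs; the socket source, port, batch interval, and
partition count are all hypothetical. DStream.repartition is implemented with
RDD.coalesce(numPartitions, shuffle = true), so every batch produces a
CoalescedRDD.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("coalesced-rdd-sketch")
    val ssc  = new StreamingContext(conf, Seconds(10))   // hypothetical batch interval

    // Hypothetical input; any receiver-based DStream behaves the same way here.
    val lines = ssc.socketTextStream("localhost", 9999)

    // repartition() is one operation that creates CoalescedRDDs: under the hood
    // it calls rdd.coalesce(8, shuffle = true) on every batch.
    val repartitioned = lines.repartition(8)

    repartitioned.foreachRDD { rdd =>
      // Any action (count, take, foreach, ...) forces CoalescedRDD.getPartitions,
      // which is the code path that appears in the stack trace further down.
      println("records in batch: " + rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()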

TD


On Tue, Jun 3, 2014 at 4:54 PM, Michael Chang <m...@tellapart.com> wrote:

> Hi Tathagata,
>
> Thanks for your help!  By not using coalesced RDDs, do you mean not
> repartitioning my DStream?
>
> Thanks,
> Mike
>
>
>
>
> On Tue, Jun 3, 2014 at 12:03 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I think I know what is going on! This is probably a race condition in the
>> DAGScheduler. I have added a JIRA for this; the fix is not trivial, though.
>>
>> https://issues.apache.org/jira/browse/SPARK-2002
>>
>> A "not-so-good" workaround for now would be not use coalesced RDD, which
>> is avoids the race condition.
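
Assuming the coalesced RDDs come from a repartition() call on the DStream
(the pipeline below is hypothetical and reuses the names from the sketch
earlier in this thread), the workaround amounts to dropping that step so no
CoalescedRDD is ever built:

    // Before: repartition() inserts a CoalescedRDD into every batch.
    //   val processed = lines.repartition(8).map(_.toUpperCase)

    // Workaround sketch: keep the stream's natural partitioning, so the
    // CoalescedRDD code path (and with it the race) is never exercised.
    val processed = lines.map(_.toUpperCase)
    processed.foreachRDD(rdd => println("records in batch: " + rdd.count()))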
>>
>> TD
>>
>>
>> On Tue, Jun 3, 2014 at 10:09 AM, Michael Chang <m...@tellapart.com>
>> wrote:
>>
>>> I only had the warning-level logs, unfortunately.  There were no other
>>> references to 32855 (except a repeated stack trace, I believe).  I'm using
>>> Spark 0.9.1
>>>
>>>
>>> On Mon, Jun 2, 2014 at 5:50 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Do you have the info-level logs of the application? Can you grep for the
>>>> value "32855" to find any references to it? Also, what version of Spark
>>>> are you using (so that I can match the stack trace; it does not seem to
>>>> match Spark 1.0)?
>>>>
>>>> TD
>>>>
>>>>
>>>> On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang <m...@tellapart.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm seeing a random exception kill my Spark Streaming job. Here's a
>>>>> stack trace:
>>>>>
>>>>> java.util.NoSuchElementException: key not found: 32855
>>>>>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>>         at scala.collection.AbstractMap.default(Map.scala:58)
>>>>>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>>>>         at
>>>>> org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211)
>>>>>          at
>>>>> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072)
>>>>>         at
>>>>> org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716)
>>>>>         at
>>>>> org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172)
>>>>>         at
>>>>> org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189)
>>>>>         at
>>>>> org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>         at
>>>>> org.apache.spark.rdd.PartitionCoalescer$LocationIterator.<init>(CoalescedRDD.scala:183)
>>>>>         at
>>>>> org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234)
>>>>>         at
>>>>> org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333)
>>>>>         at
>>>>> org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>>>         at scala.Option.getOrElse(Option.scala:120)
>>>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>>>         at
>>>>> org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>>>         at scala.Option.getOrElse(Option.scala:120)
>>>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>>>         at
>>>>> org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>>>         at scala.Option.getOrElse(Option.scala:120)
>>>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>>>         at
>>>>> org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>>>         at scala.Option.getOrElse(Option.scala:120)
>>>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>>>         at scala.Option.getOrElse(Option.scala:120)
>>>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>>>         at org.apache.spark.rdd.RDD.take(RDD.scala:830)
>>>>>         at
>>>>> org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337)
>>>>>         at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27)
>>>>>         at
>>>>> com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:87)
>>>>>         at
>>>>> com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:53)
>>>>>         at
>>>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
>>>>>         at
>>>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
>>>>>         at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
>>>>>         at
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
>>>>>         at
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>         at
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>         at
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>         at scala.util.Try$.apply(Try.scala:161)
>>>>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>         at
>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>>
>>>>> It doesn't seem to happen consistently, but I have no idea what causes
>>>>> it.  Has anyone seen this before?  The PersistToKafkaFunction here is
>>>>> just trying to write the elements in an RDD to a Kafka topic.
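
The actual ManifoldsUtil code is not shown in the thread; the following is a
hypothetical Scala analogue of such a foreachRDD writer (the topic name,
broker list, per-batch limit, and Kafka 0.8 producer setup are all
assumptions), included only to show how the take() call forces the
CoalescedRDD's partitions to be computed, which is where the stack trace
above begins:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import org.apache.spark.streaming.dstream.DStream

    def persistToKafka(stream: DStream[String]): Unit = {
      stream.foreachRDD { rdd =>
        // take() needs rdd.partitions, which walks down to
        // CoalescedRDD.getPartitions, the call that hit the race.
        val records = rdd.take(100)   // hypothetical per-batch limit

        // Hypothetical Kafka 0.8-style producer; broker list and topic are made up.
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))

        records.foreach(record => producer.send(new KeyedMessage[String, String]("events", record)))
        producer.close()
      }
    }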
>>>>>
>>>>
>>>>
>>>
>>
>
