What I understood from Imran's mail (and what was referenced in his
mail) the RDD mentioned seems to be violating some basic contracts on
how partitions are used in spark [1].
They cannot be arbitrarily numbered,have duplicates, etc.


Extending RDD to add functionality is typically for niche cases; and
requires subclasses to adhere to the explicit (and implicit)
contracts/lifecycles for them.
Using existing RDD's as template would be a good idea for
customizations - one way to look at it is, using RDD is more in api
space but extending them is more in spi space.

Violations would actually not even be detectable by spark-core in general case.


Regards,
Mridul

[1] Ignoring the array out of bounds, etc - I am assuming the intent
is to show overlapping partitions, duplicates. index to partition
mismatch - that sort of thing.


On Thu, Aug 13, 2015 at 11:42 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> Yep, and it works fine for operations which does not involve any shuffle
> (like foreach,, count etc) and those which involves shuffle operations ends
> up in an infinite loop. Spark should somehow indicate this instead of going
> in an infinite loop.
>
> Thanks
> Best Regards
>
> On Thu, Aug 13, 2015 at 11:37 PM, Imran Rashid <iras...@cloudera.com> wrote:
>>
>> oh I see, you are defining your own RDD & Partition types, and you had a
>> bug where partition.index did not line up with the partitions slot in
>> rdd.getPartitions.  Is that correct?
>>
>> On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>>
>>> I figured that out, And these are my findings:
>>>
>>> -> It just enters in an infinite loop when there's a duplicate partition
>>> id.
>>>
>>> -> It enters in an infinite loop when the partition id starts from 1
>>> rather than 0
>>>
>>>
>>> Something like this piece of code can reproduce it: (in getPartitions())
>>>
>>> val total_partitions = 4
>>> val partitionsArray: Array[Partition] =
>>> Array.ofDim[Partition](total_partitions)
>>>
>>> var i = 0
>>>
>>> for(outer <- 0 to 1){
>>>   for(partition <- 1 to total_partitions){
>>>     partitionsArray(i) = new DeadLockPartitions(partition)
>>>     i = i + 1
>>>   }
>>> }
>>>
>>> partitionsArray
>>>
>>>
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid <iras...@cloudera.com>
>>> wrote:
>>>>
>>>> yikes.
>>>>
>>>> Was this a one-time thing?  Or does it happen consistently?  can you
>>>> turn on debug logging for o.a.s.scheduler (dunno if it will help, but maybe
>>>> ...)
>>>>
>>>> On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>>
>>>>> Hi
>>>>>
>>>>> My Spark job (running in local[*] with spark 1.4.1) reads data from a
>>>>> thrift server(Created an RDD, it will compute the partitions in
>>>>> getPartitions() call and in computes hasNext will return records from 
>>>>> these
>>>>> partitions), count(), foreach() is working fine it returns the correct
>>>>> number of records. But whenever there is shuffleMap stage (like 
>>>>> reduceByKey
>>>>> etc.) then all the tasks are executing properly but it enters in an 
>>>>> infinite
>>>>> loop saying :
>>>>>
>>>>> 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1
>>>>> (map at FilterMain.scala:59) because some of its tasks had failed: 0, 3
>>>>>
>>>>>
>>>>> Here's the complete stack-trace http://pastebin.com/hyK7cG8S
>>>>>
>>>>> What could be the root cause of this problem? I looked up and bumped
>>>>> into this closed JIRA (which is very very old)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>
>>>>
>>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Reply via email to