You can try to reduce the number of containers in order to increase their
memory.

2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:

> You can try to increase the number of partitions to get ride of the OOM
> errors. Also try to use reduceByKey instead of groupByKey.
>
> Thanks
> Best Regards
>
> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
> wrote:
>
>> Hi everyone,
>> I have an RDD of the format (user: String, timestamp: Long, state:
>> Boolean).  My task invovles converting the states, where on/off is
>> represented as true/false, into intervals of 'on' of the format (beginTs:
>> Long, endTs: Long).  So this task requires me, per user, to line up all of
>> the on/off states so that I can compute when it is on, since the
>> calculation is neither associative nor commutative.
>>
>> So there are 2 main operations that I'm trying to accomplish together:
>> 1. group by each user
>> 2. sort by time -- keep all of the states in sorted order by time
>>
>> The main code inside the method that does grouping by user and sorting by
>> time looks sort of looks like this:
>>
>>
>> // RDD starts off in format (user, ts, state) of type RDD[(String, Long,
>> Boolean)]
>> val grouped = keyedStatesRDD.groupByKey
>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of
>> type RDD[(String, Iterable(Long, Boolean))]
>> // take the sequence of (ts, state) per user, sort, get intervals
>> val groupedIntervals = grouped.mapValues(
>>   states => {
>>     val sortedStates = states.toSeq.sortBy(_._1)
>>     val intervals = DFUtil.statesToIntervals(sortedStates)
>>     val intervalsList = bucketDurations.map{case(k,v) =>
>> (k,v)}(collection.breakOut).sortBy(_._1)
>>     intervalsList
>>   }
>> )
>> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
>> endTime)) of type RDD[(String, IndexedSeq(Long, Long))]
>>
>>
>> When I run my Spark job with 1 day's worth of data, the job completes
>> successfully.  When I run with 1 month's or 1 year's worth of data, that
>> method is where my Spark job consistently crashes with get
>> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>>
>> My suspicion is that the groupByKey is the problem (it's pulling all of
>> the matching data values into a single executor's heap as a plain Scala
>> Iterable).  But alternatives of doing sortByKey on the RDD first before
>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
>> quite apply in my scenario because my operation is not associative (can't
>> combine per-partition results) and I still need to group by users before
>> doing a foldLeft.
>>
>> I've definitely thought about the issue before and come across users with
>> issues that are similar but not exactly the same:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>
>> And this Jira seems relevant too:
>> https://issues.apache.org/jira/browse/SPARK-3655
>>
>> The amount of memory that I'm using is 2g per executor, and I can't go
>> higher than that because each executor gets a YARN container from nodes
>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>
>> So I'd like to know if there's an easy solution to executing my logic on
>> my full dataset in Spark.
>>
>> Thanks!
>>
>> -- Elango
>>
>
>

Reply via email to