Imran, I have also observed the phenomenon of reducing the cores helping
with OOM. I wanted to ask this (hopefully without straying off topic): we
can specify the number of cores and the executor memory. But we don't get
to specify _how_ the cores are spread among executors.

Is it possible that with 24G memory and 4 cores we get a spread of 1 core
per executor thus ending up with 24G for the task, but with 24G memory and
10 cores some executor ends up with 3 cores on the same machine and thus we
have only 8G per task?
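
To make the arithmetic in that question concrete, here is a rough sketch
in plain Python. It is not a Spark API, and the function name
heap_per_task_gb is made up for illustration. It assumes the executor heap
is shared evenly by the tasks running at the same time, which Spark does
not enforce, so treat the result as a ballpark figure only.

```python
def heap_per_task_gb(executor_memory_gb, cores_on_executor):
    """Rough heap share per concurrently running task.

    Assumption (not enforced by Spark): tasks running at the same time
    split the executor heap evenly.
    """
    if cores_on_executor < 1:
        raise ValueError("an executor needs at least one core")
    return executor_memory_gb / cores_on_executor


# 24G executor, 1 core on the executor: one task at a time
print(heap_per_task_gb(24, 1))
# 24G executor, 3 cores on the same executor: three tasks at a time
print(heap_per_task_gb(24, 3))
```

Under that assumption the two cases come out at 24G and 8G per task,
which are the numbers in my question.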

On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Yong,
>
> mostly correct except for:
>
>>
>>    - Since we are doing reduceByKey, shuffling will happen. Data will be
>>    shuffled into 1000 partitions, as we have 1000 unique keys.
>>
> No, you will not get 1000 partitions.  Spark has to decide how many
> partitions to use before it even knows how many unique keys there are.  If
> you have 200 as the default parallelism (or you just explicitly make it the
> second parameter to reduceByKey()), then you will get 200 partitions.  The
> 1000 unique keys will be distributed across the 200 partitions.  Ideally
> they will be distributed pretty equally, but how they get distributed
> depends on the partitioner (by default you will have a HashPartitioner, so
> it depends on the hash of your keys).
>
> Note that this is more or less the same as in Hadoop MapReduce.
>
> The amount of parallelism matters because there are various places in Spark
> where there is some overhead proportional to the size of a partition.  So
> in your example, if you have 1000 unique keys in 200 partitions, you expect
> about 5 unique keys per partition.  If instead you had 10 partitions,
> you'd expect 100 unique keys per partition, and thus more data per task,
> and you'd be more likely to hit an OOM.  But there are many other possible
> sources of OOM, so this is definitely not the *only* solution.
>
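
[Inline note] The keys-per-partition estimate above can be sketched in
plain Python. This is only an illustration, not Spark code: the helper
keys_per_partition is a made-up name, the keys are hypothetical integers
0..999, and hash(k) % num_partitions stands in for what a hash-based
partitioner does. Real keys will usually spread less evenly than this.

```python
from collections import Counter


def keys_per_partition(keys, num_partitions):
    """Count how many distinct keys land in each partition when the
    partition index is hash(key) modulo the number of partitions."""
    counts = Counter(hash(k) % num_partitions for k in set(keys))
    return [counts.get(p, 0) for p in range(num_partitions)]


keys = range(1000)  # hypothetical: 1000 unique integer keys
for n in (200, 10):
    sizes = keys_per_partition(keys, n)
    print(n, "partitions:", min(sizes), "to", max(sizes), "keys each")
```

With these evenly spaced integer keys, every partition holds 5 keys when
there are 200 partitions and 100 keys when there are 10, so each task in
the 10-partition case has about 20 times as much to hold.
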
> Sorry I can't comment in particular about Spark SQL -- hopefully somebody
> more knowledgeable can comment on that.
>
>
>
> On Wed, Feb 25, 2015 at 8:58 PM, java8964 <java8...@hotmail.com> wrote:
>
>> Hi, Sparkers:
>>
>> I come from the Hadoop MapReduce world, and I am trying to understand some
>> of Spark's internals. On the web and on this list, I keep seeing people
>> suggest increasing the parallelism if you get an OOM error. I have read as
>> much documentation as I could to understand RDD partitions and how
>> parallelism is used in Spark.
>>
>> I understand that for an RDD read from HDFS, by default, one partition
>> corresponds to one HDFS block, which is pretty straightforward. I saw that
>> lots of RDD operations accept a second parameter for parallelism. This is
>> the part that confuses me. From my understanding, the parallelism is
>> totally controlled by how many cores you give to your job. Adjusting that
>> parameter, or "spark.default.parallelism", shouldn't have any impact.
>>
>> For example, say I have 10G of data in HDFS and the block size is 128M, so
>> we get roughly 100 blocks/partitions in the RDD. Now suppose I transform
>> that RDD into a pair RDD with 1000 unique keys and run reduceByKey on it,
>> using 200 as the default parallelism. Here is what I assume:
>>
>>
>>    - We have 100 partitions, as the data comes from 100 blocks. Most
>>    likely Spark will generate 100 tasks to read and shuffle them?
>>    - The 1000 unique keys mean 1000 reducer groups, like in MR.
>>    - If I set the max cores to 50, up to 50 tasks can run concurrently.
>>    The remaining tasks just have to wait for a core while those 50 tasks
>>    are running.
>>    - Since we are doing reduceByKey, shuffling will happen. Data will be
>>    shuffled into 1000 partitions, as we have 1000 unique keys.
>>    - I don't know how many tasks will process these 1000 partitions;
>>    maybe this is where the parallelism parameter comes in?
>>    - No matter what the parallelism is, ONLY 50 tasks can run
>>    concurrently. So if we set more cores, more partitions' data will be
>>    processed in the executor at once (which runs more threads in this
>>    case), so more memory is needed. I don't see how increasing parallelism
>>    could help with the OOM in this case.
>>    - In my Spark SQL test case, I gave the executor a 24G heap, and my
>>    join between 2 big datasets kept getting OOM. I kept increasing
>>    "spark.default.parallelism", from 200 to 400, to 2000, even to 4000,
>>    with no help. What finally made the query finish without OOM was
>>    changing "--total-executor-cores" from 10 to 4.
>>
>>
>> So my questions are:
>> 1) What does parallelism really mean in Spark? In the simple example
>> above, for reduceByKey, what difference does it make if the parallelism
>> changes from 10 to 20?
>> 2) When we talk about partitions in Spark, for data coming from HDFS I
>> can understand the partitioning clearly. For intermediate data, the
>> partitions will be the same as the keys, right? For group, reduce, and
>> join operations, each unique key will be its own partition. Is that
>> correct?
>> 3) Why would increasing parallelism help with OOM? I don't get this part.
>> From my limited experience, adjusting the core count really matters for
>> memory.
>>
>> Thanks
>>
>> Yong
>>
>
>
