Imran, I have also observed that reducing the number of cores helps with OOM. I wanted to ask this (hopefully without straying off topic): we can specify the number of cores and the executor memory, but we don't get to specify _how_ the cores are spread among executors.
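In other words (just back-of-the-envelope arithmetic, not Spark code): an executor runs one task per core, so its fixed heap is shared by however many concurrent tasks its cores allow. A minimal sketch, with a made-up function name and the numbers from my test:

```python
# Back-of-the-envelope sketch, not Spark code: an executor's heap is
# shared by one concurrently running task per core.
def memory_per_task_gb(executor_mem_gb, cores_per_executor):
    """Rough upper bound on the heap available to each concurrent task."""
    return executor_mem_gb / cores_per_executor

print(memory_per_task_gb(24, 1))  # 24.0 -- one core: the whole 24G heap for one task
print(memory_per_task_gb(24, 3))  # 8.0  -- three cores on one executor: ~8G per task
```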
Is it possible that with 24G memory and 4 cores we get a spread of 1 core per executor, thus ending up with 24G for the task, but with 24G memory and 10 cores some executor ends up with 3 cores on the same machine, and thus we have only 8G per task?

On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Yong,
>
> Mostly correct, except for:
>
>> - Since we are doing reduceByKey, shuffling will happen. Data will be
>> shuffled into 1000 partitions, as we have 1000 unique keys.
>
> No, you will not get 1000 partitions. Spark has to decide how many
> partitions to use before it even knows how many unique keys there are. If
> you have 200 as the default parallelism (or you just explicitly make it
> the second parameter to reduceByKey()), then you will get 200 partitions.
> The 1000 unique keys will be distributed across the 200 partitions.
> Ideally they will be distributed fairly equally, but how they get
> distributed depends on the partitioner (by default you will have a
> HashPartitioner, so it depends on the hash of your keys).
>
> Note that this is more or less the same as in Hadoop MapReduce.
>
> The amount of parallelism matters because there are various places in
> Spark where there is some overhead proportional to the size of a
> partition. So in your example, if you have 1000 unique keys in 200
> partitions, you expect about 5 unique keys per partition -- if instead
> you had 10 partitions, you'd expect 100 unique keys per partition, and
> thus more data per task and a better chance of hitting an OOM. But there
> are many other possible sources of OOM, so this is definitely not the
> *only* solution.
>
> Sorry, I can't comment in particular on Spark SQL -- hopefully somebody
> more knowledgeable can comment on that.
>
> On Wed, Feb 25, 2015 at 8:58 PM, java8964 <java8...@hotmail.com> wrote:
>
>> Hi, Sparkers:
>>
>> I come from the Hadoop MapReduce world, and am trying to understand some
>> of Spark's internals.
>> From the web and this list, I keep seeing people talk about increasing
>> the parallelism when you hit an OOM error. I have tried to read as much
>> documentation as possible to understand RDD partitions and how
>> parallelism is used in Spark.
>>
>> I understand that for an RDD read from HDFS, by default one partition
>> corresponds to one HDFS block, which is pretty straightforward. I also
>> saw that lots of RDD operations accept a second parallelism parameter.
>> This is the part that confuses me. From my understanding, the
>> parallelism is totally controlled by how many cores you give to your
>> job; adjusting that parameter, or "spark.default.parallelism", shouldn't
>> have any impact.
>>
>> For example, say I have 10G of data in HDFS, and assume the block size
>> is 128M, so we get 80 blocks/partitions in the RDD. Now I transform that
>> RDD into a pair RDD with 1000 unique keys and do a reduceByKey, using
>> 200 as the default parallelism. Here is what I assume:
>>
>> - We have 80 partitions, as the data comes from 80 blocks. Most likely
>> Spark will generate 80 tasks to read and shuffle them?
>> - The 1000 unique keys mean 1000 reducer groups, like in MR.
>> - If I set the max cores to 50, then up to 50 tasks can run
>> concurrently; the remaining tasks just have to wait for a core.
>> - Since we are doing reduceByKey, shuffling will happen. Data will be
>> shuffled into 1000 partitions, as we have 1000 unique keys.
>> - I don't know how many tasks will process these 1000 partitions; maybe
>> this is where the parallelism parameter comes in?
>> - No matter what the parallelism is, ONLY 50 tasks can run
>> concurrently. So if we give the job more cores, more partitions' data
>> will be processed in each executor (which runs more threads in this
>> case), so more memory is needed. I don't see how increasing the
>> parallelism could help with the OOM in this case.
>> - In my test case with Spark SQL, I gave the executors a 24G heap, and
>> my join between 2 big datasets kept hitting OOM. I kept increasing
>> "spark.default.parallelism", from 200 to 400, to 2000, even to 4000,
>> with no help. What finally made the query finish without an OOM was
>> changing "--total-executor-cores" from 10 to 4.
>>
>> So my questions are:
>> 1) What does parallelism really mean in Spark? In the simple example
>> above, for reduceByKey, what difference does it make to change the
>> parallelism from 10 to 20?
>> 2) When we talk about partitions in Spark, I can understand them
>> clearly for data coming from HDFS. For the intermediate data, the
>> partition is determined by the key, right? For group, reduce, and join
>> operations, the keys determine the partitioning. Is that correct?
>> 3) Why could increasing the parallelism help with an OOM? I don't get
>> this part. From my limited experience, it is adjusting the core count
>> that really matters for memory.
>>
>> Thanks
>>
>> Yong
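To illustrate Imran's point about the default HashPartitioner: here is a minimal simulation in plain Python, not Spark code (`zlib.crc32` stands in for the JVM's hashCode so the demo is deterministic, and the key names are made up). With 1000 unique keys, 200 shuffle partitions mean roughly 5 keys' worth of data per reduce task, while 10 partitions mean roughly 100 keys' worth per task -- 20x more data per task, and hence more memory pressure.

```python
# Minimal simulation of Spark's default HashPartitioner: each key goes to
# hash(key) mod numPartitions. zlib.crc32 stands in for the JVM's hashCode
# here so the result is deterministic across runs.
import zlib
from collections import Counter

def hash_partition(key: str, num_partitions: int) -> int:
    """Assign a key to a shuffle partition, HashPartitioner-style."""
    return zlib.crc32(key.encode()) % num_partitions

keys = [f"key-{i}" for i in range(1000)]  # 1000 unique keys, as in the example

# 200 shuffle partitions: each reduce task sees ~5 keys' worth of data...
counts_200 = Counter(hash_partition(k, 200) for k in keys)
# ...only 10 partitions: each task sees ~100 keys' worth, 20x more per task.
counts_10 = Counter(hash_partition(k, 10) for k in keys)

print(sum(counts_200.values()))  # 1000 -- every key lands in some partition
print(max(counts_200.values()))  # close to the ideal of 5, modulo hash skew
print(max(counts_10.values()))   # close to the ideal of 100
```

Of course, the real per-task memory also depends on the sizes of the values behind each key, but the proportionality is the point: more partitions, less data per task.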