I'm curious where the factor of 6-8 comes from ? Is this assuming snappy
(or lzf) compression ? The sizes I mentioned are what the Spark UI reports;
I'm not sure whether those are before or after compression (for the shuffle
read/write).
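
For reference, one way to check which codec is in play, assuming a live
SparkContext named sc (the defaults shown are, as far as I know, what Spark
1.4 ships with):

  // Check whether map output is compressed and with which codec.
  sc.getConf.get("spark.shuffle.compress", "true")
  sc.getConf.get("spark.io.compression.codec", "snappy")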

On Fri, Aug 28, 2015 at 4:41 PM, java8964 <java8...@hotmail.com> wrote:

> There are several possibilities here.
>
> 1) Keep in mind that 7GB of data will need far more than 7GB of heap, as
> deserialized Java objects take much more space than the raw data. A rough
> rule of thumb is a factor of 6 to 8, so 7GB of data needs about 50GB of
> heap space.
> 2) You should monitor the Spark UI to check how many records are being
> processed per task, and whether the failed tasks have more data than the
> rest. Even when some tasks fail, you will also have tasks that succeeded.
> Compare them: do the failed tasks process far more records than the
> succeeded ones? If so, it indicates you have a data skew problem.
> 3) If the failed tasks handle a similar number of records as the succeeded
> ones, then just add more partitions so that each task processes less data.
> You should always monitor the GC output in these cases.
> 4) If most of your tasks fail due to memory, then your settings are too
> small for your data: add partitions or memory (see the sketch below).
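>
> For example, a minimal sketch, assuming a SQLContext named sqlContext (the
> 30000 is only illustrative, tune it to your data):
>
>   // Raise the number of reduce partitions for the SQL join before running
>   // it, so each reduce task handles a smaller slice of the shuffle data.
>   sqlContext.setConf("spark.sql.shuffle.partitions", "30000")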
>
>
> Yong
>
> ------------------------------
> From: tom...@gmail.com
> Date: Fri, 28 Aug 2015 13:55:52 -0700
> Subject: Re: How to avoid shuffle errors for a large join ?
> To: ja...@jasonknight.us
> CC: user@spark.apache.org
>
>
> Yeah, I tried with 10k and 30k and those still failed, so I'll try with
> more. That is a little disappointing though: the job only writes ~7TB of
> shuffle data, which in theory shouldn't require more than 1000 reducers on
> my 10TB memory cluster (~7GB of spill per reducer).
> I'm now wondering whether my shuffle partitions are uneven and I should use
> a custom partitioner. Is there a way to get stats on the partition sizes
> from Spark, other than something brute-force like the sketch below ?
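>
>   // Rough sketch only: count rows per partition of the join output
>   // ("joined" is a stand-in for the actual joined DataFrame). Note that
>   // this re-runs the shuffle just to count, so it is expensive.
>   val counts = joined.rdd
>     .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
>     .collect()
>     .sortBy(_._2)
>     .reverse
>   counts.take(10).foreach(println)   // the heaviest partitions hint at key skew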
>
> On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:
>
> I had similar problems to this (reduce-side failures for large joins, 25bn
> rows joined with 9bn), and found the answer was to push
> spark.sql.shuffle.partitions well above the 1000 you are using. In my case,
> 16k partitions worked for me, but your tables look a little denser, so you
> may want to go even higher.
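>
> If it helps, this is roughly how I set it right before the join (from
> memory, so treat it as a sketch; the SET syntax should be equivalent to
> calling setConf):
>
>   sqlContext.sql("SET spark.sql.shuffle.partitions=16000")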
>
> On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:
>
> I'm getting errors like "Removing executor with no recent heartbeats" and
> "Missing an output location for shuffle" for a large Spark SQL join
> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
> configure the job to avoid them.
>
> The initial stage completes fine with some 30k tasks on a cluster with 70
> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
> the shuffle stage first waits 30min in the scheduling phase according to
> the UI, and then dies with the mentioned errors.
>
> I can see in the GC logs that the executors reach their memory limits (32g
> per executor, 2 workers per machine) and can't allocate anything more on
> the heap. Fwiw, the top 10 entries in the memory use histogram are:
>
> num     #instances         #bytes  class name
> ----------------------------------------------
>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>    2:     251085327     8034730464  scala.Tuple2
>    3:     243694737     5848673688  java.lang.Float
>    4:     231198778     5548770672  java.lang.Integer
>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>    7:      74114058     1778737392  java.lang.Long
>    8:       6059103      779203840  [Ljava.lang.Object;
>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>   10:         34749       70122104  [B
>
> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>
> spark.core.connection.ack.wait.timeout 600
> spark.executor.heartbeatInterval       60s
> spark.executor.memory                  32g
> spark.mesos.coarse                     false
> spark.network.timeout                  600s
> spark.shuffle.blockTransferService     netty
> spark.shuffle.consolidateFiles         true
> spark.shuffle.file.buffer              1m
> spark.shuffle.io.maxRetries            6
> spark.shuffle.manager                  sort
>
> The join is currently configured with spark.sql.shuffle.partitions=1000,
> but that doesn't seem to help. Would increasing the partitions help ? Is
> there a formula to determine an approximate number of partitions for a
> join ?
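>
> For concreteness, a naive back-of-the-envelope (just a guess on my part,
> assuming roughly 200MB of shuffle data per reduce task) would be:
>
>   // ~6.5TB of map-side shuffle writes / ~200MB per reduce task
>   val shuffleWriteBytes       = 6.5e12
>   val targetBytesPerPartition = 200e6
>   val partitions = math.ceil(shuffleWriteBytes / targetBytesPerPartition).toInt
>   // comes out around 32500, i.e. far more than the current 1000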
> Any help with this job would be appreciated !
>
> cheers,
> Tom
>
>
>
