I'm curious where the factor of 6-8 comes from. Is this assuming snappy (or lzf) compression? The sizes I mentioned are what the Spark UI reports; I'm not sure whether those are before or after compression (for the shuffle read/write).
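As a sanity check on my end, this is roughly how I'm reading back the compression config from the driver. Just a minimal sketch: the fallback values are my assumption of the Spark 1.4 defaults (snappy, shuffle compression on), not something I've verified in the docs.

    // Sketch: print the shuffle compression settings actually in effect.
    // The fallback values assume the Spark 1.4 defaults.
    val codec      = sc.getConf.get("spark.io.compression.codec", "snappy")
    val compressed = sc.getConf.get("spark.shuffle.compress", "true")
    println(s"shuffle codec: $codec, shuffle compression enabled: $compressed")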
On Fri, Aug 28, 2015 at 4:41 PM, java8964 <java8...@hotmail.com> wrote:

> There are several possibilities here.
>
> 1) Keep in mind that 7GB of data will need far more than 7GB of heap, as
> deserialized Java objects take much more space than the data itself. A
> rule of thumb is to multiply by 6 to 8, so 7GB of data needs about 50GB
> of heap space.
> 2) You should monitor the Spark UI to check how many records are being
> processed per task, and whether the failed tasks have more data than the
> rest. Even when some tasks fail, others will have succeeded. Compare
> them: do the failed tasks process far more records than the succeeded
> ones? If so, it indicates you have a data skew problem.
> 3) If the failed tasks were allocated a similar number of records as the
> succeeded ones, then just add more partitions so that each task processes
> less data. You should always monitor the GC output in these cases.
> 4) If most of your tasks failed due to memory, then your settings are too
> small for your data; add partitions or memory.
>
> Yong
>
> ------------------------------
> From: tom...@gmail.com
> Date: Fri, 28 Aug 2015 13:55:52 -0700
> Subject: Re: How to avoid shuffle errors for a large join ?
> To: ja...@jasonknight.us
> CC: user@spark.apache.org
>
> Yeah, I tried with 10k and 30k and these still failed; will try with more
> then. Though that is a little disappointing: it only writes ~7TB of
> shuffle data, which in theory shouldn't require more than 1000 reducers
> on my 10TB-memory cluster (~7GB of spill per reducer).
> I'm now wondering if my shuffle partitions are uneven and I should use a
> custom partitioner. Is there a way to get stats on the partition sizes
> from Spark?
>
> On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:
>
> I had similar problems to this (reduce-side failures for large joins
> (25bn rows with 9bn)), and found the answer was to push
> spark.sql.shuffle.partitions up beyond 1000. In my case, 16k partitions
> worked for me, but your tables look a little denser, so you may want to
> go even higher.
>
> On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:
>
> I'm getting errors like "Removing executor with no recent heartbeats" and
> "Missing an output location for shuffle" for a large Spark SQL join
> (1bn rows/2.5TB joined with 1bn rows/30GB), and I'm not sure how to
> configure the job to avoid them.
>
> The initial stage completes fine with some 30k tasks on a cluster with 70
> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
> the shuffle stage first waits 30 min in the scheduling phase according to
> the UI, and then dies with the mentioned errors.
>
> I can see in the GC logs that the executors reach their memory limits
> (32GB per executor, 2 workers per machine) and can't allocate anything
> more in the heap.
> Fwiw, the top 10 entries in the memory-use histogram are:
>
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>    2:     251085327     8034730464  scala.Tuple2
>    3:     243694737     5848673688  java.lang.Float
>    4:     231198778     5548770672  java.lang.Integer
>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>    7:      74114058     1778737392  java.lang.Long
>    8:       6059103      779203840  [Ljava.lang.Object;
>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>   10:         34749       70122104  [B
>
> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>
> spark.core.connection.ack.wait.timeout 600
> spark.executor.heartbeatInterval 60s
> spark.executor.memory 32g
> spark.mesos.coarse false
> spark.network.timeout 600s
> spark.shuffle.blockTransferService netty
> spark.shuffle.consolidateFiles true
> spark.shuffle.file.buffer 1m
> spark.shuffle.io.maxRetries 6
> spark.shuffle.manager sort
>
> The join is currently configured with spark.sql.shuffle.partitions=1000,
> but that doesn't seem to help. Would increasing the partitions help? Is
> there a formula to determine an approximate number of partitions for a
> join?
> Any help with this job would be appreciated!
>
> cheers,
> Tom
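To make the two open questions above concrete (getting partition-size stats, and estimating a partition count), here's roughly what I plan to try. Treat it as a sketch only: `joined` is a stand-in name for the join's DataFrame, and the 200MB target partition size is an assumption of mine, not a Spark rule.

    // Sketch 1: per-partition record counts, to check whether the shuffle
    // partitions are skewed. Note this is a full pass over the data;
    // sample the DataFrame first if that's too expensive.
    val counts = joined.rdd
      .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
      .collect()
    val sizes = counts.map(_._2.toLong)
    println(s"partitions=${sizes.length} min=${sizes.min} " +
      s"max=${sizes.max} avg=${sizes.sum / sizes.length}")

    // Sketch 2: back-of-the-envelope partition count from shuffle volume:
    // total shuffle write / target partition size.
    // 6.5 TB / 200 MB per partition ~= 32500 partitions.
    val suggested = math.ceil(6.5e12 / 200e6).toInt
    sqlContext.setConf("spark.sql.shuffle.partitions", suggested.toString)

If the per-partition counts from sketch 1 come back wildly uneven, that would point at the skew problem Yong describes in (2) rather than at the partition count.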