Hi Nirav,

        I faced a similar issue with YARN and Spark 1.5.2 on EMR, and the
following Spark conf helped me. You can adjust the values to suit your cluster.

conf = (SparkConf()
        .set("spark.master", "yarn-client")
        .setAppName("HalfWay")
        .set("spark.driver.memory", "15G")
        .set("spark.yarn.am.memory", "15G")
        .set("spark.driver.maxResultSize", "10G")
        .set("spark.storage.memoryFraction", "0.6")
        .set("spark.shuffle.memoryFraction", "0.6")
        .set("spark.yarn.executor.memoryOverhead", "4000")
        .set("spark.executor.cores", "4")
        .set("spark.executor.memory", "15G")
        .set("spark.executor.instances", "6"))
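As a quick sanity check on numbers like these (plain back-of-the-envelope arithmetic, not a Spark API call): YARN will kill any container whose footprint exceeds executor memory plus the memory overhead, so it is worth verifying what each container will request before launching. The figures below just mirror the conf above; adjust them to your cluster.

```python
# Rough per-executor container budget under YARN (illustrative numbers
# matching the conf above; tune for your own cluster).
executor_memory_mb = 15 * 1024   # spark.executor.memory = 15G
memory_overhead_mb = 4000        # spark.yarn.executor.memoryOverhead
container_mb = executor_memory_mb + memory_overhead_mb

num_executors = 6                # spark.executor.instances
total_cluster_mb = num_executors * container_mb

print(container_mb)       # per-container YARN request, in MB
print(total_cluster_mb)   # total memory the job asks the cluster for
```

If container_mb exceeds yarn.scheduler.maximum-allocation-mb on your cluster, the executors will never even be granted.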

It may also be possible to use reduceByKey in place of groupByKey; that
should help with the shuffling too.
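To see why that helps, here is a plain-Python sketch of the semantics (not actual Spark API calls): a groupByKey-style operation ships every value across the shuffle before anything is aggregated, while a reduceByKey-style operation folds values per key as they arrive, so the map side can pre-combine and far less data crosses the network.

```python
from collections import defaultdict

def group_by_key(pairs):
    """groupByKey-style: collect every value per key (all values shuffled)."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return dict(groups)

def reduce_by_key(pairs, combine):
    """reduceByKey-style: fold values per key as they arrive (map-side combine)."""
    acc = {}
    for k, v in pairs:
        acc[k] = combine(acc[k], v) if k in acc else v
    return acc

pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4)]
print(group_by_key(pairs))                       # {'a': [1, 3], 'b': [2, 4]}
print(reduce_by_key(pairs, lambda x, y: x + y))  # {'a': 4, 'b': 6}
```

With millions of case objects per key, the list-per-key shape on the left is exactly what blows up executor heaps; the folded shape on the right stays constant-size per key.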


Kuchekar, Nilesh

On Wed, Feb 10, 2016 at 8:09 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> We have been trying to solve a memory issue with a Spark job that processes
> 150GB of data (on disk). It does a groupBy operation; some of the executors
> will receive somewhere around 2-4M Scala case objects to work with. We
> are using the following Spark config:
>
> "executorInstances": "15",
>
>      "executorCores": "1", (we reduced it to one so a single task gets all
> the executorMemory; at least that's the assumption here)
>
>      "executorMemory": "15000m",
>
>      "minPartitions": "2000",
>
>      "taskCpus": "1",
>
>      "executorMemoryOverhead": "1300",
>
>      "shuffleManager": "tungsten-sort",
>
>       "storageFraction": "0.4"
>
>
> This is a snippet of what we see in spark UI for a Job that fails.
>
> This is a *stage* of this job that fails.
>
> Stage Id: 5 (retry 15)
> Pool Name: prod
> <http://hdn7:18080/history/application_1454975800192_0447/stages/pool?poolname=prod>
> Description: map at SparkDataJobs.scala:210
> <http://hdn7:18080/history/application_1454975800192_0447/stages/stage?id=5&attempt=15>
> Submitted: 2016/02/09 21:30:06
> Duration: 13 min
> Tasks (Succeeded/Total): 130/389 (16 failed)
> Shuffle Read: 1982.6 MB
> Shuffle Write: 818.7 MB
> Failure Reason: org.apache.spark.shuffle.FetchFailedException: Error
> in opening
> FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/fasd/appcache/application_1454975800192_0447/blockmgr-abb77b52-9761-457a-b67d-42a15b975d76/0c/shuffle_0_39_0.data,
> offset=11421300, length=2353}
>
> This is one of the individual *task* attempts from the above stage that
> threw the OOM:
>
> Index: 2, Task ID: 22361, Attempt: 0, Status: FAILED, Locality: PROCESS_LOCAL
> Executor: 38 / nd1.mycom.local, Launch Time: 2016/02/09 22:10:42
> Duration: 5.2 min, GC Time: 1.6 min, Shuffle Read: 7.4 MB / 375509
> Error: java.lang.OutOfMemoryError: Java heap space
>
> java.lang.OutOfMemoryError: Java heap space
>       at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
>       at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
>       at 
> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
>       at 
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203)
>       at 
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at 
> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202)
>       at 
> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
>       at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
>       at 
> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
>       at 
> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
>       at 
> org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:3
>
>
> None of the above suggests that it went over the 15GB of memory that I
> initially allocated. So what am I missing here? What's eating my memory?
>
> We tried executorJavaOpts to get a heap dump, but it doesn't seem to work.
>
> -XX:-HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -3 %p'
> -XX:HeapDumpPath=/opt/cores/spark
>
> I don't see any cores being generated, nor can I find a heap dump
> anywhere in the logs.
>
> Also, how do I find the YARN container ID from a Spark executor ID, so
> that I can investigate the YARN NodeManager and ResourceManager logs for
> that particular container?
>
> PS - The job does not do any caching of intermediate RDDs, as each RDD is
> just used once for the subsequent step. We use Spark 1.5.2 over YARN in
> yarn-client mode.
>
>
> Thanks
>
