Did you set driver memory? You can confirm it in the Executors tab of the web UI. By the way, the code may only work in local mode: in cluster mode, `counts` will be serialized out to the remote workers, and the updates made inside `foreach` are never fetched back by the driver. You can use RDD.countByValue instead. -Xiangrui
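For reference, a minimal sketch of the countByValue approach, assuming `sc` is an existing SparkContext and using the s3n path from the original post (the broadcast step is my addition, to avoid reshipping the map with every task):

```scala
// Replace the driver-side mutable HashMap with RDD.countByValue, which
// aggregates the counts on the executors and returns a Map to the driver.
val samples = sc.textFile("s3n://geonames").sample(false, 0.4)

// counts: scala.collection.Map[String, Long], computed distributedly
val counts = samples.countByValue()

// Broadcast the map so each task reads one shared copy instead of a
// per-closure serialized copy.
val countsBc = sc.broadcast(counts)

val result = samples.map(l => (l, countsBc.value.getOrElse(l, 0L)))
result.count()
```

This also sidesteps the original failure mode entirely: nothing in the closure mutates driver-side state, so the result is the same in local and cluster mode.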
On Fri, Aug 15, 2014 at 8:18 AM, jerryye <jerr...@gmail.com> wrote:
> Hi All,
> I'm not sure if I should file a JIRA or if I'm missing something obvious,
> since the test code I'm trying is so simple. I've isolated the problem I'm
> seeing to a memory issue, but I don't know which parameter I need to tweak;
> it does seem related to spark.akka.frameSize. If I sample my RDD with 35% of
> the data, everything runs to completion; with more than 35%, it fails. In
> standalone mode, I can run on the full RDD without any problems.
>
> // works
> val samples = sc.textFile("s3n://geonames").sample(false, 0.35) // 64MB, 2849439 lines
>
> // fails
> val samples = sc.textFile("s3n://geonames").sample(false, 0.4) // 64MB, 2849439 lines
>
> Any ideas?
>
> 1) RDD size is causing the problem. The code below fails as is, but if I swap
> smallSample for samples, the code runs end to end in both cluster and
> standalone mode.
> 2) The error I get is:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:1
> failed 4 times, most recent failure: TID 12 on host
> ip-10-251-14-74.us-west-2.compute.internal failed for unknown reason
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
> 3) Using the 1.1.0 branch, the driver freezes instead of aborting with the
> error in #2.
> 4) In 1.1.0, changing spark.akka.frameSize also results in no progress in
> the driver.
>
> Code:
> val smallSample = sc.parallelize(Array("foo word", "bar word", "baz word"))
>
> val samples = sc.textFile("s3n://geonames") // 64MB, 2849439 lines of short strings
>
> val counts = new collection.mutable.HashMap[String, Int].withDefaultValue(0)
>
> samples.toArray.foreach(counts(_) += 1)
>
> val result = samples.map(
>   l => (l, counts.get(l))
> )
>
> result.count
>
> Settings (with or without Kryo doesn't matter):
> export SPARK_JAVA_OPTS="-Xms5g -Xmx10g -XX:MaxPermSize=10g"
> export SPARK_MEM=10g
> spark.akka.frameSize 40
> #spark.serializer org.apache.spark.serializer.KryoSerializer
> #spark.kryoserializer.buffer.mb 1000
> spark.executor.memory 58315m
> spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
> spark.executor.extraClassPath /root/ephemeral-hdfs/conf
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/spark-akka-frameSize-stalls-job-in-1-1-0-tp7865.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> ---------------------------------------------------------------------