Did you set the driver memory? You can confirm it in the Executors tab of
the web UI. Btw, the code may only work in local mode. In cluster mode,
counts will be serialized to the remote workers, and the result is not
fetched back by the driver after foreach. You can use RDD.countByValue
instead. -Xiangrui
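
A minimal sketch of the countByValue suggestion, not the poster's actual job. It assumes a local master and a tiny in-memory RDD standing in for the s3n://geonames input; the object name, the lineCounts helper, and the sample strings are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CountByValueSketch {
  // Hypothetical helper: counts how often each distinct line occurs.
  // countByValue aggregates on the executors and returns the resulting
  // map to the driver, so no driver-side mutable HashMap is needed.
  def lineCounts(lines: RDD[String]): scala.collection.Map[String, Long] =
    lines.countByValue()

  def main(args: Array[String]): Unit = {
    // "local[2]" and the app name are placeholders for illustration only.
    val conf = new SparkConf().setAppName("countByValue-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for sc.textFile("s3n://geonames"); the data is made up.
      val samples = sc.parallelize(Seq("foo word", "bar word", "baz word", "foo word"))
      val counts = lineCounts(samples)
      counts.foreach { case (line, n) => println(s"$line -> $n") }
    } finally {
      sc.stop()
    }
  }
}
```

The returned map still lives on the driver, so its size is bounded by driver memory when the number of distinct lines is large.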

On Fri, Aug 15, 2014 at 8:18 AM, jerryye <jerr...@gmail.com> wrote:
> Hi All,
> I'm not sure if I should file a JIRA or if I'm missing something obvious,
> since the test code I'm trying is so simple. I've isolated the problem I'm
> seeing to a memory issue, but I don't know what parameter I need to tweak. It
> does seem related to spark.akka.frameSize. If I sample my RDD with 35% of
> the data, everything runs to completion; with more than 35%, it fails. In
> standalone mode, I can run on the full RDD without any problems.
>
> // works
> val samples = sc.textFile("s3n://geonames").sample(false, 0.35) // 64MB, 2849439 Lines
>
> // fails
> val samples = sc.textFile("s3n://geonames").sample(false, 0.4) // 64MB, 2849439 Lines
>
> Any ideas?
>
> 1) RDD size is causing the problem. The code below as is fails but if I swap
> smallSample for samples, the code runs end to end on both cluster and
> standalone.
> 2) The error I get is:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:1
> failed 4 times, most recent failure: TID 12 on host
> ip-10-251-14-74.us-west-2.compute.internal failed for unknown reason
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
> 3) Using the 1.1.0 branch the driver freezes instead of aborting with the
> previous error in #2.
> 4) In 1.1.0, changing spark.akka.frameSize also has the effect of no
> progress in the driver.
>
> Code:
> val smallSample = sc.parallelize(Array("foo word", "bar word", "baz word"))
>
> val samples = sc.textFile("s3n://geonames") // 64MB, 2849439 Lines of short strings
>
> val counts = new collection.mutable.HashMap[String, Int].withDefaultValue(0)
>
> samples.toArray.foreach(counts(_) += 1)
>
> val result = samples.map(
>   l => (l, counts.get(l))
> )
>
> result.count
>
> Settings (with or without Kryo doesn't matter):
> export SPARK_JAVA_OPTS="-Xms5g -Xmx10g -XX:MaxPermSize=10g"
> export SPARK_MEM=10g
> spark.akka.frameSize 40
> #spark.serializer org.apache.spark.serializer.KryoSerializer
> #spark.kryoserializer.buffer.mb 1000
> spark.executor.memory 58315m
> spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
> spark.executor.extraClassPath /root/ephemeral-hdfs/conf
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/spark-akka-frameSize-stalls-job-in-1-1-0-tp7865.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
