[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515176#comment-15515176 ]
Felix Cheung commented on SPARK-17634: -------------------------------------- How long have you let it run? > Spark job hangs when using dapply > --------------------------------- > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR > Affects Versions: 2.0.0 > Reporter: Thomas Powell > Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) > @bci=5, line=175 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, > int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, > java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - > org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) > @bci=168, line=79 (Interpreted frame) > - > org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) > @bci=2, line=47 (Interpreted frame) > - org.apache.spark.scheduler.Task.run(long, int, > org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame) > - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 > (Interpreted frame) > - > java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) > @bci=95, line=1142 (Interpreted frame) > - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 > (Interpreted frame) > - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame) > {code} > Any recommendations on how best to debug? Nothing appears in the logs since > the processes don't actually fail. The executors themselves have 4GB of > memory which should be more than enough. > My feeling is this could be something around serialization? -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org