[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533152#comment-15533152 ]

Thomas Powell edited comment on SPARK-17634 at 9/29/16 3:56 PM:
----------------------------------------------------------------

This is the time within a single partition (and all partitions are actually 
evenly distributed). I've made the following change, and with it each block of 
10000 rows takes a constant time to read. I can submit a PR, since this turns 
the deserialization into a linear-time algorithm, which isn't the case in the 
current implementation.
{code}
readMultipleObjectsWithDebug <- function(inputCon) {
    # readMultipleObjects will read multiple continuous objects from
    # a DataOutputStream. There is no preceding field telling the count
    # of the objects, so the number of objects varies, we try to read
    # all objects in a loop until the end of the stream.
    cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
    data <- vector('list', 10)
    cap <- 10
    len <- 0
    secs <- elapsedSecs()
    while (TRUE) {
        type <- SparkR:::readType(inputCon)
        if (type == "") {
            break
        }
        # Double the capacity of the pre-allocated list when it fills up,
        # instead of appending one element at a time.
        if (len == cap) {
            data <- c(data, vector('list', cap))
            cap <- cap * 2
        }
        len <- len + 1
        data[[len]] <- SparkR:::readTypedObject(inputCon, type)
        # Log how long each block of 10000 rows took to read.
        if (len %% 10000 == 0) {
            duration <- elapsedSecs() - secs
            cat(paste(len, ": ", duration, "s", sep=""),
                file=debugFile, append=TRUE, sep="\n")
            secs <- elapsedSecs()
        }
    }
    data[1:len]
}
{code}
For the example I posted above, every read of 10000 rows now takes around 2 
seconds, the same as the time to read the first 10000 rows.
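
For context on where the time was going: the stock read path grows the result 
list one object at a time, and each such append can copy the whole list, which 
is what makes the overall cost super-linear. The snippet below is just a 
standalone sketch of the two growth strategies (plain integers instead of the 
connection reads, and it is not the actual SparkR source); exact timings will 
depend on the R version.
{code}
# Standalone sketch (not the SparkR source): contrast element-by-element list
# growth with the capacity-doubling approach used in the patched reader above.
grow_by_append <- function(n) {
    data <- list()
    for (i in seq_len(n)) {
        # appending past the end can copy the whole list on each iteration,
        # which made the total cost roughly quadratic on the R versions of the time
        data[[length(data) + 1L]] <- i
    }
    data
}

grow_by_doubling <- function(n) {
    data <- vector('list', 10)
    cap <- 10
    len <- 0
    for (i in seq_len(n)) {
        if (len == cap) {
            # double the capacity so appends are amortized O(1)
            data <- c(data, vector('list', cap))
            cap <- cap * 2
        }
        len <- len + 1
        data[[len]] <- i
    }
    data[1:len]
}

system.time(grow_by_append(200000))    # grows much faster than linearly with n
system.time(grow_by_doubling(200000))  # stays roughly linear in n
{code}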

I still think this is terribly slow. It's a huge cost (e.g. if your partitions 
have 1,000,000 rows) to pay before you've actually run any computations. To me 
this makes {{dapply}} basically unusable: even if I do {{SparkR::head}} to get 
a preview, I still pay the cost of running {{dapply}} on one partition, which 
can take a considerable amount of time. In the example I posted above my 
columns contained only integers, but as soon as I add string columns it takes 
significantly longer to read each 10000-row block because of the increased 
volume of data.
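
To make the preview scenario concrete, this is roughly the pattern I'm 
describing (the path and session setup here are placeholders, not taken from 
my actual job):
{code}
library(SparkR)
sparkR.session()

# placeholder path; the real job reads ~200 parquet files totalling ~2GB
df <- read.parquet("/path/to/parquet")

# identity dapply, same as in the issue description
result <- dapply(df, function(x) { x }, schema(df))

# even this preview has to deserialize a whole partition in the R worker first
head(result)
{code}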


> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame 
> backed by around 200 parquet files totalling roughly 2GB. When I load this in 
> with the new partition coalescing it ends up with around 20 partitions, so 
> each one is roughly 100MB. The data frame itself has 4 columns of integers 
> and doubles. If I run a count over this, things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that 
> just uses an identity function, the tasks hang and make no progress. Both the 
> R and Java processes are running on the Spark nodes and are listening on the 
> {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the 
> socket but never seems to make any progress. It's harder to tell what the R 
> process is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() @bci=4, line=212 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) @bci=109, line=87 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) @bci=322, line=59 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) @bci=5, line=29 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) @bci=59, line=178 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) @bci=5, line=175 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=168, line=79 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=47 (Interpreted frame)
>  - org.apache.spark.scheduler.Task.run(long, int, org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
>  - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
>  - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
> Any recommendations on how best to debug? Nothing appears in the logs since 
> the processes don't actually fail. The executors themselves have 4GB of 
> memory, which should be more than enough.
> My feeling is that this could be something around serialization.


