[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiangrui Meng reassigned SPARK-18924:
-------------------------------------
Assignee: (was: Xiangrui Meng)
> Improve collect/createDataFrame performance in SparkR
> -----------------------------------------------------
>
> Key: SPARK-18924
> URL: https://issues.apache.org/jira/browse/SPARK-18924
> Project: Spark
> Issue Type: Improvement
> Components: SparkR
> Reporter: Xiangrui Meng
> Priority: Critical
>
> SparkR has its own SerDe for data serialization between JVM and R.
> The SerDe on the JVM side is implemented in:
> * [SerDe.scala|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/SerDe.scala]
> * [SQLUtils.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala]
> The SerDe on the R side is implemented in:
> * [deserialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/deserialize.R]
> * [serialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/serialize.R]
> The serialization between JVM and R suffers from huge storage and computation
> overhead. For example, a short round trip of 1 million doubles surprisingly
> took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(1000000)))))
> user system elapsed
> 14.224 0.582 189.135
> {code}
> Collecting a medium-sized DataFrame locally and continuing with a local R
> workflow is a use case we should pay attention to. SparkR will never be able
> to cover all existing features from CRAN packages. It is also unnecessary for
> Spark to do so because not all features need scalability.
> Several factors contribute to the serialization overhead:
> 1. The SerDe on the R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive type columns
> in particular.
> 3. There is some overhead in the serialization protocol/implementation itself.
> 1) might have been discussed before, because R packages like rJava existed
> before SparkR. I'm not sure whether we would have a license issue in depending
> on those libraries. Another option is to switch to R's low-level C interface or
> Rcpp, which again might have license issues. I'm not an expert here. If we have
> to implement our own, there is still much room for improvement, as discussed
> below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`,
> which collects rows to the driver and then constructs columns. However,
> * it ignores column types and results in boxing/unboxing overhead
> * it collects all objects to the driver and results in high GC pressure
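> For illustration, here is a rough sketch of that row-first pattern (this is not
> the actual `SQLUtils.dfToCols` code, only an approximation of its shape): every
> value comes out of a `Row` as a boxed `Any` before the columns are assembled.
> {code}
> // Rough sketch of the row-first pattern (not the actual SQLUtils.dfToCols code):
> // all rows are materialized on the driver, then columns are assembled by pulling
> // each value out of a Row as a boxed Any.
> import org.apache.spark.sql.{DataFrame, Row}
>
> def dfToColsSketch(df: DataFrame): Array[Array[Any]] = {
>   val rows: Array[Row] = df.collect()        // every row object lands on the driver
>   val numCols = df.schema.length
>   val cols = Array.fill(numCols)(new Array[Any](rows.length))
>   var i = 0
>   while (i < rows.length) {
>     var j = 0
>     while (j < numCols) {
>       cols(j)(i) = rows(i).get(j)            // primitives come back boxed
>       j += 1
>     }
>     i += 1
>   }
>   cols
> }
> {code}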
> A relatively simple change is to implement a specialized column builder based
> on column types, primitive types in particular. We need to handle null/NA
> values properly. A simple data structure we can use is
> {code}
> val size: Int
> val nullIndexes: Array[Int]
> val notNullValues: Array[T] // specialized for primitive types
> {code}
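> As a strawman, a specialized builder for a double column might look like the
> following (the class and method names are made up for illustration; this is not
> existing Spark code):
> {code}
> // Hypothetical builder for a nullable double column, mirroring the
> // (size, nullIndexes, notNullValues) layout sketched above.
> import scala.collection.mutable.ArrayBuffer
>
> class DoubleColumnBuilder {
>   // A production version would use a growable primitive array to avoid boxing.
>   private val notNullValues = ArrayBuffer.empty[Double]
>   private val nullIndexes = ArrayBuffer.empty[Int]
>   private var size = 0                       // total entries, including nulls
>
>   def append(v: Double): Unit = { notNullValues += v; size += 1 }
>
>   def appendNull(): Unit = { nullIndexes += size; size += 1 }
>
>   /** (total length, positions of null/NA entries, packed non-null values) */
>   def result(): (Int, Array[Int], Array[Double]) =
>     (size, nullIndexes.toArray, notNullValues.toArray)
> }
>
> // Building the column [1.5, NA, 2.5]:
> //   val b = new DoubleColumnBuilder
> //   b.append(1.5); b.appendNull(); b.append(2.5)
> //   b.result() yields (3, Array(1), Array(1.5, 2.5))
> {code}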
> On the R side, we can use `readBin` and `writeBin` to read or write the entire
> vector in a single call. The speed seems reasonable (on the order of GB/s):
> {code}
> > x <- runif(10000000) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
> user system elapsed
> 0.036 0.021 0.059
> > system.time(y <- readBin(r, double(), 10000000))
> user system elapsed
> 0.015 0.007 0.024
> {code}
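> On the JVM side, the counterpart would be to pack a primitive column into a byte
> buffer in one bulk operation so that R can decode it with a single `readBin`
> call. A minimal sketch, assuming we settle on little-endian byte order (the byte
> order and framing are open protocol choices, not something the current SerDe
> already does this way):
> {code}
> // Sketch: serialize an Array[Double] as raw little-endian bytes in one bulk copy.
> // R could decode this with readBin(bytes, double(), n, endian = "little").
> import java.nio.{ByteBuffer, ByteOrder}
>
> def doublesToBytes(values: Array[Double]): Array[Byte] = {
>   val buf = ByteBuffer.allocate(values.length * java.lang.Double.BYTES)
>   buf.order(ByteOrder.LITTLE_ENDIAN)
>   buf.asDoubleBuffer().put(values)           // bulk copy, no per-element boxing
>   buf.array()                                // backing array holds the raw bytes
> }
> {code}
> For the reverse direction, `writeBin(x, raw(0), endian = "little")` on the R side
> produces the same byte layout.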
> This is just a proposal that needs to be discussed and formalized. But in
> general, it should be feasible to obtain a 20x or greater performance gain.