This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e8982ca  [SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame
e8982ca is described below

commit e8982ca7ad94e98d907babf2d6f1068b7cd064c6
Author: hyukjinkwon <gurwls...@apache.org>
AuthorDate: Sun Jan 27 10:45:49 2019 +0800

[SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame

## What changes were proposed in this pull request?

This PR adds Arrow optimization support for the conversion from an R DataFrame to a Spark DataFrame. As on the PySpark side, it falls back to the non-optimized code path whenever Arrow optimization cannot be used.

This can be tested as below:

```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

```r
collect(createDataFrame(mtcars))
```

(See also the end-to-end sketch appended at the bottom of this message.)

### Requirements

- R 3.5.x
- Arrow package 0.12+

```bash
Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'
```

**Note:** the Arrow R package is not yet on CRAN. Please take a look at ARROW-3204.

**Note:** the Arrow R package does not currently appear to support Windows. Please take a look at ARROW-3204.

### Benchmarks

**Shell**

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false
```

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

**R code**

```r
createDataFrame(mtcars)  # Initializes
rdf <- read.csv("500000.csv")

test <- function() {
  options(digits.secs = 6)  # milliseconds
  start.time <- Sys.time()
  createDataFrame(rdf)
  end.time <- Sys.time()
  time.taken <- end.time - start.time
  print(time.taken)
}

test()
```

**Data (350 MB):**

```r
object.size(read.csv("500000.csv"))
350379504 bytes
```

"500000 Records" from http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/

**Results**

With `spark.sql.execution.arrow.enabled=false`:

```
Time difference of 29.9468 secs
```

With `spark.sql.execution.arrow.enabled=true`:

```
Time difference of 3.222129 secs
```

The performance improvement was around **950%**. In fact, the improvement is around **1200%**+, because this PR also includes a small optimization for the regular R DataFrame -> Spark DataFrame path. See https://github.com/apache/spark/pull/22954#discussion_r231847272

### Limitations

For now, the Arrow optimization with R does not support data containing `raw` values, or a schema in which the user explicitly specifies a float type; both produce corrupt values. In these cases, we fall back to the non-optimized code path.

## How was this patch tested?

A small test was added. I also manually forced this optimization to `true` for _all_ R tests, and they _all_ passed (with a few fallback warnings).

**TODOs:**

- [x] Draft code
- [x] Make the tests pass
- [x] Make the CRAN check pass
- [x] Performance measurement
- [x] Supportability investigation (for instance, types)
- [x] Wait for the Arrow 0.12.0 release
- [x] Fix and match it to Arrow 0.12.0

Closes #22954 from HyukjinKwon/r-arrow-createdataframe.
Lead-authored-by: hyukjinkwon <gurwls...@apache.org> Co-authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- R/pkg/R/SQLContext.R | 161 +++++++++++++++++---- R/pkg/R/context.R | 40 ++--- R/pkg/tests/fulltests/test_sparkSQL.R | 57 ++++++++ .../org/apache/spark/sql/internal/SQLConf.scala | 5 +- .../org/apache/spark/sql/api/r/SQLUtils.scala | 22 +++ 5 files changed, 239 insertions(+), 46 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index afcdd6f..2e5506a 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -147,6 +147,70 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToFileInArrow <- function(fileName, rdf, numPartitions) { + requireNamespace1 <- requireNamespace + + # R API in Arrow is not yet released in CRAN. CRAN requires to add the + # package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available + # or not. Therefore, it works around by avoiding direct requireNamespace. + # Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204. + if (requireNamespace1("arrow", quietly = TRUE)) { + record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) + RecordBatchStreamWriter <- get( + "RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE) + FileOutputStream <- get( + "FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE) + + numPartitions <- if (!is.null(numPartitions)) { + numToInt(numPartitions) + } else { + 1 + } + + rdf_slices <- if (numPartitions > 1) { + split(rdf, makeSplits(numPartitions, nrow(rdf))) + } else { + list(rdf) + } + + stream_writer <- NULL + tryCatch({ + for (rdf_slice in rdf_slices) { + batch <- record_batch(rdf_slice) + if (is.null(stream_writer)) { + stream <- FileOutputStream(fileName) + schema <- batch$schema + stream_writer <- RecordBatchStreamWriter(stream, schema) + } + + stream_writer$write_batch(batch) + } + }, + finally = { + if (!is.null(stream_writer)) { + stream_writer$close() + } + }) + + } else { + stop("'arrow' package should be installed.") + } +} + +checkTypeRequirementForArrow <- function(dataHead, schema) { + # Currenty Arrow optimization does not support raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, is.raw))) { + stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { + if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") + } + } +} + #' Create a SparkDataFrame #' #' Converts R data.frame or list into SparkDataFrame. @@ -172,36 +236,76 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" + useArrow <- FALSE + firstRow <- NULL if (is.data.frame(data)) { - # Convert data into a list of rows. Each row is a list. 
- - # get the names of columns, they will be put into RDD - if (is.null(schema)) { - schema <- names(data) - } + # get the names of columns, they will be put into RDD + if (is.null(schema)) { + schema <- names(data) + } - # get rid of factor type - cleanCols <- function(x) { - if (is.factor(x)) { - as.character(x) - } else { - x - } + # get rid of factor type + cleanCols <- function(x) { + if (is.factor(x)) { + as.character(x) + } else { + x } + } + data[] <- lapply(data, cleanCols) + + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { + useArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + checkTypeRequirementForArrow(data, schema) + fileName <- tempfile(pattern = "sparwriteToFileInArrowk-arrow", fileext = ".tmp") + tryCatch({ + writeToFileInArrow(fileName, data, numPartitions) + jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName) + }, + finally = { + # File might not be created. + suppressWarnings(file.remove(fileName)) + }) + + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + TRUE + }, + error = function(e) { + warning(paste0("createDataFrame attempted Arrow optimization because ", + "'spark.sql.execution.arrow.enabled' is set to true; however, ", + "failed, attempting non-optimization. Reason: ", + e)) + FALSE + }) + } + if (!useArrow) { + # Convert data into a list of rows. Each row is a list. # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + data <- setNames(as.list(data), NULL) # check if all columns have supported type lapply(data, getInternalType) # convert to rows - args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) data <- do.call(mapply, append(args, data)) + if (length(data) > 0) { + firstRow <- data[[1]] + } + } } - if (is.list(data)) { + if (useArrow) { + rdd <- jrddInArrow + } else if (is.list(data)) { sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) if (!is.null(numPartitions)) { rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions)) @@ -215,14 +319,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, } if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) { - row <- firstRDD(rdd) + if (is.null(firstRow)) { + firstRow <- firstRDD(rdd) + } names <- if (is.null(schema)) { - names(row) + names(firstRow) } else { as.list(schema) } if (is.null(names)) { - names <- lapply(1:length(row), function(x) { + names <- lapply(1:length(firstRow), function(x) { paste("_", as.character(x), sep = "") }) } @@ -237,8 +343,8 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, nn }) - types <- lapply(row, infer_type) - fields <- lapply(1:length(row), function(i) { + types <- lapply(firstRow, infer_type) + fields <- lapply(1:length(firstRow), function(i) { structField(names[[i]], types[[i]], TRUE) }) schema <- do.call(structType, fields) @@ -246,10 +352,15 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, stopifnot(class(schema) == "structType") - jrdd <- getJRDD(lapply(rdd, function(x) x), "row") - srdd <- callJMethod(jrdd, "rdd") - sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF", - srdd, schema$jobj, sparkSession) + if (useArrow) { + sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "toDataFrame", rdd, schema$jobj, sparkSession) + } else { + jrdd <- getJRDD(lapply(rdd, function(x) x), "row") + srdd <- callJMethod(jrdd, "rdd") + 
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF", + srdd, schema$jobj, sparkSession) + } dataFrame(sdf) } diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 0207f24..bac3efd 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -81,6 +81,26 @@ objectFile <- function(sc, path, minPartitions = NULL) { RDD(jrdd, "byte") } +makeSplits <- function(numSerializedSlices, length) { + # Generate the slice ids to put each row + # For instance, for numSerializedSlices of 22, length of 50 + # [1] 0 0 2 2 4 4 6 6 6 9 9 11 11 13 13 15 15 15 18 18 20 20 22 22 22 + # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47 + # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced. + # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD + if (numSerializedSlices > 0) { + unlist(lapply(0: (numSerializedSlices - 1), function(x) { + # nolint start + start <- trunc((as.numeric(x) * length) / numSerializedSlices) + end <- trunc(((as.numeric(x) + 1) * length) / numSerializedSlices) + # nolint end + rep(start, end - start) + })) + } else { + 1 + } +} + #' Create an RDD from a homogeneous list or vector. #' #' This function creates an RDD from a local homogeneous list in R. The elements @@ -143,25 +163,7 @@ parallelize <- function(sc, coll, numSlices = 1) { # For large objects we make sure the size of each slice is also smaller than sizeLimit numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit))) - # Generate the slice ids to put each row - # For instance, for numSerializedSlices of 22, length of 50 - # [1] 0 0 2 2 4 4 6 6 6 9 9 11 11 13 13 15 15 15 18 18 20 20 22 22 22 - # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47 - # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced. 
- # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD - splits <- if (numSerializedSlices > 0) { - unlist(lapply(0: (numSerializedSlices - 1), function(x) { - # nolint start - start <- trunc((as.numeric(x) * len) / numSerializedSlices) - end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices) - # nolint end - rep(start, end - start) - })) - } else { - 1 - } - - slices <- split(coll, splits) + slices <- split(coll, makeSplits(numSerializedSlices, len)) # Serialize each slice: obtain a list of raws, or a list of lists (slices) of # 2-tuples of raws diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 88f2286..93cb890 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -307,6 +307,63 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + + conf <- callJMethod(sparkSession, "conf") + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false") + tryCatch({ + expected <- collect(createDataFrame(mtcars)) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) + }) + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true") + tryCatch({ + expect_equal(collect(createDataFrame(mtcars)), expected) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) + }) +}) + +test_that("createDataFrame Arrow optimization - type specification", { + skip_if_not_installed("arrow") + rdf <- data.frame(list(list(a = 1, + b = "a", + c = TRUE, + d = 1.1, + e = 1L, + f = as.Date("1990-02-24"), + g = as.POSIXct("1990-02-24 12:34:56")))) + + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + conf <- callJMethod(sparkSession, "conf") + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false") + tryCatch({ + expected <- collect(createDataFrame(rdf)) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) + }) + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true") + tryCatch({ + expect_equal(collect(createDataFrame(rdf)), expected) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) + }) +}) + test_that("read/write csv as DataFrame", { if (windows_with_hadoop()) { csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index da595e7..4484273 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1286,8 +1286,9 @@ object SQLConf { val ARROW_EXECUTION_ENABLED = buildConf("spark.sql.execution.arrow.enabled") .doc("When true, make use of Apache Arrow for columnar data transfers. Currently available " + - "for use with pyspark.sql.DataFrame.toPandas, and " + - "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. 
" + + "for use with pyspark.sql.DataFrame.toPandas, " + + "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame, " + + "and createDataFrame when its input is an R DataFrame. " + "The following data types are unsupported: " + "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.") .booleanConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index f5d8d4e..693be99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.types._ @@ -237,4 +238,25 @@ private[sql] object SQLUtils extends Logging { def createArrayType(column: Column): ArrayType = { new ArrayType(ExprUtils.evalTypeExpr(column.expr), true) } + + /** + * R callable function to read a file in Arrow stream format and create an `RDD` + * using each serialized ArrowRecordBatch as a partition. + */ + def readArrowStreamFromFile( + sparkSession: SparkSession, + filename: String): JavaRDD[Array[Byte]] = { + ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename) + } + + /** + * R callable function to create a `DataFrame` from a `JavaRDD` of serialized + * ArrowRecordBatches. + */ + def toDataFrame( + arrowBatchRDD: JavaRDD[Array[Byte]], + schema: StructType, + sparkSession: SparkSession): DataFrame = { + ArrowConverters.toDataFrame(arrowBatchRDD, schema.json, sparkSession.sqlContext) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org