This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 32960a1 ARROW-3479: [R] Support to write record_batch as stream
32960a1 is described below
commit 32960a13546c41b3b68e87f94548beffab4ead8d
Author: Javier Luraschi <[email protected]>
AuthorDate: Wed Oct 10 07:48:24 2018 -0400
ARROW-3479: [R] Support to write record_batch as stream
Using this PR as a WIP to efficiently transfer data from R to Spark using
Arrow.
This PR might be ultimately closed and not merged, but thought it would be
good to give visibility as to what I'm exploring.
Specifically, I'm working on supporting efficient execution of:
```r
library(sparklyr)
sc <- spark_connect(master = "local")
system.time({
tbl_data <- sdf_copy_to(sc, data.frame(y = runif(10^6, 0, 1)), "data",
overwrite = TRUE)
})
```
Currently, without this PR and without using `arrow`:
```r
system.time({
tbl_data <- sdf_copy_to(sc, data.frame(y = runif(10^6, 0, 1)), "data",
overwrite = TRUE)
})
```
```
user system elapsed
1.120 0.087 3.482
```
Using `arrow`, this drops to:
```r
library(arrow)
system.time({
tbl_data <- sdf_copy_to(sc, data.frame(y = runif(10^6, 0, 1)), "data",
overwrite = TRUE)
})
```
```
user system elapsed
0.222 0.029 0.641
```
and down to the following when using `record$to_stream()` from this PR
(initially named `to_raw()`) instead of `record$to_file()`:
```
user system elapsed
0.102 0.007 0.351
```
Author: Javier Luraschi <[email protected]>
Closes #2727 from javierluraschi/feature/r-to-raw and squashes the
following commits:
0e302a590 <Javier Luraschi> use snake casing not camel
40f4e24d9 <Javier Luraschi> additional code review feedback
0cf4ce748 <Javier Luraschi> additional code review feedback
9e27d04b7 <Javier Luraschi> fix clang lint warnings
ec1a8c2fe <Javier Luraschi> add test for record_batch to_stream
a1580df0d <Javier Luraschi> avoid double copy under to_stream for R bindings
643371dbc <Javier Luraschi> implement record to_raw() for r bindings
---
r/R/RcppExports.R | 4 ++++
r/R/RecordBatch.R | 1 +
r/src/RcppExports.cpp | 12 ++++++++++++
r/src/RecordBatch.cpp | 19 +++++++++++++++++++
r/tests/testthat/test-RecordBatch.R | 13 +++++++++++++
5 files changed, 49 insertions(+)
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index 0d0299f..d8f63ed 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -297,6 +297,10 @@ RecordBatch__to_file <- function(batch, path) {
.Call(`_arrow_RecordBatch__to_file`, batch, path)
}
+RecordBatch__to_stream <- function(batch) {
+ .Call(`_arrow_RecordBatch__to_stream`, batch)
+}
+
RecordBatch__from_dataframe <- function(tbl) {
.Call(`_arrow_RecordBatch__from_dataframe`, tbl)
}
diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R
index 4712aaf..af2a290 100644
--- a/r/R/RecordBatch.R
+++ b/r/R/RecordBatch.R
@@ -23,6 +23,7 @@
num_rows = function() RecordBatch__num_rows(self),
schema = function() `arrow::Schema`$new(RecordBatch__schema(self)),
to_file = function(path) invisible(RecordBatch__to_file(self,
fs::path_abs(path))),
+ to_stream = function() RecordBatch__to_stream(self),
column = function(i) `arrow::Array`$new(RecordBatch__column(self, i)),
column_name = function(i) RecordBatch__column_name(self, i),
names = function() RecordBatch__names(self),
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index dcf005a..4ec9162 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -814,6 +814,17 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
+// RecordBatch__to_stream
+RawVector RecordBatch__to_stream(const std::shared_ptr<arrow::RecordBatch>&
batch);
+RcppExport SEXP _arrow_RecordBatch__to_stream(SEXP batchSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>&
>::type batch(batchSEXP);
+ rcpp_result_gen = Rcpp::wrap(RecordBatch__to_stream(batch));
+ return rcpp_result_gen;
+END_RCPP
+}
// RecordBatch__from_dataframe
std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl);
RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) {
@@ -1329,6 +1340,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_RecordBatch__to_dataframe", (DL_FUNC)
&_arrow_RecordBatch__to_dataframe, 1},
{"_arrow_read_record_batch_", (DL_FUNC) &_arrow_read_record_batch_, 1},
{"_arrow_RecordBatch__to_file", (DL_FUNC) &_arrow_RecordBatch__to_file, 2},
+ {"_arrow_RecordBatch__to_stream", (DL_FUNC)
&_arrow_RecordBatch__to_stream, 1},
{"_arrow_RecordBatch__from_dataframe", (DL_FUNC)
&_arrow_RecordBatch__from_dataframe, 1},
{"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2},
{"_arrow_RecordBatch__RemoveColumn", (DL_FUNC)
&_arrow_RecordBatch__RemoveColumn, 2},
diff --git a/r/src/RecordBatch.cpp b/r/src/RecordBatch.cpp
index d4bca93..ec99c2e 100644
--- a/r/src/RecordBatch.cpp
+++ b/r/src/RecordBatch.cpp
@@ -16,6 +16,7 @@
// under the License.
#include <arrow/io/file.h>
+#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <arrow/ipc/writer.h>
#include "arrow_types.h"
@@ -95,6 +96,24 @@ int RecordBatch__to_file(const
std::shared_ptr<arrow::RecordBatch>& batch,
}
// [[Rcpp::export]]
+RawVector RecordBatch__to_stream(const std::shared_ptr<arrow::RecordBatch>& batch) {
+  // Pass 1: serialize into a mock sink only to learn the exact stream size.
+  arrow::io::MockOutputStream mock_sink;
+  R_ERROR_NOT_OK(arrow::ipc::WriteRecordBatchStream({batch}, &mock_sink));
+
+  // Pass 2: serialize directly into the R raw vector's memory, so the bytes
+  // land in their final destination instead of being copied afterwards.
+  RawVector res(mock_sink.GetExtentBytesWritten());
+  auto raw_buffer = std::make_shared<arrow::MutableBuffer>(res.begin(), res.size());
+  arrow::io::FixedSizeBufferWriter sink(raw_buffer);
+  R_ERROR_NOT_OK(arrow::ipc::WriteRecordBatchStream({batch}, &sink));
+  R_ERROR_NOT_OK(sink.Close());
+
+  // `raw_buffer` wraps the memory of `res`, which now holds the whole stream.
+  return res;
+}
+
+// [[Rcpp::export]]
std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl)
{
CharacterVector names = tbl.names();
diff --git a/r/tests/testthat/test-RecordBatch.R
b/r/tests/testthat/test-RecordBatch.R
index e3557f8..454404f 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -114,3 +114,16 @@ test_that("RecordBatch with 0 rows are supported", {
res <- read_record_batch(tf)
expect_equal(res, batch)
})
+
+test_that("RecordBatch can output stream", {
+  tbl <- tibble::tibble(
+    int = 1:10, dbl = as.numeric(1:10),
+    lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
+    chr = letters[1:10]
+  )
+  record <- record_batch(tbl)
+  stream <- record$to_stream()
+  expect_is(stream, "raw")
+  expect_gt(length(stream), 0)
+  expect_identical(record$to_stream(), stream)
+})