This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9662dd6  ARROW-8376: [R] Add experimental interface to ScanTask/RecordBatch iterators
9662dd6 is described below

commit 9662dd6a314af264699d99081953ece87771ebe9
Author: Neal Richardson <[email protected]>
AuthorDate: Thu Apr 9 16:05:43 2020 -0400

    ARROW-8376: [R] Add experimental interface to ScanTask/RecordBatch iterators
    
    As an alternative to calling `ToTable()` to bring everything into memory, it would be nice to expose the stream of batches so that you could aggregate (or really do whatever) on each chunk. That gives access to the full dataset, which you otherwise can't handle unless it's small enough to fit in memory.
    
    On the NYC taxi dataset (10.5 years, 125 parquet files),
    
    ```r
    tab <- ds %>%
      select(passenger_count) %>%
      map_batches(~count(., passenger_count)) %>%
      group_by(passenger_count) %>%
      summarize(n = sum(n))
    ```
    
    gives me the tabulation of `passenger_count` in about 200s (no parallelization). And you can see all sorts of weird features in the data:
    
    ```
    > as.data.frame(tab)
       passenger_count          n
    1             -127          7
    2             -123          1
    3             -122          1
    4             -119          1
    5             -115          1
    6             -101          1
    7              -98          1
    8              -96          1
    9              -93          1
    10             -92          1
    11             -91          1
    12             -79          1
    13             -64          2
    14             -63          1
    15             -48       1508
    16             -45          1
    17             -43          4
    18             -33          1
    19             -31          1
    20              -9          1
    21              -7          1
    22              -6          3
    23              -2          1
    24              -1         10
    25               0    5809809
    26               1 1078624900
    27               2  227454966
    28               3   67096194
    29               4   32443710
    30               5   99064441
    31               6   37241244
    32               7       1753
    33               8       1437
    34               9       1304
    35              10         17
    36              13          1
    37              15          2
    38              17          1
    39              19          1
    40              25          1
    41              33          2
    42              34          1
    43              36          1
    44              37          1
    45              38          1
    46              47          1
    47              49         26
    48              53          1
    49              58          2
    50              61          1
    51              65          3
    52              66          1
    53              69          1
    54              70          1
    55              84          1
    56              96          1
    57              97          1
    58             113          1
    59             125          1
    ```
    
    Closes #6365 from nealrichardson/r-map-reduce
    
    Authored-by: Neal Richardson <[email protected]>
    Signed-off-by: François Saint-Jacques <[email protected]>
---
 r/NAMESPACE                     |  3 ++
 r/R/arrow-package.R             |  2 +-
 r/R/arrowExports.R              |  8 ++++
 r/R/dataset.R                   | 85 +++++++++++++++++++++++++++++++++++++++--
 r/R/dplyr.R                     | 50 +++++++++++++-----------
 r/man/Scanner.Rd                | 19 ++++++++-
 r/man/map_batches.Rd            | 30 +++++++++++++++
 r/src/arrowExports.cpp          | 32 ++++++++++++++++
 r/src/arrow_types.h             |  2 +
 r/src/dataset.cpp               | 28 ++++++++++++++
 r/tests/testthat/test-dataset.R | 34 +++++++++++++++--
 11 files changed, 261 insertions(+), 32 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index de26a53..f86bc21 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -161,6 +161,7 @@ export(int64)
 export(int8)
 export(last_col)
 export(list_of)
+export(map_batches)
 export(matches)
 export(mmap_create)
 export(mmap_open)
@@ -205,9 +206,11 @@ importFrom(assertthat,assert_that)
 importFrom(bit64,print.integer64)
 importFrom(bit64,str.integer64)
 importFrom(methods,as)
+importFrom(purrr,as_mapper)
 importFrom(purrr,map)
 importFrom(purrr,map2)
 importFrom(purrr,map_chr)
+importFrom(purrr,map_dfr)
 importFrom(purrr,map_int)
 importFrom(purrr,map_lgl)
 importFrom(rlang,"%||%")
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index b04e735..8fed1ea 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -16,7 +16,7 @@
 # under the License.
 
 #' @importFrom R6 R6Class
-#' @importFrom purrr map map_int map_lgl map_chr map2
+#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl
 #' @importFrom assertthat assert_that
 #' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null 
enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label 
set_names
 #' @importFrom Rcpp sourceCpp
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 0f6b758..a31d72b 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -456,6 +456,14 @@ dataset___Scanner__ToTable <- function(scanner){
     .Call(`_arrow_dataset___Scanner__ToTable` , scanner)
 }
 
+dataset___Scanner__Scan <- function(scanner){
+    .Call(`_arrow_dataset___Scanner__Scan` , scanner)
+}
+
+dataset___ScanTask__get_batches <- function(scan_task){
+    .Call(`_arrow_dataset___ScanTask__get_batches` , scan_task)
+}
+
 shared_ptr_is_null <- function(xp){
     .Call(`_arrow_shared_ptr_is_null` , xp)
 }
diff --git a/r/R/dataset.R b/r/R/dataset.R
index 78dec8e..ed3cf1e 100644
--- a/r/R/dataset.R
+++ b/r/R/dataset.R
@@ -415,9 +415,20 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = 
FileFormat)
 #'
 #' @description
 #' A `Scanner` iterates over a [Dataset]'s fragments and returns data
-#' according to given row filtering and column projection. Use a
-#' `ScannerBuilder`, from a `Dataset`'s `$NewScan()` method, to construct one.
+#' according to given row filtering and column projection. A `ScannerBuilder`
+#' can help create one.
 #'
+#' @section Factory:
+#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`.
+#' It takes the following arguments:
+#'
+#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#'    `dplyr` methods on `Dataset`.
+#' * `projection`: A character vector of column names to select
+#' * `filter`: A `Expression` to filter the scanned rows by, or `TRUE` 
(default)
+#'    to keep all rows.
+#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
+#' * `...`: Additional arguments, currently ignored
 #' @section Methods:
 #' `ScannerBuilder` has the following methods:
 #'
@@ -440,9 +451,77 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = 
FileFormat)
 #' @export
 Scanner <- R6Class("Scanner", inherit = ArrowObject,
   public = list(
-    ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self))
+    ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)),
+    Scan = function() map(dataset___Scanner__Scan(self), shared_ptr, class = 
ScanTask)
   )
 )
+Scanner$create <- function(dataset, projection = NULL, filter = TRUE, 
use_threads = TRUE, ...) {
+  if (inherits(dataset, "arrow_dplyr_query") && inherits(dataset$.data, 
"Dataset")) {
+    return(Scanner$create(
+      dataset$.data,
+      dataset$selected_columns,
+      dataset$filtered_rows,
+      use_threads,
+      ...
+    ))
+  }
+  assert_is(dataset, "Dataset")
+  scanner_builder <- dataset$NewScan()
+  if (use_threads) {
+    scanner_builder$UseThreads()
+  }
+  if (!is.null(projection)) {
+    scanner_builder$Project(projection)
+  }
+  if (!isTRUE(filter)) {
+    scanner_builder$Filter(filter)
+  }
+  scanner_builder$Finish()
+}
+
+ScanTask <- R6Class("ScanTask", inherit = ArrowObject,
+  public = list(
+    Execute = function() map(dataset___ScanTask__get_batches(self), 
shared_ptr, class = RecordBatch)
+  )
+)
+
+#' Apply a function to a stream of RecordBatches
+#'
+#' As an alternative to calling `collect()` on a `Dataset` query, you can
+#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
+#' This lets you aggregate on each chunk and pull the intermediate results into
+#' a `data.frame` for further aggregation, even if you couldn't fit the whole
+#' `Dataset` result in memory.
+#'
+#' This is experimental and not recommended for production use.
+#'
+#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#' `dplyr` methods on `Dataset`.
+#' @param FUN A function or `purrr`-style lambda expression to apply to each
+#' batch
+#' @param ... Additional arguments passed to `FUN`
+#' @param .data.frame logical: collect the resulting chunks into a single
+#' `data.frame`? Default `TRUE`
+#' @export
+map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
+  if (.data.frame) {
+    lapply <- map_dfr
+  }
+  scanner <- Scanner$create(ensure_group_vars(X))
+  FUN <- as_mapper(FUN)
+  # message("Making ScanTasks")
+  lapply(scanner$Scan(), function(scan_task) {
+    # This outer lapply could be parallelized
+    # message("Making Batches")
+    lapply(scan_task$Execute(), function(batch) {
+      # message("Processing Batch")
+      # This inner lapply cannot be parallelized
+      # TODO: wrap batch in arrow_dplyr_query with X$selected_columns and 
X$group_by_vars
+      # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
+      FUN(batch, ...)
+    })
+  })
+}
 
 #' @usage NULL
 #' @format NULL
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index 3f57742..1d3b5ee 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -229,39 +229,45 @@ set_filters <- function(.data, expressions) {
 }
 
 collect.arrow_dplyr_query <- function(x, ...) {
-  colnames <- x$selected_columns
-  # Be sure to retain any group_by vars
-  gv <- setdiff(dplyr::group_vars(x), names(colnames))
-  if (length(gv)) {
-    colnames <- c(colnames, set_names(gv))
-  }
-
+  x <- ensure_group_vars(x)
   # Pull only the selected rows and cols into R
   if (query_on_dataset(x)) {
     # See dataset.R for Dataset and Scanner(Builder) classes
-    scanner_builder <- x$.data$NewScan()
-    scanner_builder$UseThreads()
-    scanner_builder$Project(colnames)
-    if (!isTRUE(x$filtered_rows)) {
-      scanner_builder$Filter(x$filtered_rows)
-    }
-    df <- as.data.frame(scanner_builder$Finish()$ToTable())
+    df <- Scanner$create(x)$ToTable()
   } else {
     # This is a Table/RecordBatch. See record-batch.R for the [ method
-    df <- as.data.frame(x$.data[x$filtered_rows, colnames, keep_na = FALSE])
+    df <- x$.data[x$filtered_rows, x$selected_columns, keep_na = FALSE]
   }
-  # In case variables were renamed, apply those names
-  names(df) <- names(colnames)
+  df <- as.data.frame(df)
+  restore_dplyr_features(df, x)
+}
+collect.Table <- as.data.frame.Table
+collect.RecordBatch <- as.data.frame.RecordBatch
+collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
 
+ensure_group_vars <- function(x) {
+  if (inherits(x, "arrow_dplyr_query")) {
+    # Before pulling data from Arrow, make sure all group vars are in the 
projection
+    gv <- set_names(setdiff(dplyr::group_vars(x), names(x)))
+    x$selected_columns <- c(x$selected_columns, gv)
+  }
+  x
+}
+
+restore_dplyr_features <- function(df, query) {
+  # An arrow_dplyr_query holds some attributes that Arrow doesn't know about
+  # After pulling data into a data.frame, make sure these features are carried 
over
+
+  # In case variables were renamed, apply those names
+  if (ncol(df)) {
+    names(df) <- names(query)
+  }
   # Preserve groupings, if present
-  if (length(x$group_by_vars)) {
-    df <- dplyr::grouped_df(df, dplyr::groups(x))
+  if (length(query$group_by_vars)) {
+    df <- dplyr::grouped_df(df, dplyr::groups(query))
   }
   df
 }
-collect.Table <- as.data.frame.Table
-collect.RecordBatch <- as.data.frame.RecordBatch
-collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
 
 #' @importFrom tidyselect vars_pull
 pull.arrow_dplyr_query <- function(.data, var = -1) {
diff --git a/r/man/Scanner.Rd b/r/man/Scanner.Rd
index a665c0b..2b82d20 100644
--- a/r/man/Scanner.Rd
+++ b/r/man/Scanner.Rd
@@ -6,9 +6,24 @@
 \title{Scan the contents of a dataset}
 \description{
 A \code{Scanner} iterates over a \link{Dataset}'s fragments and returns data
-according to given row filtering and column projection. Use a
-\code{ScannerBuilder}, from a \code{Dataset}'s \verb{$NewScan()} method, to 
construct one.
+according to given row filtering and column projection. A \code{ScannerBuilder}
+can help create one.
 }
+\section{Factory}{
+
+\code{Scanner$create()} wraps the \code{ScannerBuilder} interface to make a 
\code{Scanner}.
+It takes the following arguments:
+\itemize{
+\item \code{dataset}: A \code{Dataset} or \code{arrow_dplyr_query} object, as 
returned by the
+\code{dplyr} methods on \code{Dataset}.
+\item \code{projection}: A character vector of column names to select
+\item \code{filter}: A \code{Expression} to filter the scanned rows by, or 
\code{TRUE} (default)
+to keep all rows.
+\item \code{use_threads}: logical: should scanning use multithreading? Default 
\code{TRUE}
+\item \code{...}: Additional arguments, currently ignored
+}
+}
+
 \section{Methods}{
 
 \code{ScannerBuilder} has the following methods:
diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd
new file mode 100644
index 0000000..67d97a8
--- /dev/null
+++ b/r/man/map_batches.Rd
@@ -0,0 +1,30 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/dataset.R
+\name{map_batches}
+\alias{map_batches}
+\title{Apply a function to a stream of RecordBatches}
+\usage{
+map_batches(X, FUN, ..., .data.frame = TRUE)
+}
+\arguments{
+\item{X}{A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by 
the
+\code{dplyr} methods on \code{Dataset}.}
+
+\item{FUN}{A function or \code{purrr}-style lambda expression to apply to each
+batch}
+
+\item{...}{Additional arguments passed to \code{FUN}}
+
+\item{.data.frame}{logical: collect the resulting chunks into a single
+\code{data.frame}? Default \code{TRUE}}
+}
+\description{
+As an alternative to calling \code{collect()} on a \code{Dataset} query, you 
can
+use this function to access the stream of \code{RecordBatch}es in the 
\code{Dataset}.
+This lets you aggregate on each chunk and pull the intermediate results into
+a \code{data.frame} for further aggregation, even if you couldn't fit the whole
+\code{Dataset} result in memory.
+}
+\details{
+This is experimental and not recommended for production use.
+}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index b38edfc..2a33dea 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1799,6 +1799,36 @@ RcppExport SEXP _arrow_dataset___Scanner__ToTable(SEXP 
scanner_sexp){
 }
 #endif
 
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::vector<std::shared_ptr<ds::ScanTask>> dataset___Scanner__Scan(const 
std::shared_ptr<ds::Scanner>& scanner);
+RcppExport SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){
+BEGIN_RCPP
+       Rcpp::traits::input_parameter<const 
std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
+       return Rcpp::wrap(dataset___Scanner__Scan(scanner));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){
+       Rf_error("Cannot call dataset___Scanner__Scan(). Please use 
arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::vector<std::shared_ptr<arrow::RecordBatch>> 
dataset___ScanTask__get_batches(const std::shared_ptr<ds::ScanTask>& scan_task);
+RcppExport SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){
+BEGIN_RCPP
+       Rcpp::traits::input_parameter<const 
std::shared_ptr<ds::ScanTask>&>::type scan_task(scan_task_sexp);
+       return Rcpp::wrap(dataset___ScanTask__get_batches(scan_task));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){
+       Rf_error("Cannot call dataset___ScanTask__get_batches(). Please use 
arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
 // datatype.cpp
 #if defined(ARROW_R_WITH_ARROW)
 bool shared_ptr_is_null(SEXP xp);
@@ -5863,6 +5893,8 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) 
&_arrow_dataset___ScannerBuilder__schema, 1}, 
                { "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC) 
&_arrow_dataset___ScannerBuilder__Finish, 1}, 
                { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) 
&_arrow_dataset___Scanner__ToTable, 1}, 
+               { "_arrow_dataset___Scanner__Scan", (DL_FUNC) 
&_arrow_dataset___Scanner__Scan, 1}, 
+               { "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) 
&_arrow_dataset___ScanTask__get_batches, 1}, 
                { "_arrow_shared_ptr_is_null", (DL_FUNC) 
&_arrow_shared_ptr_is_null, 1}, 
                { "_arrow_unique_ptr_is_null", (DL_FUNC) 
&_arrow_unique_ptr_is_null, 1}, 
                { "_arrow_Int8__initialize", (DL_FUNC) 
&_arrow_Int8__initialize, 0}, 
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 632067e..502ee4d 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -212,8 +212,10 @@ inline std::shared_ptr<T> extract(SEXP x) {
 #include <arrow/json/reader.h>
 #include <arrow/result.h>
 #include <arrow/type.h>
+#include <arrow/type_fwd.h>
 #include <arrow/util/checked_cast.h>
 #include <arrow/util/compression.h>
+#include <arrow/util/iterator.h>
 #include <arrow/util/ubsan.h>
 #include <arrow/visitor_inline.h>
 #include <parquet/arrow/reader.h>
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 00c480f..8d87f18 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -233,4 +233,32 @@ std::shared_ptr<arrow::Table> dataset___Scanner__ToTable(
   return VALUE_OR_STOP(scanner->ToTable());
 }
 
+// [[arrow::export]]
+std::vector<std::shared_ptr<ds::ScanTask>> dataset___Scanner__Scan(
+    const std::shared_ptr<ds::Scanner>& scanner) {
+  auto it = VALUE_OR_STOP(scanner->Scan());
+  std::vector<std::shared_ptr<ds::ScanTask>> out;
+  std::shared_ptr<ds::ScanTask> scan_task;
+  // TODO(npr): can this iteration be parallelized?
+  for (auto st : it) {
+    scan_task = VALUE_OR_STOP(st);
+    out.push_back(scan_task);
+  }
+  return out;
+}
+
+// [[arrow::export]]
+std::vector<std::shared_ptr<arrow::RecordBatch>> 
dataset___ScanTask__get_batches(
+    const std::shared_ptr<ds::ScanTask>& scan_task) {
+  arrow::RecordBatchIterator rbi;
+  rbi = VALUE_OR_STOP(scan_task->Execute());
+  std::vector<std::shared_ptr<arrow::RecordBatch>> out;
+  std::shared_ptr<arrow::RecordBatch> batch;
+  for (auto b : rbi) {
+    batch = VALUE_OR_STOP(b);
+    out.push_back(batch);
+  }
+  return out;
+}
+
 #endif
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 94f10c2..14b5b66 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -206,6 +206,21 @@ test_that("Dataset with multiple file formats", {
   )
 })
 
+test_that("map_batches", {
+  ds <- open_dataset(dataset_dir, partitioning = "part")
+  expect_equivalent(
+    ds %>%
+      filter(int > 5) %>%
+      select(int, lgl) %>%
+      map_batches(
+        ~summarize(.,
+          min_int = min(int)
+        )
+      ),
+    tibble(min_int = c(6L, 101L))
+  )
+})
+
 test_that("partitioning = NULL to ignore partition information (but why?)", {
   ds <- open_dataset(hive_dir, partitioning = NULL)
   expect_identical(names(ds), names(df1)) # i.e. not c(names(df1), "group", 
"other")
@@ -301,10 +316,21 @@ test_that("filter scalar validation doesn't crash 
(ARROW-7772)", {
 test_that("collect() on Dataset works (if fits in memory)", {
   expect_equal(
     collect(open_dataset(dataset_dir)),
-    rbind(
-      cbind(df1),
-      cbind(df2)
-    )
+    rbind(df1, df2)
+  )
+})
+
+test_that("count()", {
+  skip("count() is not a generic so we have to get here through summarize()")
+  ds <- open_dataset(dataset_dir)
+  df <- rbind(df1, df2)
+  expect_equal(
+    ds %>%
+      filter(int > 6, int < 108) %>%
+      count(chr),
+    df %>%
+      filter(int > 6, int < 108) %>%
+      count(chr)
   )
 })
 

Reply via email to