paleolimbot opened a new pull request, #13706:
URL: https://github.com/apache/arrow/pull/13706

   This PR adds support for more types of queries that include calls into R code (e.g., `map_batches(..., .lazy = TRUE)`, user-defined functions in mutates, arranges, and filters, and custom extension type behaviour). Previously these queries failed because there was no way to guarantee that the ExecPlan would be executed completely within a call to `RunWithCapturedR()`. That call establishes an event loop on the main R thread and launches a background thread to do "arrow stuff" that can queue tasks to run on the R thread.
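To make the constraint concrete, here is a minimal, single-threaded sketch in base R under a simplified model. Base R has no threads, so the "background" work is a plain function that can only request R evaluation by putting closures on a queue. The loop that drains that queue exists only while the enclosing call is on the stack. `run_with_event_loop_sketch()` and `submit()` are hypothetical names, and this is not how `RunWithCapturedR()` is actually implemented.

```r
# Single-threaded sketch (hypothetical names, not arrow's implementation).
# The "background" producer can only *request* R evaluation by queueing
# closures; the loop below is the only place those closures are run.
run_with_event_loop_sketch <- function(producer) {
  queue <- list()       # tasks waiting to run on the "R thread"
  loop_active <- TRUE   # TRUE only while this call is on the stack
  on.exit(loop_active <- FALSE)

  submit <- function(task) {
    if (!loop_active) stop("no event loop: task submitted outside the loop")
    queue[[length(queue) + 1L]] <<- task
    invisible(NULL)
  }

  # Simplification: the producer runs to completion, then the loop drains
  # the queued tasks in FIFO order and collects their results.
  producer(submit)
  results <- list()
  while (length(queue) > 0L) {
    task <- queue[[1L]]
    queue <- queue[-1L]
    results[[length(results) + 1L]] <- task()
  }
  results
}

# Usage: a producer that queues three "UDF" calls and leaks its submit().
leaked <- NULL
out <- run_with_event_loop_sketch(function(submit) {
  lapply(1:3, function(x) {
    force(x)
    submit(function() x * 32)
  })
  leaked <<- submit
})
unlist(out)                # 32 64 96
try(leaked(function() 1))  # errors: the loop no longer exists
```

A `submit()` that escapes the call (as `leaked` does) fails once the loop is gone. For the same reason, an ExecPlan that calls into R has to finish inside the captured-R call.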
   
   The approach I took here was to move more of the ExecPlan-to-RecordBatchReader logic into a subclass of RecordBatchReader that doesn't call `plan->StartProducing()` until the first batch has been pulled. This lets you return a record batch reader and pass it around at the R level (which is currently how `head()`, `tail()`, and a few other things are implemented). As long as the reader is drained all at once (e.g., with `reader$read_table()`), the calls into R will work.
   
   R code called from within an ExecPlan *won't* work with `reader$read_next_batch()` or the C data interface, because in those cases we can't guarantee an event loop.
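The deferred start can be sketched with a plain R closure, assuming batches are just data frames and "starting the plan" is any function that produces them. `make_lazy_reader()`, `start_plan`, and the `started()` accessor are hypothetical names chosen for illustration, not arrow's API. The sketch does not model the event loop. It only shows that nothing runs until a batch is requested and that `read_table()` drains everything in one call.

```r
# Hypothetical sketch of a reader that defers "starting the plan" until the
# first batch is requested (not arrow's API).
make_lazy_reader <- function(start_plan) {
  state <- new.env()
  state$started <- FALSE
  state$batches <- list()

  ensure_started <- function() {
    if (!state$started) {
      state$batches <- start_plan()  # stands in for plan->StartProducing()
      state$started <- TRUE
    }
  }

  list(
    started = function() state$started,
    # Pull one batch at a time; NULL signals the end of the stream.
    read_next_batch = function() {
      ensure_started()
      if (length(state$batches) == 0L) return(NULL)
      batch <- state$batches[[1L]]
      state$batches <- state$batches[-1L]
      batch
    },
    # Drain every remaining batch within this single call.
    read_table = function() {
      ensure_started()
      remaining <- state$batches
      state$batches <- list()
      do.call(rbind, remaining)
    }
  )
}

# Usage: creating the reader does not start the plan; reading does.
reader <- make_lazy_reader(function() {
  list(data.frame(a = 1:2), data.frame(a = 3:4))
})
reader$started()  # FALSE
tbl <- reader$read_table()
reader$started()  # TRUE
```

Because the whole drain happens inside one `read_table()` call, that single call is the natural place to wrap execution in an event loop. Batch-at-a-time pulls through `read_next_batch()` offer no such single scope.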
   
   This also lets us add some cancellability to ExecPlans: after #13635 (ARROW-11841), we can check a StopToken for an interrupt (for all exec plans). The biggest benefit, in my view, is that the lifecycle of the ExecPlan is more explicit. Before, the plan was stopped when the object was deleted, but in a way that took me a long time to understand. I think a reader subclass makes this more explicit and may also help with printing nested queries (since they're no longer eagerly evaluated).
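Here is a minimal sketch of that kind of cancellation. It assumes a cooperative model in which the consumer polls a flag between batches. `make_stop_source()` and `drain_with_stop_token()` are hypothetical base-R stand-ins loosely modelled on Arrow C++'s StopSource/StopToken pair. They are not arrow's R or C++ API.

```r
# Hypothetical sketch: a stop "source" that can request cancellation and a
# token (here, a function) that the consumer polls.
make_stop_source <- function() {
  env <- new.env()
  env$requested <- FALSE
  list(
    request_stop = function() env$requested <- TRUE,
    token = function() env$requested
  )
}

# Drain batches from next_batch(), checking the token before each pull so
# an interrupt stops the remaining work instead of running it to the end.
drain_with_stop_token <- function(next_batch, stop_requested) {
  out <- list()
  repeat {
    if (stop_requested()) stop("query cancelled")
    batch <- next_batch()
    if (is.null(batch)) break
    out[[length(out) + 1L]] <- batch
  }
  out
}

# Usage: the "interrupt" arrives while the second batch is being produced.
src <- make_stop_source()
i <- 0
next_batch <- function() {
  i <<- i + 1
  if (i == 2) src$request_stop()
  if (i > 5) NULL else i
}
res <- try(drain_with_stop_token(next_batch, src$token))
i  # 2: batches 3 to 5 were never produced
```

Checking between batches keeps the check cheap. The trade-off is that a single slow batch is not interrupted until it finishes.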
   
   An example of something that didn't work before that now does:
   
   ``` r
   library(arrow, warn.conflicts = FALSE)
   #> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
   # attach dplyr so that the %>% pipe is available
   library(dplyr, warn.conflicts = FALSE)
   
   register_scalar_function(
     "times_32",
     function(context, x) x * 32.0,
     int32(),
     float64(),
     auto_convert = TRUE
   )
   
   record_batch(a = 1:1000) %>%
     dplyr::mutate(b = times_32(a)) %>%
     as_record_batch_reader() %>%
     as_arrow_table()
   
   record_batch(a = 1:1000) %>%
     dplyr::mutate(fun_result = times_32(a)) %>%
     head(11) %>%
     dplyr::collect()
   ```
   
   <sup>Created on 2022-07-25 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>

