paleolimbot commented on issue #12919:
URL: https://github.com/apache/arrow/issues/12919#issuecomment-1133364938

   There's a good discussion on the mailing list about this in the context of 
Python user-defined functions, which were just added 
(https://lists.apache.org/thread/hwcrnf77j8p4dvyzoc3v5cwgws83nvqp). In the next 
few months we'll have R user-defined functions too, which will serve a related 
purpose (doing things with the Arrow compute engine that aren't implemented in 
C++). The gist of the mailing list discussion is that Arrow already handles a 
lot of the parallelism, so when R/Python code dispatches *more* workers the 
performance gain may be smaller than one would expect.
   
   For `map_batches()` it's a little easier to think of a use case where 
dispatching incoming RecordBatches to workers running on other cores would help 
(my mind jumps to performing a spatial join, which won't be in the query engine 
anytime soon).
   
   I think we have almost all the infrastructure we need to do this: we can 
read from and write to R connections (like `socketConnection()`), which gives us 
some options for interprocess communication in case the future package's 
abstractions don't already provide it or turn out to be too slow.
   
   I had some fun trying to wire all of that up and ran into a few problems 
that expose the limits of our current infrastructure. Perhaps a good scope for 
what Arrow should support is the tools needed to make this work, while the 
actual implementation could (or should?) live in an extension package.
   
   <details>
   
   ``` r
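   make_arrow_worker <- function(.f, ..., port = 1234) {
     # callr::r_bg() resets the environment of `fun` to the global environment,
     # so everything the child process needs (including `.f`) is passed as an
     # argument; arrow is not attached in the child, hence `arrow::` everywhere
     fun <- function(.f, ..., port) {
       dots <- list(...)
       # listen for the stream of record batches sent by the parent process
       input <- arrow:::MakeRConnectionInputStream(
         socketConnection(port = port, server = TRUE, blocking = TRUE)
       )
       output_con <- pipe("cat", "wb")
       output <- arrow:::MakeRConnectionOutputStream(output_con)
       
       reader <- arrow::RecordBatchStreamReader$create(input)
       # Arrow objects such as a Schema wrap external pointers, which don't
       # survive serialization to another process, so the writer is created
       # from the schema of the first result instead
       writer <- NULL
       while (!is.null(batch <- reader$read_next_batch())) {
         args <- c(list(batch), dots)
         result <- arrow::as_record_batch(do.call(.f, args))
         if (is.null(writer)) {
           writer <- arrow::RecordBatchStreamWriter$create(output, result$schema)
         }
         writer$write(result)
         flush(output_con)
       }
       if (!is.null(writer)) writer$close()
     }
     
     callr::r_bg(fun, list(.f = .f, ..., port = port))
   }
   
   worker <- make_arrow_worker(
     function(batch, offset = 0L) {
       # an integer offset keeps `x` as int32
       batch$x <- batch$x + offset
       batch
     },
     offset = 123L,
     port = 4837
   )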
   
   
   input_con <- socketConnection(Sys.info()["nodename"], port = 4837)
   #> Warning in socketConnection(Sys.info()["nodename"], port = 4837): Deweys-
   #> MacBook-Air-2.local:4837 cannot be opened
   #> Error in socketConnection(Sys.info()["nodename"], port = 4837): cannot open the connection
   input_stream <- arrow:::MakeRConnectionOutputStream(input_con)
   #> Error in arrow:::MakeRConnectionOutputStream(input_con): object 'input_con' not found
   writer <- arrow::RecordBatchStreamWriter$create(input_stream,
     arrow::schema(x = arrow::int32()))
   #> Error in is.string(sink): object 'input_stream' not found
   writer$write_batch(arrow::record_batch(x = 2L))
   #> Error in eval(expr, envir, enclos): object 'writer' not found
   worker$read_output()
   #> [1] ""
   worker$read_error()
   #> [1] "Error in socketConnection(port = port, server = TRUE, blocking = TRUE) : \n  cannot open the connection\nIn addition: Warning message:\nIn socketConnection(port = port, server = TRUE, blocking = TRUE) :\n  port 4837 cannot be opened\n"
   ```
   
   <sup>Created on 2022-05-20 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>
   
   </details>
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
