(1) Zero-copy would be awesome. (2) I'm absolute crap at C. (3) I'll be honest, at this point, I'm just happy to have a way to get it working.

That said, the return trip from Rust using the 'coerce RecordBatch to a buffer/byte vector' strategy seems overly complicated. I managed it by extracting various pieces from [Rust] arrow::ipc::writer::StreamWriter and arrow::ipc::writer::IpcDataGenerator. I'd like to file an issue to add a method to StreamWriter that returns StreamWriter.writer (basically what .into_inner() does for BufWriter). I'd appreciate any feedback on that idea.
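For reference, this is the standard-library pattern the proposal is modeled on. It is only a sketch of `BufWriter::into_inner()` with an in-memory `Vec<u8>`; the helper name `write_and_recover` is made up for illustration, and nothing here is part of the arrow crate.

```rust
use std::io::{BufWriter, Write};

/// Writes `payload` through a `BufWriter` that wraps an in-memory `Vec<u8>`,
/// then recovers the underlying vector with `into_inner()`.
/// (`write_and_recover` is a hypothetical helper name.)
fn write_and_recover(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut writer = BufWriter::new(Vec::new());
    writer.write_all(payload)?;
    // `into_inner` flushes any buffered bytes and hands back the wrapped
    // writer. Its error type still holds the BufWriter, so reduce it to a
    // plain io::Error before returning.
    writer.into_inner().map_err(|e| e.into_error())
}
```

A StreamWriter method along the same lines would let the caller take the byte vector back once the stream has been finished, instead of reassembling the writer's internals by hand.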
On Mon, Mar 29, 2021 at 9:17 AM Wes McKinney <[email protected]> wrote:

> If you are looking for true zero-copy R/Rust interop, then using the C
> interface is the way to go. You shouldn't need to depend on Python to
> have this, so we could need to refactor some things on the R side to
> compartmentalize anything relating to Python specifically.
>
> On Sun, Mar 28, 2021 at 10:04 PM Eric Burden <[email protected]>
> wrote:
> >
> > I took a look, but it looks like the R side of things relies on
> > `reticulate`, and I'm not keen to add Python as a dependency. After a bit
> > of poking at the R source, it actually turned out to be much simpler that I
> > thought. The following code in R produces a raw vector that can be passed
> > over and correctly interpreted as a RecordBatch:
> >
> > ------------------------------------------------------------
> > rb <- arrow::record_batch(mtcars)
> > bytes <- arrow::write_to_raw(rb, "stream")
> > ------------------------------------------------------------
> >
> > The raw (byte) vector from that can be correctly interpreted by an
> > arrow::ipc::reader::StreamReader in Rust.
> >
> > On Wed, Mar 24, 2021 at 4:34 PM Neal Richardson <[email protected]>
> > wrote:
> > >
> > > I'd recommend looking at how we use the C data interface to pass data
> > > between Python and R. On the R side, see
> > > https://github.com/apache/arrow/blob/master/r/R/python.R and
> > > https://github.com/apache/arrow/blob/master/r/src/py-to-r.cpp. I believe
> > > the Rust library has support for the C data interface now, so you would
> > > connect with that.
> > >
> > > Neal
> > >
> > > On Wed, Mar 24, 2021 at 2:20 PM Eric Burden <[email protected]>
> > > wrote:
> > > >
> > > > Hello all! I'm attempting to construct a RecordBatch in R (from the
> > > > `iris` dataset for example), write it to an R
> > > > arrow::BufferOutputStream, pass the pointer to the buffer and length
> > > > to Rust through the C ABI, then read the RecordBatch using the Rust
> > > > arrow::ipc::reader::StreamReader. So, pretty straightforward stuff.
> > > > Unfortunately, I've hit a roadblock. I've tried to turn this into a
> > > > MRE as follows:
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > The R code:
> > > >
> > > > #' @param df
> > > > #' @export
> > > > #' @rdname rustbind
> > > > #' @examples pass_df(mtcars)
> > > > #' @useDynLib rustbind pass_arrow_record_batch_wrapper
> > > > pass_df <- function(df = iris) {
> > > >   rb <- arrow::record_batch(as.data.frame(df))
> > > >   output_stream <- arrow::BufferOutputStream$create(initial_capacity = 8192)
> > > >   writer <- arrow::RecordBatchStreamWriter$create(output_stream, rb$schema)
> > > >   writer$write_batch(rb)
> > > >   writer$close()
> > > >   buffer <- output_stream$finish()
> > > >   output_stream$close()
> > > >
> > > >   # input_stream <- arrow::BufferReader$create(buffer)
> > > >   # reader <- arrow::RecordBatchStreamReader$create(input_stream)
> > > >   # df_from_stream <- reader$read_table()
> > > >
> > > >   print("From R:")
> > > >   print(buffer$pointer())
> > > >   print(glue::glue("Buffer Length({buffer$size})\n"))
> > > >   .Call(pass_arrow_record_batch_wrapper, buffer$pointer(), buffer$size);
> > > > }
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > The three commented lines seem to let me read back the RecordBatch in
> > > > R, which leads me to believe the RecordBatch is being properly written
> > > > to `buffer`. I'm printing the pointer address and buffer length as
> > > > sanity checks. The `.Call()` passes the pointer and length to the
> > > > following C function:
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > SEXP pass_arrow_record_batch_wrapper(SEXP ptr, SEXP buffer_len){
> > > >   void *stream_ptr = R_ExternalPtrAddr(ptr);
> > > >   int stream_len = Rf_asInteger(buffer_len);
> > > >   Rprintf("C says: Pointer(%p); Buffer Length(%i)\n", stream_ptr, stream_len);
> > > >   pass_record_batch_pointer(stream_ptr, stream_len);
> > > >   Rprintf("Back in C\n");
> > > >
> > > >   return R_NilValue;
> > > > }
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > The first `Rprintf()` prints the same pointer address and buffer
> > > > length as the R code, so that seems good.
> > > > `pass_record_batch_pointer()` is the Rust function shown below:
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > #[no_mangle]
> > > > pub unsafe extern "C" fn pass_record_batch_pointer(rb_ptr: *const c_void, buffer_len: c_int) {
> > > >     println!("Rust says: Pointer({:?}), Buffer Length({})", rb_ptr, buffer_len);
> > > >     let buffer_ref = std::slice::from_raw_parts(rb_ptr as *const u8, buffer_len as usize); // Unsafe
> > > >     println!("The buffer looks like: \n{:?}", buffer_ref);
> > > >
> > > >     match StreamReader::try_new(buffer_ref) {
> > > >         Ok(_) => println!("It worked!"),
> > > >         Err(e) => println!("{}", e.to_string()),
> > > >     }
> > > > }
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > This is where things go wrong. The pointer address and buffer length
> > > > still seem to be the same as the values passed from the R code and
> > > > `buffer_ref` prints just fine, but this code does not go down the
> > > > happy (Ok()) path. I've tracked the error down to a line in the
> > > > `StreamReader::try_new()` function:
> > > >
> > > > let mut meta_buffer = vec![0; meta_len as usize];
> > > >
> > > > The `meta_len` ends up being sort of random (apparently) and is not
> > > > related in any way to `buffer_len`. So, my questions are: (1) Is this
> > > > even the right approach? (2) Any ideas why this fails?
> > > >
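One way to narrow down the `meta_len` problem described above is to look at the first few bytes of the slice before handing it to `StreamReader`. The sketch below assumes the encapsulated message layout from the Arrow columnar format documentation: an optional 4-byte continuation marker `0xFFFFFFFF`, followed by a little-endian int32 metadata length. The names `read_i32_le` and `peek_message_prefix` are hypothetical helpers, not arrow crate APIs, and this is a diagnostic aid, not a claim about the actual cause of the failure.

```rust
/// Reads a little-endian i32 at `offset`, or returns None if the slice is
/// too short. (Hypothetical helper, standard library only.)
fn read_i32_le(buf: &[u8], offset: usize) -> Option<i32> {
    let b = buf.get(offset..offset + 4)?;
    Some(i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

/// Returns `(has_continuation_marker, metadata_length)` for the first
/// message in `buf`, or None if there are not enough bytes to tell.
/// Assumption: the stream starts with either the 0xFFFFFFFF continuation
/// marker followed by the length, or (legacy layout) the length directly.
fn peek_message_prefix(buf: &[u8]) -> Option<(bool, i32)> {
    let first = read_i32_le(buf, 0)?;
    if first == -1 {
        // 0xFFFFFFFF read as an i32 is -1: the length is the next 4 bytes.
        Some((true, read_i32_le(buf, 4)?))
    } else {
        // No marker: treat the first 4 bytes as the metadata length.
        Some((false, first))
    }
}
```

If the reported length is negative or larger than `buffer_len`, the slice most likely does not start at the first byte of the serialized stream, which would point at the pointer being passed rather than at the reader.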
