Hello all! I'm attempting to construct a RecordBatch in R (from the `iris`
dataset for example), write it to an R arrow::BufferOutputStream, pass the
pointer to the buffer and length to Rust through the C ABI, then read the
RecordBatch using the Rust arrow::ipc::reader::StreamReader. So, pretty
straightforward stuff. Unfortunately, I've hit a roadblock. I've reduced the
problem to an MRE, as follows:

-----------------------------------------------------------------------------------------------------------------

The R code:

#' @param df A data frame to write as an Arrow RecordBatch.
#' @export
#' @rdname rustbind
#' @examples pass_df(mtcars)
#' @useDynLib rustbind pass_arrow_record_batch_wrapper
pass_df <- function(df = iris) {
  rb <- arrow::record_batch(as.data.frame(df))
  output_stream <- arrow::BufferOutputStream$create(initial_capacity = 8192)
  writer <- arrow::RecordBatchStreamWriter$create(output_stream, rb$schema)
  writer$write_batch(rb)
  writer$close()
  buffer <- output_stream$finish()
  output_stream$close()

  # input_stream <- arrow::BufferReader$create(buffer)
  # reader <- arrow::RecordBatchStreamReader$create(input_stream)
  # df_from_stream <- reader$read_table()

  print("From R:")
  print(buffer$pointer())
  print(glue::glue("Buffer Length({buffer$size})\n"))
  .Call(pass_arrow_record_batch_wrapper, buffer$pointer(), buffer$size)
}

-----------------------------------------------------------------------------------------------------------------

When uncommented, the three commented-out lines read the RecordBatch back
in R, which leads me to believe the RecordBatch is being written to `buffer`
correctly. I'm printing the pointer address and buffer length as sanity
checks. The `.Call()` passes the pointer and length to the following C
function:

-----------------------------------------------------------------------------------------------------------------

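#include <R.h>
#include <Rinternals.h>

/* Rust function shown further down; declared here so the call compiles. */
void pass_record_batch_pointer(const void *rb_ptr, int buffer_len);

SEXP pass_arrow_record_batch_wrapper(SEXP ptr, SEXP buffer_len){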
  void *stream_ptr = R_ExternalPtrAddr(ptr);
  int stream_len = Rf_asInteger(buffer_len);
  Rprintf("C says: Pointer(%p); Buffer Length(%i)\n", stream_ptr, stream_len);
  pass_record_batch_pointer(stream_ptr, stream_len);
  Rprintf("Back in C\n");

  return R_NilValue;
}

-----------------------------------------------------------------------------------------------------------------

The first `Rprintf()` prints the same pointer address and buffer length as
the R code, so that seems good. `pass_record_batch_pointer()` is the Rust
function shown below:

-----------------------------------------------------------------------------------------------------------------

use arrow::ipc::reader::StreamReader;
use std::os::raw::{c_int, c_void};

#[no_mangle]
pub unsafe extern "C" fn pass_record_batch_pointer(rb_ptr: *const c_void, buffer_len: c_int) {
    println!("Rust says: Pointer({:?}), Buffer Length({})", rb_ptr, buffer_len);
    // Unsafe: trusts that rb_ptr points to at least buffer_len readable bytes.
    let buffer_ref = std::slice::from_raw_parts(rb_ptr as *const u8, buffer_len as usize);
    println!("The buffer looks like: \n{:?}", buffer_ref);

    match StreamReader::try_new(buffer_ref) {
        Ok(_) => println!("It worked!"),
        Err(e) => println!("{}", e),
    }
}

-----------------------------------------------------------------------------------------------------------------

This is where things go wrong. The pointer address and buffer length still
seem to be the same as the values passed from the R code and `buffer_ref`
prints just fine, but this code does not go down the happy (Ok()) path.
I've tracked the error down to a line in the `StreamReader::try_new()`
function:

let mut meta_buffer = vec![0; meta_len as usize];

`meta_len` ends up being an apparently arbitrary value that bears no
relation to `buffer_len`. So, my questions are: (1) Is this even the
right approach? (2) Any ideas why this fails?
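
One way to narrow this down might be to look at the framing bytes directly.
As far as I understand the Arrow IPC streaming format, each message starts
with a 4-byte continuation marker (0xFFFFFFFF) followed by a 4-byte
little-endian metadata length; streams written before Arrow 0.15 omit the
marker and start with the length. Below is a minimal sketch of that check.
`peek_meta_len` is a hypothetical helper written for this post, not part of
the arrow crate, and it uses only the standard library:

```rust
/// Hypothetical diagnostic helper (not part of the arrow crate).
/// Returns the metadata length declared by the first message in `buf`,
/// or None if `buf` is too short to contain one.
pub fn peek_meta_len(buf: &[u8]) -> Option<i32> {
    // Streams written by Arrow >= 0.15 begin with this continuation marker.
    const CONTINUATION: [u8; 4] = [0xFF, 0xFF, 0xFF, 0xFF];

    if buf.len() < 4 {
        return None;
    }
    let first = [buf[0], buf[1], buf[2], buf[3]];
    if first != CONTINUATION {
        // Legacy framing: the first four bytes are the length itself.
        return Some(i32::from_le_bytes(first));
    }
    if buf.len() < 8 {
        return None;
    }
    // The length prefix follows the marker, stored little-endian.
    Some(i32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]))
}
```

Calling `peek_meta_len(buffer_ref)` inside `pass_record_batch_pointer()`
should give a small positive number if the slice really holds the serialized
stream. An implausible value would suggest that the bytes Rust sees are not
the bytes R wrote.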
