This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d3ec690 ARROW-3490: [R] streaming of arrow objects to streams
d3ec690 is described below
commit d3ec69069649013229366ebe01e22f389597dc19
Author: Romain Francois <[email protected]>
AuthorDate: Thu Oct 18 10:06:08 2018 -0400
ARROW-3490: [R] streaming of arrow objects to streams
This makes `write_record_batch` and `write_table` generic with dispatch on
the stream type.
```r
write_record_batch <- function(x, stream, ...){
UseMethod("write_record_batch", stream)
}
write_table <- function(x, stream, ...) {
UseMethod("write_table", stream)
}
```
The `stream` argument can be various things for different use cases:
- an `arrow::ipc::RecordBatchWriter` created either with
`record_batch_stream_writer()` or `record_batch_file_writer()`. This is the
lowest level: it calls the writer's `$WriteRecordBatch()` or `$WriteTable()`
method, depending on what is being streamed.
- an `arrow::io::OutputStream`: this first creates an
`arrow::ipc::RecordBatchStreamWriter` and streams into it. In particular, this
*does not* write the `ARROW1` magic bytes that Arrow files carry.
- an `fs_path` from 📦 `fs`: this opens an
`arrow::ipc::RecordBatchFileWriter` and streams to it, so that the file gets
the `ARROW1` magic bytes.
- a `character`: we assert it is of length one and then call the
`fs_path` method.
- a `raw()`, which is only used for its type: in that case we stream into a
byte buffer and return it as a raw vector.
Some examples:
``` r
library(arrow)
tbl <- tibble::tibble(
int = 1:10, dbl = as.numeric(1:10),
lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
chr = letters[1:10]
)
batch <- record_batch(tbl)
tf <- tempfile()
# stream the batch to the file
write_record_batch(batch, tf)
# same
write_record_batch(batch, fs::path_abs(tf))
# to an OutputStream
file_stream <- file_output_stream(tf)
write_record_batch(batch, file_stream)
file_stream$Close()
# to a RecordBatchFileWriter
file_stream <- file_output_stream(tf)
file_writer <- record_batch_file_writer(file_stream, batch$schema())
write_record_batch(batch, file_writer)
file_writer$Close()
file_stream$Close()
# get the bytes directly
write_record_batch(batch, raw())
#> [1] 04 01 00 00 10 00 00 00 00 00 0a 00 0c 00 06 00 05 00 08 00 0a 00 00
#> [24] 00 00 01 03 00 0c 00 00 00 08 00 08 00 00 00 04 00 08 00 00 00 04 00
#> [47] 00 00 04 00 00 00 9c 00 00 00 58 00 00 00 2c 00 00 00 04 00 00 00 84
#> [70] ff ff ff 00 00 01 05 14 00 00 00 0c 00 00 00 04 00 00 00 00 00 00 00
#> [93] dc ff ff ff 03 00 00 00 63 68 72 00 a8 ff ff ff 00 00 01 06 18 00 00
#> [116] 00 10 00 00 00 04 00 00 00 00 00 00 00 04 00 04 00 04 00 00 00 03 00
#> [139] 00 00 6c 67 6c 00 d0 ff ff ff 00 00 01 03 20 00 00 00 14 00 00 00 04
#> [162] 00 00 00 00 00 00 00 00 00 06 00 08 00 06 00 06 00 00 00 00 00 02 00
#> [185] 03 00 00 00 64 62 6c 00 10 00 14 00 08 00 06 00 07 00 0c 00 00 00 10
#> [208] 00 10 00 00 00 00 00 01 02 24 00 00 00 14 00 00 00 04 00 00 00 00 00
#> [231] 00 00 08 00 0c 00 08 00 07 00 08 00 00 00 00 00 00 01 20 00 00 00 03
#> [254] 00 00 00 69 6e 74 00 00 00 00 00 2c 01 00 00 14 00 00 00 00 00 00 00
#> [277] 0c 00 16 00 06 00 05 00 08 00 0c 00 0c 00 00 00 00 03 03 00 18 00 00
#> [300] 00 c8 00 00 00 00 00 00 00 00 00 0a 00 18 00 0c 00 04 00 08 00 0a 00
#> [323] 00 00 ac 00 00 00 10 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 09
#> [346] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
#> [369] 00 00 00 00 28 00 00 00 00 00 00 00 28 00 00 00 00 00 00 00 00 00 00
#> [392] 00 00 00 00 00 28 00 00 00 00 00 00 00 50 00 00 00 00 00 00 00 78 00
#> [415] 00 00 00 00 00 00 08 00 00 00 00 00 00 00 80 00 00 00 00 00 00 00 08
#> [438] 00 00 00 00 00 00 00 88 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
#> [461] 88 00 00 00 00 00 00 00 30 00 00 00 00 00 00 00 b8 00 00 00 00 00 00
#> [484] 00 10 00 00 00 00 00 00 00 00 00 00 00 04 00 00 00 0a 00 00 00 00 00
#> [507] 00 00 00 00 00 00 00 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 00
#> [530] 00 00 00 0a 00 00 00 00 00 00 00 03 00 00 00 00 00 00 00 0a 00 00 00
#> [553] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 01 00 00 00 02 00 00
#> [576] 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00 08 00
#> [599] 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 00 00 f0 3f 00 00 00 00 00
#> [622] 00 00 40 00 00 00 00 00 00 08 40 00 00 00 00 00 00 10 40 00 00 00 00
#> [645] 00 00 14 40 00 00 00 00 00 00 18 40 00 00 00 00 00 00 1c 40 00 00 00
#> [668] 00 00 00 20 40 00 00 00 00 00 00 22 40 00 00 00 00 00 00 24 40 6b 03
#> [691] 00 00 00 00 00 00 22 01 00 00 00 00 00 00 00 00 00 00 01 00 00 00 02
#> [714] 00 00 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00
#> [737] 08 00 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 61 62 63 64 65 66 67
#> [760] 68 69 6a 00 00 00 00 00 00 00 00 00 00
```
Created on 2018-10-12 by the [reprex package](https://reprex.tidyverse.org) (v0.2.1.9000)
Author: Romain Francois <[email protected]>
Closes #2749 from romainfrancois/ARROW-3490/stream-2 and squashes the
following commits:
ce4ec06a4 <Romain Francois> type promotion for types that do not exist in R
338f75f0b <Romain Francois> More flexible read_table
5cb8dbd65 <Romain Francois> more flexible read_record_batch with various dispatch
072b7f0f1 <Romain Francois> + BufferOutputStream
f301f1e52 <Romain Francois> :rewind: to write_record_batch, write_table and write_arrow
a17b37582 <Romain Francois> Trying less double dispatch
9b9a6b85b <Romain Francois> roxygen
2ae2ab386 <Romain Francois> - to_file and to_stream - write_arrow + stream.data.frame
8d0e581ba <Romain Francois> stream.arrow::Table methods
e1f62cc07 <Romain Francois> R6 arrrow::io::FixedSizeBufferWriter
80ea2b75f <Romain Francois> R6 arrow::io::MockOutputSream
a93933a0c <Romain Francois> +close_on_exit, local_tempfile
ac20df3b6 <Romain Francois> + stream
---
r/DESCRIPTION | 9 +-
r/NAMESPACE | 39 +++-
r/R/R6.R | 4 +
r/R/RcppExports.R | 132 +++++++++---
r/R/RecordBatch.R | 46 +----
r/R/RecordBatchReader.R | 188 +++++++++++++++++
r/R/RecordBatchWriter.R | 191 ++++++++++++++++++
r/R/Table.R | 53 +----
r/R/buffer.R | 2 +
r/R/enums.R | 2 -
r/R/io.R | 65 ++++++
r/R/on_exit.R | 28 +++
r/data-raw/test.R | 6 +-
r/man/io.Rd | 16 ++
r/man/read_record_batch.Rd | 6 +-
r/man/read_table.Rd | 2 +-
r/man/record_batch_file_reader.Rd | 14 ++
r/man/record_batch_file_writer.Rd | 19 ++
r/man/record_batch_stream_reader.Rd | 14 ++
r/man/record_batch_stream_writer.Rd | 16 ++
r/man/write_arrow.Rd | 14 +-
r/man/write_record_batch.Rd | 18 ++
r/man/write_table.Rd | 18 ++
r/src/DataType.cpp | 3 +
r/src/RcppExports.cpp | 389 +++++++++++++++++++++++++++---------
r/src/RecordBatch.cpp | 60 ------
r/src/RecordBatchReader.cpp | 104 ++++++++++
r/src/RecordBatchWriter.cpp | 58 ++++++
r/src/Table.cpp | 77 -------
r/src/array.cpp | 61 +++++-
r/src/arrow_types.h | 9 +-
r/src/io.cpp | 77 +++++++
r/tests/testthat/test-RecordBatch.R | 38 ++--
r/tests/testthat/test-Table.R | 20 +-
r/tests/testthat/test-read-write.R | 8 +-
35 files changed, 1392 insertions(+), 414 deletions(-)
diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index ceab62a..66fc42a 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -24,10 +24,12 @@ Imports:
vctrs (>= 0.0.0.9000),
fs,
tibble,
- crayon
+ crayon,
+ withr
Remotes:
r-lib/vctrs,
- RcppCore/Rcpp
+ RcppCore/Rcpp,
+ romainfrancois/withr@bug-79/defer
Roxygen: list(markdown = TRUE)
RoxygenNote: 6.1.0.9000
Suggests:
@@ -43,6 +45,8 @@ Collate:
'List.R'
'RcppExports.R'
'RecordBatch.R'
+ 'RecordBatchReader.R'
+ 'RecordBatchWriter.R'
'Schema.R'
'Struct.R'
'Table.R'
@@ -51,5 +55,6 @@ Collate:
'dictionary.R'
'io.R'
'memory_pool.R'
+ 'on_exit.R'
'reexports-tibble.R'
'zzz.R'
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 649efe7..b9967a3 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -13,19 +13,43 @@ S3method(buffer,numeric)
S3method(buffer,raw)
S3method(buffer_reader,"arrow::Buffer")
S3method(buffer_reader,default)
+S3method(fixed_size_buffer_writer,"arrow::Buffer")
+S3method(fixed_size_buffer_writer,default)
S3method(length,"arrow::Array")
S3method(names,"arrow::RecordBatch")
S3method(print,"arrow-enum")
S3method(read_record_batch,"arrow::io::BufferReader")
S3method(read_record_batch,"arrow::io::RandomAccessFile")
+S3method(read_record_batch,"arrow::ipc::RecordBatchFileReader")
+S3method(read_record_batch,"arrow::ipc::RecordBatchStreamReader")
S3method(read_record_batch,character)
S3method(read_record_batch,fs_path)
S3method(read_record_batch,raw)
S3method(read_table,"arrow::io::BufferReader")
S3method(read_table,"arrow::io::RandomAccessFile")
+S3method(read_table,"arrow::ipc::RecordBatchFileReader")
+S3method(read_table,"arrow::ipc::RecordBatchStreamReader")
S3method(read_table,character)
S3method(read_table,fs_path)
S3method(read_table,raw)
+S3method(record_batch_file_reader,"arrow::io::RandomAccessFile")
+S3method(record_batch_file_reader,character)
+S3method(record_batch_file_reader,fs_path)
+S3method(record_batch_stream_reader,"arrow::io::InputStream")
+S3method(record_batch_stream_reader,raw)
+S3method(write_arrow,"arrow::RecordBatch")
+S3method(write_arrow,"arrow::Table")
+S3method(write_arrow,data.frame)
+S3method(write_record_batch,"arrow::io::OutputStream")
+S3method(write_record_batch,"arrow::ipc::RecordBatchWriter")
+S3method(write_record_batch,character)
+S3method(write_record_batch,fs_path)
+S3method(write_record_batch,raw)
+S3method(write_table,"arrow::io::OutputStream")
+S3method(write_table,"arrow::ipc::RecordBatchWriter")
+S3method(write_table,character)
+S3method(write_table,fs_path)
+S3method(write_table,raw)
export(DateUnit)
export(FileMode)
export(StatusCode)
@@ -35,6 +59,7 @@ export(array)
export(as_tibble)
export(boolean)
export(buffer)
+export(buffer_output_stream)
export(buffer_reader)
export(chunked_array)
export(date32)
@@ -42,6 +67,8 @@ export(date64)
export(decimal)
export(dictionary)
export(file_open)
+export(file_output_stream)
+export(fixed_size_buffer_writer)
export(float16)
export(float32)
export(float64)
@@ -52,11 +79,16 @@ export(int8)
export(list_of)
export(mmap_create)
export(mmap_open)
+export(mock_output_stream)
export(null)
export(read_arrow)
export(read_record_batch)
export(read_table)
export(record_batch)
+export(record_batch_file_reader)
+export(record_batch_file_writer)
+export(record_batch_stream_reader)
+export(record_batch_stream_writer)
export(schema)
export(struct)
export(table)
@@ -69,17 +101,16 @@ export(uint64)
export(uint8)
export(utf8)
export(write_arrow)
+export(write_record_batch)
+export(write_table)
importFrom(R6,R6Class)
importFrom(Rcpp,sourceCpp)
importFrom(assertthat,assert_that)
importFrom(glue,glue)
importFrom(purrr,map)
importFrom(purrr,map2)
-importFrom(purrr,map_chr)
importFrom(purrr,map_int)
importFrom(rlang,dots_n)
-importFrom(rlang,quo_name)
-importFrom(rlang,seq2)
-importFrom(rlang,set_names)
importFrom(tibble,as_tibble)
+importFrom(withr,defer_parent)
useDynLib(arrow, .registration = TRUE)
diff --git a/r/R/R6.R b/r/R/R6.R
index 734ddc0..752ad59 100644
--- a/r/R/R6.R
+++ b/r/R/R6.R
@@ -40,6 +40,10 @@
},
pointer_address = function(){
Object__pointer_address(self$pointer())
+ },
+
+    # Delegates to the Object__is_null() binding; presumably TRUE when the
+    # wrapped C++ object pointer is null -- TODO confirm against src/.
+ is_null = function(){
+ Object__is_null(self)
 }
)
)
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index 8ca9fd6..915cd4d 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -333,6 +333,10 @@ Object__pointer_address <- function(obj) {
.Call(`_arrow_Object__pointer_address`, obj)
}
+# .Call() binding used by `arrow::Object`$is_null() (R/R6.R).
+Object__is_null <- function(obj) {
+ .Call(`_arrow_Object__is_null`, obj)
+}
+
DictionaryType__initialize <- function(type, array, ordered) {
.Call(`_arrow_DictionaryType__initialize`, type, array, ordered)
}
@@ -377,6 +381,10 @@ io___InputStream__Close <- function(x) {
invisible(.Call(`_arrow_io___InputStream__Close`, x))
}
+# .Call() binding used by `arrow::io::OutputStream`$Close(); the result is
+# returned invisibly.
+io___OutputStream__Close <- function(x) {
+ invisible(.Call(`_arrow_io___OutputStream__Close`, x))
+}
+
io___RandomAccessFile__GetSize <- function(x) {
.Call(`_arrow_io___RandomAccessFile__GetSize`, x)
}
@@ -413,6 +421,42 @@ io___BufferReader__initialize <- function(buffer) {
.Call(`_arrow_io___BufferReader__initialize`, buffer)
}
+# arrow::io::FileOutputStream binding (presumably called by
+# file_output_stream() -- confirm in R/io.R).
+io___FileOutputStream__Open <- function(path) {
+ .Call(`_arrow_io___FileOutputStream__Open`, path)
+}
+
+# arrow::io::BufferOutputStream bindings. capacity/Finish/Tell/Write back the
+# R6 methods of the same names; Create is presumably used by
+# buffer_output_stream().
+io___BufferOutputStream__Create <- function(initial_capacity) {
+ .Call(`_arrow_io___BufferOutputStream__Create`, initial_capacity)
+}
+
+io___BufferOutputStream__capacity <- function(stream) {
+ .Call(`_arrow_io___BufferOutputStream__capacity`, stream)
+}
+
+io___BufferOutputStream__Finish <- function(stream) {
+ .Call(`_arrow_io___BufferOutputStream__Finish`, stream)
+}
+
+io___BufferOutputStream__Tell <- function(stream) {
+ .Call(`_arrow_io___BufferOutputStream__Tell`, stream)
+}
+
+io___BufferOutputStream__Write <- function(stream, bytes) {
+ invisible(.Call(`_arrow_io___BufferOutputStream__Write`, stream, bytes))
+}
+
+# arrow::io::MockOutputStream bindings; GetExtentBytesWritten backs the R6
+# method of the same name.
+io___MockOutputStream__initialize <- function() {
+ .Call(`_arrow_io___MockOutputStream__initialize`)
+}
+
+io___MockOutputStream__GetExtentBytesWritten <- function(stream) {
+ .Call(`_arrow_io___MockOutputStream__GetExtentBytesWritten`, stream)
+}
+
+# arrow::io::FixedSizeBufferWriter binding; takes the buffer to write into.
+io___FixedSizeBufferWriter__initialize <- function(buffer) {
+ .Call(`_arrow_io___FixedSizeBufferWriter__initialize`, buffer)
+}
+
MemoryPool__default <- function() {
.Call(`_arrow_MemoryPool__default`)
}
@@ -445,22 +489,6 @@ RecordBatch__to_dataframe <- function(batch) {
.Call(`_arrow_RecordBatch__to_dataframe`, batch)
}
-read_record_batch_RandomAccessFile <- function(stream) {
- .Call(`_arrow_read_record_batch_RandomAccessFile`, stream)
-}
-
-read_record_batch_BufferReader <- function(stream) {
- .Call(`_arrow_read_record_batch_BufferReader`, stream)
-}
-
-RecordBatch__to_file <- function(batch, path) {
- .Call(`_arrow_RecordBatch__to_file`, batch, path)
-}
-
-RecordBatch__to_stream <- function(batch) {
- .Call(`_arrow_RecordBatch__to_stream`, batch)
-}
-
RecordBatch__from_dataframe <- function(tbl) {
.Call(`_arrow_RecordBatch__from_dataframe`, tbl)
}
@@ -489,36 +517,76 @@ RecordBatch__Slice2 <- function(self, offset, length) {
.Call(`_arrow_RecordBatch__Slice2`, self, offset, length)
}
-Table__from_dataframe <- function(tbl) {
- .Call(`_arrow_Table__from_dataframe`, tbl)
+# .Call() bindings for record batch readers/writers and tables (this file is
+# presumably generated by Rcpp::compileAttributes() -- regenerate, don't hand-edit).
+RecordBatchReader__schema <- function(reader) {
+ .Call(`_arrow_RecordBatchReader__schema`, reader)
 }
-Table__num_columns <- function(x) {
- .Call(`_arrow_Table__num_columns`, x)
+RecordBatchReader__ReadNext <- function(reader) {
+ .Call(`_arrow_RecordBatchReader__ReadNext`, reader)
 }
-Table__num_rows <- function(x) {
- .Call(`_arrow_Table__num_rows`, x)
+ipc___RecordBatchStreamReader__Open <- function(stream) {
+ .Call(`_arrow_ipc___RecordBatchStreamReader__Open`, stream)
 }
-Table__schema <- function(x) {
- .Call(`_arrow_Table__schema`, x)
+ipc___RecordBatchFileReader__schema <- function(reader) {
+ .Call(`_arrow_ipc___RecordBatchFileReader__schema`, reader)
+}
+
+ipc___RecordBatchFileReader__num_record_batches <- function(reader) {
+ .Call(`_arrow_ipc___RecordBatchFileReader__num_record_batches`, reader)
+}
+
+ipc___RecordBatchFileReader__ReadRecordBatch <- function(reader, i) {
+ .Call(`_arrow_ipc___RecordBatchFileReader__ReadRecordBatch`, reader, i)
+}
+
+ipc___RecordBatchFileReader__Open <- function(file) {
+ .Call(`_arrow_ipc___RecordBatchFileReader__Open`, file)
+}
+
+Table__from_RecordBatchFileReader <- function(reader) {
+ .Call(`_arrow_Table__from_RecordBatchFileReader`, reader)
+}
+
+Table__from_RecordBatchStreamReader <- function(reader) {
+ .Call(`_arrow_Table__from_RecordBatchStreamReader`, reader)
+}
+
+ipc___RecordBatchFileWriter__Open <- function(stream, schema) {
+ .Call(`_arrow_ipc___RecordBatchFileWriter__Open`, stream, schema)
 }
-Table__to_file <- function(table, path) {
- .Call(`_arrow_Table__to_file`, table, path)
+ipc___RecordBatchStreamWriter__Open <- function(stream, schema) {
+ .Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema)
 }
-Table__to_stream <- function(table) {
- .Call(`_arrow_Table__to_stream`, table)
+ipc___RecordBatchWriter__WriteRecordBatch <- function(batch_writer, batch, allow_64bit) {
+ invisible(.Call(`_arrow_ipc___RecordBatchWriter__WriteRecordBatch`, batch_writer, batch, allow_64bit))
 }
-read_table_RandomAccessFile <- function(stream) {
- .Call(`_arrow_read_table_RandomAccessFile`, stream)
+ipc___RecordBatchWriter__WriteTable <- function(batch_writer, table) {
+ invisible(.Call(`_arrow_ipc___RecordBatchWriter__WriteTable`, batch_writer, table))
 }
-read_table_BufferReader <- function(stream) {
- .Call(`_arrow_read_table_BufferReader`, stream)
+ipc___RecordBatchWriter__Close <- function(batch_writer) {
+ invisible(.Call(`_arrow_ipc___RecordBatchWriter__Close`, batch_writer))
+}
+
+Table__from_dataframe <- function(tbl) {
+ .Call(`_arrow_Table__from_dataframe`, tbl)
+}
+
+Table__num_columns <- function(x) {
+ .Call(`_arrow_Table__num_columns`, x)
+}
+
+Table__num_rows <- function(x) {
+ .Call(`_arrow_Table__num_rows`, x)
+}
+
+Table__schema <- function(x) {
+ .Call(`_arrow_Table__schema`, x)
 }
Table__to_dataframe <- function(table) {
diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R
index 4e2e581..e0866f6 100644
--- a/r/R/RecordBatch.R
+++ b/r/R/RecordBatch.R
@@ -22,8 +22,6 @@
num_columns = function() RecordBatch__num_columns(self),
num_rows = function() RecordBatch__num_rows(self),
schema = function() `arrow::Schema`$new(RecordBatch__schema(self)),
- to_file = function(path) invisible(RecordBatch__to_file(self,
fs::path_abs(path))),
- to_stream = function() RecordBatch__to_stream(self),
column = function(i) `arrow::Array`$new(RecordBatch__column(self, i)),
column_name = function(i) RecordBatch__column_name(self, i),
names = function() RecordBatch__names(self),
@@ -40,7 +38,9 @@
} else {
`arrow::RecordBatch`$new(RecordBatch__Slice2(self, offset, length))
}
- }
+ },
+
+    # Write this batch with write_record_batch(), which dispatches on the
+    # type of `output_stream`; `...` is passed through to it.
+    serialize = function(output_stream, ...) write_record_batch(self, output_stream, ...)
)
)
@@ -67,43 +67,3 @@
record_batch <- function(.data){
`arrow::RecordBatch`$new(RecordBatch__from_dataframe(.data))
}
-
-#' Read a single record batch from a stream
-#'
-#' @param stream input stream
-#'
-#' @details `stream` can be a `arrow::io::RandomAccessFile` stream as created
by [file_open()] or [mmap_open()] or a path.
-#'
-#' @export
-read_record_batch <- function(stream){
- UseMethod("read_record_batch")
-}
-
-#' @export
-read_record_batch.character <- function(stream){
- assert_that(length(stream) == 1L)
- read_record_batch(fs::path_abs(stream))
-}
-
-#' @export
-read_record_batch.fs_path <- function(stream){
- stream <- file_open(stream); on.exit(stream$Close())
- read_record_batch(stream)
-}
-
-#' @export
-`read_record_batch.arrow::io::RandomAccessFile` <- function(stream){
- `arrow::RecordBatch`$new(read_record_batch_RandomAccessFile(stream))
-}
-
-#' @export
-`read_record_batch.arrow::io::BufferReader` <- function(stream){
- `arrow::RecordBatch`$new(read_record_batch_BufferReader(stream))
-}
-
-#' @export
-read_record_batch.raw <- function(stream){
- stream <- buffer_reader(stream); on.exit(stream$Close())
- read_record_batch(stream)
-}
-
diff --git a/r/R/RecordBatchReader.R b/r/R/RecordBatchReader.R
new file mode 100644
index 0000000..b7c8b00
--- /dev/null
+++ b/r/R/RecordBatchReader.R
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include R6.R
+
+# R6 wrapper around an arrow::RecordBatchReader: exposes the schema and
+# pulls record batches one at a time with $ReadNext().
+`arrow::RecordBatchReader` <- R6Class("arrow::RecordBatchReader",
+  inherit = `arrow::Object`,
+  public = list(
+    schema = function() `arrow::Schema`$new(RecordBatchReader__schema(self)),
+    # NOTE(review): what RecordBatchReader__ReadNext() yields once the stream
+    # is exhausted is not visible here -- confirm before looping on it.
+    ReadNext = function() {
+      `arrow::RecordBatch`$new(RecordBatchReader__ReadNext(self))
+    }
+  )
+)
+
+# Reader for the IPC stream format; all methods come from
+# `arrow::RecordBatchReader`.
+`arrow::ipc::RecordBatchStreamReader` <- R6Class("arrow::ipc::RecordBatchStreamReader",
+  inherit = `arrow::RecordBatchReader`
+)
+
+# Reader for the IPC file format: batches are accessed by index with
+# $ReadRecordBatch(i), and $num_record_batches() gives how many there are.
+`arrow::ipc::RecordBatchFileReader` <- R6Class("arrow::ipc::RecordBatchFileReader",
+  inherit = `arrow::Object`,
+  public = list(
+    schema = function() `arrow::Schema`$new(ipc___RecordBatchFileReader__schema(self)),
+    num_record_batches = function() ipc___RecordBatchFileReader__num_record_batches(self),
+    ReadRecordBatch = function(i) {
+      `arrow::RecordBatch`$new(ipc___RecordBatchFileReader__ReadRecordBatch(self, i))
+    }
+  )
+)
+
+
+#' Create an `arrow::ipc::RecordBatchStreamReader` from an input stream
+#'
+#' @param stream an `arrow::io::InputStream`, or a raw vector holding bytes
+#'   in the Arrow stream format (read through [buffer_reader()])
+#'
+#' @return an `arrow::ipc::RecordBatchStreamReader`
+#'
+#' @export
+record_batch_stream_reader <- function(stream){
+  UseMethod("record_batch_stream_reader")
+}
+
+#' @export
+`record_batch_stream_reader.arrow::io::InputStream` <- function(stream) {
+  `arrow::ipc::RecordBatchStreamReader`$new(ipc___RecordBatchStreamReader__Open(stream))
+}
+
+#' @export
+`record_batch_stream_reader.raw` <- function(stream) {
+  record_batch_stream_reader(buffer_reader(stream))
+}
+
+
+#' Create an `arrow::ipc::RecordBatchFileReader` from a file
+#'
+#' @param file The file to read from: an `arrow::io::RandomAccessFile` (e.g.
+#'   from [file_open()] or [mmap_open()]) or a file path (`character` of
+#'   length one, or `fs_path`)
+#'
+#' @return an `arrow::ipc::RecordBatchFileReader`
+#'
+#' @export
+record_batch_file_reader <- function(file) {
+  UseMethod("record_batch_file_reader")
+}
+
+#' @export
+`record_batch_file_reader.arrow::io::RandomAccessFile` <- function(file) {
+  `arrow::ipc::RecordBatchFileReader`$new(ipc___RecordBatchFileReader__Open(file))
+}
+
+#' @export
+`record_batch_file_reader.character` <- function(file) {
+  assert_that(length(file) == 1L)
+  record_batch_file_reader(fs::path_abs(file))
+}
+
+#' @export
+`record_batch_file_reader.fs_path` <- function(file) {
+  # NOTE(review): the file opened here is never closed by this function; the
+  # returned reader presumably needs it to stay open -- confirm ownership.
+  record_batch_file_reader(file_open(file))
+}
+
+#-------- read_record_batch
+
+#' Read a single record batch from a stream
+#'
+#' @param stream input stream
+#' @param ... additional parameters passed on to methods. When `stream` is,
+#'   or resolves to, an `arrow::ipc::RecordBatchFileReader`, `i` selects the
+#'   0-based index of the batch to read (default `0`).
+#'
+#' @details `stream` can be:
+#' - an `arrow::io::RandomAccessFile` stream as created by [file_open()] or
+#'   [mmap_open()], or a file path: read in the Arrow file format;
+#' - an `arrow::io::BufferReader` or a raw vector: read in the Arrow stream
+#'   format;
+#' - an `arrow::ipc::RecordBatchFileReader` or
+#'   `arrow::ipc::RecordBatchStreamReader`, as created by
+#'   [record_batch_file_reader()] or [record_batch_stream_reader()].
+#'
+#' @return an `arrow::RecordBatch`
+#'
+#' @export
+read_record_batch <- function(stream, ...){
+  UseMethod("read_record_batch")
+}
+
+#' @export
+read_record_batch.character <- function(stream, ...){
+  assert_that(length(stream) == 1L)
+  read_record_batch(fs::path_abs(stream), ...)
+}
+
+#' @export
+read_record_batch.fs_path <- function(stream, ...){
+  # close_on_exit() (R/on_exit.R) is expected to close the file when this
+  # function exits.
+  stream <- close_on_exit(file_open(stream))
+  read_record_batch(stream, ...)
+}
+
+#' @export
+`read_record_batch.arrow::io::RandomAccessFile` <- function(stream, ...){
+  # delegate to the file reader method so that `i` (if supplied) is honoured;
+  # without it that method reads batch 0, as before
+  read_record_batch(record_batch_file_reader(stream), ...)
+}
+
+#' @export
+`read_record_batch.arrow::io::BufferReader` <- function(stream, ...){
+  read_record_batch(record_batch_stream_reader(stream), ...)
+}
+
+#' @export
+read_record_batch.raw <- function(stream, ...){
+  stream <- close_on_exit(buffer_reader(stream))
+  read_record_batch(stream, ...)
+}
+
+#' @export
+`read_record_batch.arrow::ipc::RecordBatchStreamReader` <- function(stream, ...) {
+  stream$ReadNext()
+}
+
+#' @export
+`read_record_batch.arrow::ipc::RecordBatchFileReader` <- function(stream, i = 0, ...) {
+  stream$ReadRecordBatch(i)
+}
+
+#--------- read_table
+
+#' Read an arrow::Table from a stream
+#'
+#' @param stream where to read the table from:
+#'   - an `arrow::io::RandomAccessFile` created by [file_open()] or
+#'     [mmap_open()], or a file path (`character` or `fs_path`): read in the
+#'     Arrow file format
+#'   - an `arrow::io::BufferReader` or a raw vector: read in the Arrow stream
+#'     format
+#'   - an `arrow::ipc::RecordBatchFileReader` or
+#'     `arrow::ipc::RecordBatchStreamReader`, e.g. from
+#'     [record_batch_file_reader()] or [record_batch_stream_reader()]
+#'
+#' @return an `arrow::Table`
+#'
+#' @export
+read_table <- function(stream){
+  UseMethod("read_table")
+}
+
+#' @export
+read_table.character <- function(stream){
+  assert_that(length(stream) == 1L)
+  read_table(fs::path_abs(stream))
+}
+
+#' @export
+read_table.fs_path <- function(stream) {
+  # close_on_exit() (R/on_exit.R) is expected to close the file when this
+  # function exits.
+  stream <- close_on_exit(file_open(stream))
+  read_table(stream)
+}
+
+#' @export
+`read_table.arrow::io::RandomAccessFile` <- function(stream) {
+  read_table(record_batch_file_reader(stream))
+}
+
+#' @export
+`read_table.arrow::ipc::RecordBatchFileReader` <- function(stream) {
+  `arrow::Table`$new(Table__from_RecordBatchFileReader(stream))
+}
+
+#' @export
+`read_table.arrow::ipc::RecordBatchStreamReader` <- function(stream) {
+  `arrow::Table`$new(Table__from_RecordBatchStreamReader(stream))
+}
+
+#' @export
+`read_table.arrow::io::BufferReader` <- function(stream) {
+  read_table(record_batch_stream_reader(stream))
+}
+
+#' @export
+`read_table.raw` <- function(stream) {
+  stream <- close_on_exit(buffer_reader(stream))
+  read_table(stream)
+}
+
diff --git a/r/R/RecordBatchWriter.R b/r/R/RecordBatchWriter.R
new file mode 100644
index 0000000..f1ab29d
--- /dev/null
+++ b/r/R/RecordBatchWriter.R
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include R6.R
+
+# R6 wrapper around an arrow::ipc::RecordBatchWriter. `allow_64bit` defaults
+# to TRUE, the same default used by write_record_batch()'s writer method.
+`arrow::ipc::RecordBatchWriter` <- R6Class("arrow::ipc::RecordBatchWriter",
+  inherit = `arrow::Object`,
+  public = list(
+    WriteRecordBatch = function(batch, allow_64bit = TRUE) {
+      ipc___RecordBatchWriter__WriteRecordBatch(self, batch, allow_64bit)
+    },
+    WriteTable = function(table) ipc___RecordBatchWriter__WriteTable(self, table),
+    Close = function() ipc___RecordBatchWriter__Close(self)
+  )
+)
+
+# Writer for the IPC stream format, see record_batch_stream_writer().
+`arrow::ipc::RecordBatchStreamWriter` <- R6Class("arrow::ipc::RecordBatchStreamWriter",
+  inherit = `arrow::ipc::RecordBatchWriter`
+)
+# Writer for the IPC file format, see record_batch_file_writer(); it derives
+# from the stream writer class.
+`arrow::ipc::RecordBatchFileWriter` <- R6Class("arrow::ipc::RecordBatchFileWriter",
+  inherit = `arrow::ipc::RecordBatchStreamWriter`
+)
+
+#' Create a record batch file writer from a stream
+#'
+#' @param stream an `arrow::io::OutputStream`, e.g. from [file_output_stream()]
+#' @param schema the `arrow::Schema` of the batches to be written
+#'
+#' @return an `arrow::ipc::RecordBatchFileWriter` object (a subclass of
+#'   `arrow::ipc::RecordBatchWriter`)
+#'
+#' @export
+record_batch_file_writer <- function(stream, schema) {
+  assert_that(
+    inherits(stream, "arrow::io::OutputStream"),
+    inherits(schema, "arrow::Schema")
+  )
+  `arrow::ipc::RecordBatchFileWriter`$new(ipc___RecordBatchFileWriter__Open(stream, schema))
+}
+
+#' Create a record batch stream writer
+#'
+#' @param stream an `arrow::io::OutputStream` to write to
+#' @param schema the `arrow::Schema` of the batches to be written
+#'
+#' @return an `arrow::ipc::RecordBatchStreamWriter` object (a subclass of
+#'   `arrow::ipc::RecordBatchWriter`)
+#'
+#' @export
+record_batch_stream_writer <- function(stream, schema) {
+  assert_that(
+    inherits(stream, "arrow::io::OutputStream"),
+    inherits(schema, "arrow::Schema")
+  )
+  `arrow::ipc::RecordBatchStreamWriter`$new(ipc___RecordBatchStreamWriter__Open(stream, schema))
+}
+
+#-------- write_record_batch
+
+#' Write a record batch
+#'
+#' @param x an `arrow::RecordBatch`
+#' @param stream where to stream the record batch. Dispatch is on this
+#'   argument, which can be:
+#'   - an `arrow::ipc::RecordBatchWriter`, as created by
+#'     [record_batch_stream_writer()] or [record_batch_file_writer()]
+#'   - an `arrow::io::OutputStream`: a stream writer is opened on it, so the
+#'     stream format is used
+#'   - a file path (`character` of length one, or `fs_path`): the file
+#'     format is used
+#'   - a `raw()` vector, only used for its type: the stream-format bytes are
+#'     returned as a raw vector
+#' @param ... extra parameters passed on to the
+#'   `arrow::ipc::RecordBatchWriter` method, i.e. `allow_64bit` (default `TRUE`)
+#'
+#' @return for the `raw` method, a raw vector holding the serialized batch;
+#'   the other methods are called for their side effect
+#'
+#' @export
+write_record_batch <- function(x, stream, ...){
+  UseMethod("write_record_batch", stream)
+}
+
+#' @export
+`write_record_batch.arrow::io::OutputStream` <- function(x, stream, ...) {
+  # close_on_exit() (R/on_exit.R) is expected to close the writer when this
+  # function exits
+  stream_writer <- close_on_exit(record_batch_stream_writer(stream, x$schema()))
+  # forward `...` so that e.g. `allow_64bit` reaches the writer method
+  write_record_batch(x, stream_writer, ...)
+}
+
+#' @export
+`write_record_batch.arrow::ipc::RecordBatchWriter` <- function(x, stream, allow_64bit = TRUE, ...){
+  stream$WriteRecordBatch(x, allow_64bit)
+}
+
+#' @export
+`write_record_batch.character` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  write_record_batch(x, fs::path_abs(stream), ...)
+}
+
+#' @export
+`write_record_batch.fs_path` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  file_stream <- close_on_exit(file_output_stream(stream))
+  file_writer <- close_on_exit(record_batch_file_writer(file_stream, x$schema()))
+  write_record_batch(x, file_writer, ...)
+}
+
+#' @export
+`write_record_batch.raw` <- function(x, stream, ...) {
+  # first pass: write to a mock stream only to learn how many bytes are needed
+  mock <- mock_output_stream()
+  write_record_batch(x, mock, ...)
+  n <- mock$GetExtentBytesWritten()
+
+  # second pass: write for real into a fixed-size buffer made from `bytes`.
+  # NOTE(review): returning `bytes` relies on buffer(bytes) sharing memory
+  # with the R vector rather than copying it -- confirm.
+  bytes <- raw(n)
+  buf <- buffer(bytes)
+  buffer_writer <- fixed_size_buffer_writer(buf)
+  write_record_batch(x, buffer_writer, ...)
+
+  bytes
+}
+
+#-------- stream Table
+
+#' Write an arrow::Table
+#'
+#' @param x an `arrow::Table`
+#' @param stream where to stream the table. Dispatch is on this argument,
+#'   which can be:
+#'   - an `arrow::ipc::RecordBatchWriter`, as created by
+#'     [record_batch_stream_writer()] or [record_batch_file_writer()]
+#'   - an `arrow::io::OutputStream`: a stream writer is opened on it, so the
+#'     stream format is used
+#'   - a file path (`character` of length one, or `fs_path`): the file
+#'     format is used
+#'   - a `raw()` vector, only used for its type: the stream-format bytes are
+#'     returned as a raw vector
+#' @param ... extra parameters passed along between methods; currently unused
+#'   by the `arrow::ipc::RecordBatchWriter` method
+#'
+#' @return for the `raw` method, a raw vector holding the serialized table;
+#'   the other methods are called for their side effect
+#'
+#' @export
+write_table <- function(x, stream, ...) {
+  UseMethod("write_table", stream)
+}
+
+#' @export
+`write_table.arrow::io::OutputStream` <- function(x, stream, ...) {
+  # close_on_exit() (R/on_exit.R) is expected to close the writer when this
+  # function exits
+  stream_writer <- close_on_exit(record_batch_stream_writer(stream, x$schema()))
+  write_table(x, stream_writer, ...)
+}
+
+#' @export
+`write_table.arrow::ipc::RecordBatchWriter` <- function(x, stream, ...){
+  stream$WriteTable(x)
+}
+
+#' @export
+`write_table.character` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  write_table(x, fs::path_abs(stream), ...)
+}
+
+#' @export
+`write_table.fs_path` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  file_stream <- close_on_exit(file_output_stream(stream))
+  file_writer <- close_on_exit(record_batch_file_writer(file_stream, x$schema()))
+  write_table(x, file_writer, ...)
+}
+
+#' @export
+`write_table.raw` <- function(x, stream, ...) {
+  # first pass: write to a mock stream only to learn how many bytes are needed
+  mock <- mock_output_stream()
+  write_table(x, mock, ...)
+  n <- mock$GetExtentBytesWritten()
+
+  # second pass: write for real into a fixed-size buffer made from `bytes`.
+  # NOTE(review): returning `bytes` relies on buffer(bytes) sharing memory
+  # with the R vector rather than copying it -- confirm.
+  bytes <- raw(n)
+  buf <- buffer(bytes)
+  buffer_writer <- fixed_size_buffer_writer(buf)
+  write_table(x, buffer_writer, ...)
+
+  bytes
+}
+
+#' Write an Arrow object or a data frame
+#'
+#' Dispatches on `x`: record batches are handed to [write_record_batch()],
+#' tables to [write_table()], and data frames are first converted with
+#' [record_batch()].
+#'
+#' @param x an `arrow::RecordBatch`, an `arrow::Table` or a data frame
+#' @param stream destination, interpreted by [write_record_batch()] or
+#'   [write_table()]
+#' @param ... additional parameters forwarded to those functions
+#'
+#' @export
+write_arrow <- function(x, stream, ...){
+  UseMethod("write_arrow", x)
+}
+
+#' @export
+`write_arrow.arrow::RecordBatch` <- function(x, stream, ...) write_record_batch(x, stream, ...)
+
+#' @export
+`write_arrow.arrow::Table` <- function(x, stream, ...) write_table(x, stream, ...)
+
+#' @export
+`write_arrow.data.frame` <- function(x, stream, ...) {
+  batch <- record_batch(x)
+  write_record_batch(batch, stream, ...)
+}
diff --git a/r/R/Table.R b/r/R/Table.R
index 8acf30d..62011fc 100644
--- a/r/R/Table.R
+++ b/r/R/Table.R
@@ -22,9 +22,9 @@
num_columns = function() Table__num_columns(self),
num_rows = function() Table__num_rows(self),
schema = function() `arrow::Schema`$new(Table__schema(self)),
- to_file = function(path) invisible(Table__to_file(self,
fs::path_abs(path))),
- to_stream = function() Table__to_stream(self),
- column = function(i) `arrow::Column`$new(Table__column(self, i))
+ column = function(i) `arrow::Column`$new(Table__column(self, i)),
+
+    # Write this table with write_table(), which dispatches on the type of
+    # `output_stream`; `...` is passed through to it.
+    serialize = function(output_stream, ...) write_table(self, output_stream, ...)
)
)
@@ -37,53 +37,6 @@ table <- function(.data){
`arrow::Table`$new(Table__from_dataframe(.data))
}
-#' Write a tibble in a binary arrow file
-#'
-#' @param data a [tibble::tibble]
-#' @param path file path
-#'
-#' @export
-write_arrow <- function(data, path){
- table(data)$to_file(path)
-}
-
-#' Read an arrow::Table from a stream
-#'
-#' @param stream stream. Either a stream created by [file_open()] or
[mmap_open()] or a file path.
-#'
-#' @export
-read_table <- function(stream){
- UseMethod("read_table")
-}
-
-#' @export
-read_table.character <- function(stream){
- assert_that(length(stream) == 1L)
- read_table(fs::path_abs(stream))
-}
-
-#' @export
-read_table.fs_path <- function(stream) {
- stream <- file_open(stream); on.exit(stream$Close())
- read_table(stream)
-}
-
-#' @export
-`read_table.arrow::io::RandomAccessFile` <- function(stream) {
- `arrow::Table`$new(read_table_RandomAccessFile(stream))
-}
-
-#' @export
-`read_table.arrow::io::BufferReader` <- function(stream) {
- `arrow::Table`$new(read_table_BufferReader(stream))
-}
-
-#' @export
-`read_table.raw` <- function(stream) {
- stream <- buffer_reader(stream); on.exit(stream$Close())
- read_table(stream)
-}
-
#' @export
`as_tibble.arrow::Table` <- function(x, ...){
Table__to_dataframe(x)
diff --git a/r/R/buffer.R b/r/R/buffer.R
index 6389fcb..5f65721 100644
--- a/r/R/buffer.R
+++ b/r/R/buffer.R
@@ -27,6 +27,8 @@
)
)
+`arrow::MutableBuffer` <- R6Class("arrow::MutableBuffer", inherit = `arrow::Buffer`) # classname must match binding for inherits()/S3 dispatch
+
#' Create a buffer from an R object
#'
#' @param x R object
diff --git a/r/R/enums.R b/r/R/enums.R
index 6b4ff24..657dc1a 100644
--- a/r/R/enums.R
+++ b/r/R/enums.R
@@ -20,8 +20,6 @@
NextMethod()
}
-#' @importFrom rlang seq2 quo_name set_names
-#' @importFrom purrr map_chr
enum <- function(class, ..., .list = list(...)){
structure(
.list,
diff --git a/r/R/io.R b/r/R/io.R
index 76fa1dc..1516bd3 100644
--- a/r/R/io.R
+++ b/r/R/io.R
@@ -31,6 +31,33 @@
)
)
+`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit =
`arrow::Object`)
+
+`arrow::io::OutputStream` <- R6Class("arrow::io::OutputStream", inherit =
`arrow::io::Writable`,
+ public = list(
+ Close = function() io___OutputStream__Close(self)
+ )
+)
+
+`arrow::io::FileOutputStream` <- R6Class("arrow::io::FileOutputStream",
inherit = `arrow::io::OutputStream`)
+
+`arrow::io::MockOutputStream` <- R6Class("arrow::io::MockOutputStream",
inherit = `arrow::io::OutputStream`,
+ public = list(
+ GetExtentBytesWritten = function()
io___MockOutputStream__GetExtentBytesWritten(self)
+ )
+)
+
+`arrow::io::BufferOutputStream` <- R6Class("arrow::io::BufferOutputStream",
inherit = `arrow::io::OutputStream`,
+ public = list(
+ capacity = function() io___BufferOutputStream__capacity(self),
+ Finish = function()
`arrow::Buffer`$new(io___BufferOutputStream__Finish(self)),
+ Write = function(bytes) io___BufferOutputStream__Write(self, bytes),
+ Tell = function() io___BufferOutputStream__Tell(self)
+ )
+)
+
+`arrow::io::FixedSizeBufferWriter` <-
R6Class("arrow::io::FixedSizeBufferWriter", inherit = `arrow::io::OutputStream`)
+
`arrow::io::RandomAccessFile` <- R6Class("arrow::io::RandomAccessFile",
inherit = `arrow::io::InputStream`,
public = list(
GetSize = function() io___RandomAccessFile__GetSize(self),
@@ -49,11 +76,14 @@
`arrow::io::ReadableFile` <- R6Class("arrow::io::ReadableFile", inherit =
`arrow::io::RandomAccessFile`)
`arrow::io::BufferReader` <- R6Class("arrow::io::BufferReader", inherit =
`arrow::io::RandomAccessFile`)
+
#' Create a new read/write memory mapped file of a given size
#'
#' @param path file path
#' @param size size in bytes
#' @param mode file mode (read/write/readwrite)
+#' @param buffer an `arrow::Buffer`, typically created by [buffer()]
+#' @param initial_capacity initial capacity for the buffer output stream
#'
#' @rdname io
#' @export
@@ -74,6 +104,41 @@ file_open <- `arrow::io::ReadableFile`$open <-
function(path) {
`arrow::io::ReadableFile`$new(io___ReadableFile__Open(fs::path_abs(path)))
}
+#' @rdname io
+#' @export
+file_output_stream <- function(path) { # absolute path, consistent with file_open()
+ `arrow::io::FileOutputStream`$new(io___FileOutputStream__Open(fs::path_abs(path)))
+}
+
+#' @rdname io
+#' @export
+mock_output_stream <- function() {
+ `arrow::io::MockOutputStream`$new(io___MockOutputStream__initialize())
+}
+
+#' @rdname io
+#' @export
+buffer_output_stream <- function(initial_capacity = 0L) {
+
`arrow::io::BufferOutputStream`$new(io___BufferOutputStream__Create(initial_capacity))
+}
+
+#' @rdname io
+#' @export
+fixed_size_buffer_writer <- function(buffer){
+ UseMethod("fixed_size_buffer_writer")
+}
+
+#' @export
+fixed_size_buffer_writer.default <- function(buffer){
+ fixed_size_buffer_writer(buffer(buffer))
+}
+
+#' @export
+`fixed_size_buffer_writer.arrow::Buffer` <- function(buffer){
+ assert_that(buffer$is_mutable())
+
`arrow::io::FixedSizeBufferWriter`$new(io___FixedSizeBufferWriter__initialize(buffer))
+}
+
#' Create a `arrow::BufferReader`
#'
#' @param x R object to treat as a buffer or a buffer created by [buffer()]
diff --git a/r/R/on_exit.R b/r/R/on_exit.R
new file mode 100644
index 0000000..9387169
--- /dev/null
+++ b/r/R/on_exit.R
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @importFrom withr defer_parent
+close_on_exit <- function(x, ...){
+ defer_parent(x$Close(), ...)
+ x
+}
+
+local_tempfile <- function(...){
+ tf <- tempfile()
+ defer_parent(unlink(tf), ...)
+ tf
+}
diff --git a/r/data-raw/test.R b/r/data-raw/test.R
index 60f2607..516af58 100644
--- a/r/data-raw/test.R
+++ b/r/data-raw/test.R
@@ -48,7 +48,7 @@ arr <- array(1:3, 5:80)
arr
arr$as_vector()
-#------- read_arrow / write_arrow
+#------- read_arrow / stream
tbl <- tibble(x=1:10, y=rnorm(10))
write_arrow(tbl, "/tmp/test.arrow")
readr::write_rds(tbl, "/tmp/test.rds")
@@ -60,7 +60,7 @@ fs::file_info(c("/tmp/test.arrow", "/tmp/test.rds"))
(batch <- record_batch(tbl))
batch$num_columns()
batch$num_rows()
-batch$to_file("/tmp/test")
+write_arrow(batch, "/tmp/test")
readBin("/tmp/test", what = raw(), n = 1000)
batch$schema()
all.equal(tbl, data)
@@ -79,7 +79,7 @@ tab$schema()
tab$num_columns()
tab$num_rows()
-# read_arrow, write_arrow
+# read_arrow, stream
tbl <- tibble(x = rnorm(20), y = seq_len(20))
write_arrow(tbl, tf)
diff --git a/r/man/io.Rd b/r/man/io.Rd
index 8c572d8..74817bf 100644
--- a/r/man/io.Rd
+++ b/r/man/io.Rd
@@ -4,6 +4,10 @@
\alias{mmap_create}
\alias{mmap_open}
\alias{file_open}
+\alias{file_output_stream}
+\alias{mock_output_stream}
+\alias{buffer_output_stream}
+\alias{fixed_size_buffer_writer}
\title{Create a new read/write memory mapped file of a given size}
\usage{
mmap_create(path, size)
@@ -11,6 +15,14 @@ mmap_create(path, size)
mmap_open(path, mode = c("read", "write", "readwrite"))
file_open(path)
+
+file_output_stream(path)
+
+mock_output_stream()
+
+buffer_output_stream(initial_capacity = 0L)
+
+fixed_size_buffer_writer(buffer)
}
\arguments{
\item{path}{file path}
@@ -18,6 +30,10 @@ file_open(path)
\item{size}{size in bytes}
\item{mode}{file mode (read/write/readwrite)}
+
+\item{initial_capacity}{initial capacity for the buffer output stream}
+
+\item{buffer}{an \code{arrow::Buffer}, typically created by
\code{\link[=buffer]{buffer()}}}
}
\description{
Create a new read/write memory mapped file of a given size
diff --git a/r/man/read_record_batch.Rd b/r/man/read_record_batch.Rd
index 86e7673..4ca048f 100644
--- a/r/man/read_record_batch.Rd
+++ b/r/man/read_record_batch.Rd
@@ -1,13 +1,15 @@
% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/RecordBatch.R
+% Please edit documentation in R/RecordBatchReader.R
\name{read_record_batch}
\alias{read_record_batch}
\title{Read a single record batch from a stream}
\usage{
-read_record_batch(stream)
+read_record_batch(stream, ...)
}
\arguments{
\item{stream}{input stream}
+
+\item{...}{additional parameters}
}
\description{
Read a single record batch from a stream
diff --git a/r/man/read_table.Rd b/r/man/read_table.Rd
index 7b34654..f851057 100644
--- a/r/man/read_table.Rd
+++ b/r/man/read_table.Rd
@@ -1,5 +1,5 @@
% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/Table.R
+% Please edit documentation in R/RecordBatchReader.R
\name{read_table}
\alias{read_table}
\title{Read an arrow::Table from a stream}
diff --git a/r/man/record_batch_file_reader.Rd
b/r/man/record_batch_file_reader.Rd
new file mode 100644
index 0000000..b7e211d
--- /dev/null
+++ b/r/man/record_batch_file_reader.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchReader.R
+\name{record_batch_file_reader}
+\alias{record_batch_file_reader}
+\title{Create an \code{arrow::ipc::RecordBatchFileReader} from a file}
+\usage{
+record_batch_file_reader(file)
+}
+\arguments{
+\item{file}{The file to read from}
+}
+\description{
+Create an \code{arrow::ipc::RecordBatchFileReader} from a file
+}
diff --git a/r/man/record_batch_file_writer.Rd
b/r/man/record_batch_file_writer.Rd
new file mode 100644
index 0000000..b7dcb0c
--- /dev/null
+++ b/r/man/record_batch_file_writer.Rd
@@ -0,0 +1,19 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{record_batch_file_writer}
+\alias{record_batch_file_writer}
+\title{Create a record batch file writer from a stream}
+\usage{
+record_batch_file_writer(stream, schema)
+}
+\arguments{
+\item{stream}{a stream}
+
+\item{schema}{the schema of the batches}
+}
+\value{
+an \code{arrow::ipc::RecordBatchWriter} object
+}
+\description{
+Create a record batch file writer from a stream
+}
diff --git a/r/man/record_batch_stream_reader.Rd
b/r/man/record_batch_stream_reader.Rd
new file mode 100644
index 0000000..018045f
--- /dev/null
+++ b/r/man/record_batch_stream_reader.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchReader.R
+\name{record_batch_stream_reader}
+\alias{record_batch_stream_reader}
+\title{Create a \code{arrow::ipc::RecordBatchStreamReader} from an input
stream}
+\usage{
+record_batch_stream_reader(stream)
+}
+\arguments{
+\item{stream}{input stream}
+}
+\description{
+Create a \code{arrow::ipc::RecordBatchStreamReader} from an input stream
+}
diff --git a/r/man/record_batch_stream_writer.Rd
b/r/man/record_batch_stream_writer.Rd
new file mode 100644
index 0000000..d720d50
--- /dev/null
+++ b/r/man/record_batch_stream_writer.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{record_batch_stream_writer}
+\alias{record_batch_stream_writer}
+\title{Create a record batch stream writer}
+\usage{
+record_batch_stream_writer(stream, schema)
+}
+\arguments{
+\item{stream}{a stream}
+
+\item{schema}{a schema}
+}
+\description{
+Create a record batch stream writer
+}
diff --git a/r/man/write_arrow.Rd b/r/man/write_arrow.Rd
index ff48d1e..42b39f1 100644
--- a/r/man/write_arrow.Rd
+++ b/r/man/write_arrow.Rd
@@ -1,16 +1,18 @@
% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/Table.R
+% Please edit documentation in R/RecordBatchWriter.R
\name{write_arrow}
\alias{write_arrow}
-\title{Write a tibble in a binary arrow file}
+\title{Write an object to a stream}
\usage{
-write_arrow(data, path)
+write_arrow(x, stream, ...)
}
\arguments{
-\item{data}{a \link[tibble:tibble]{tibble::tibble}}
+\item{x}{An object to stream}
-\item{path}{file path}
+\item{stream}{A stream}
+
+\item{...}{additional parameters}
}
\description{
-Write a tibble in a binary arrow file
+Write an object to a stream
}
diff --git a/r/man/write_record_batch.Rd b/r/man/write_record_batch.Rd
new file mode 100644
index 0000000..afc3363
--- /dev/null
+++ b/r/man/write_record_batch.Rd
@@ -0,0 +1,18 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{write_record_batch}
+\alias{write_record_batch}
+\title{write a record batch}
+\usage{
+write_record_batch(x, stream, ...)
+}
+\arguments{
+\item{x}{a \code{arrow::RecordBatch}}
+
+\item{stream}{where to stream the record batch}
+
+\item{...}{extra parameters}
+}
+\description{
+write a record batch
+}
diff --git a/r/man/write_table.Rd b/r/man/write_table.Rd
new file mode 100644
index 0000000..a247870
--- /dev/null
+++ b/r/man/write_table.Rd
@@ -0,0 +1,18 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{write_table}
+\alias{write_table}
+\title{write an arrow::Table}
+\usage{
+write_table(x, stream, ...)
+}
+\arguments{
+\item{x}{an \code{arrow::Table}}
+
+\item{stream}{where to stream the record batch}
+
+\item{...}{extra parameters}
+}
+\description{
+write an arrow::Table
+}
diff --git a/r/src/DataType.cpp b/r/src/DataType.cpp
index ab9351b..595dee3 100644
--- a/r/src/DataType.cpp
+++ b/r/src/DataType.cpp
@@ -215,6 +215,9 @@ std::string Object__pointer_address(SEXP obj) {
}
// [[Rcpp::export]]
+bool Object__is_null(const std::shared_ptr<void>& obj) { return obj.get() ==
nullptr; }
+
+// [[Rcpp::export]]
std::shared_ptr<arrow::DataType> DictionaryType__initialize(
const std::shared_ptr<arrow::DataType>& type,
const std::shared_ptr<arrow::Array>& array, bool ordered) {
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index e98a432..8b4c89a 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -920,6 +920,17 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
+// Object__is_null
+bool Object__is_null(const std::shared_ptr<void>& obj);
+RcppExport SEXP _arrow_Object__is_null(SEXP objSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::shared_ptr<void>& >::type
obj(objSEXP);
+ rcpp_result_gen = Rcpp::wrap(Object__is_null(obj));
+ return rcpp_result_gen;
+END_RCPP
+}
// DictionaryType__initialize
std::shared_ptr<arrow::DataType> DictionaryType__initialize(const
std::shared_ptr<arrow::DataType>& type, const std::shared_ptr<arrow::Array>&
array, bool ordered);
RcppExport SEXP _arrow_DictionaryType__initialize(SEXP typeSEXP, SEXP
arraySEXP, SEXP orderedSEXP) {
@@ -1045,6 +1056,16 @@ BEGIN_RCPP
return R_NilValue;
END_RCPP
}
+// io___OutputStream__Close
+void io___OutputStream__Close(const std::shared_ptr<arrow::io::OutputStream>&
x);
+RcppExport SEXP _arrow_io___OutputStream__Close(SEXP xSEXP) {
+BEGIN_RCPP
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::OutputStream>& >::type x(xSEXP);
+ io___OutputStream__Close(x);
+ return R_NilValue;
+END_RCPP
+}
// io___RandomAccessFile__GetSize
int64_t io___RandomAccessFile__GetSize(const
std::shared_ptr<arrow::io::RandomAccessFile>& x);
RcppExport SEXP _arrow_io___RandomAccessFile__GetSize(SEXP xSEXP) {
@@ -1146,6 +1167,104 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
+// io___FileOutputStream__Open
+std::shared_ptr<arrow::io::FileOutputStream> io___FileOutputStream__Open(const
std::string& path);
+RcppExport SEXP _arrow_io___FileOutputStream__Open(SEXP pathSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::string& >::type path(pathSEXP);
+ rcpp_result_gen = Rcpp::wrap(io___FileOutputStream__Open(path));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Create
+std::shared_ptr<arrow::io::BufferOutputStream>
io___BufferOutputStream__Create(int64_t initial_capacity);
+RcppExport SEXP _arrow_io___BufferOutputStream__Create(SEXP
initial_capacitySEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< int64_t >::type
initial_capacity(initial_capacitySEXP);
+ rcpp_result_gen =
Rcpp::wrap(io___BufferOutputStream__Create(initial_capacity));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__capacity
+int64_t io___BufferOutputStream__capacity(const
std::shared_ptr<arrow::io::BufferOutputStream>& stream);
+RcppExport SEXP _arrow_io___BufferOutputStream__capacity(SEXP streamSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+ rcpp_result_gen = Rcpp::wrap(io___BufferOutputStream__capacity(stream));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Finish
+std::shared_ptr<arrow::Buffer> io___BufferOutputStream__Finish(const
std::shared_ptr<arrow::io::BufferOutputStream>& stream);
+RcppExport SEXP _arrow_io___BufferOutputStream__Finish(SEXP streamSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+ rcpp_result_gen = Rcpp::wrap(io___BufferOutputStream__Finish(stream));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Tell
+int64_t io___BufferOutputStream__Tell(const
std::shared_ptr<arrow::io::BufferOutputStream>& stream);
+RcppExport SEXP _arrow_io___BufferOutputStream__Tell(SEXP streamSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+ rcpp_result_gen = Rcpp::wrap(io___BufferOutputStream__Tell(stream));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Write
+void io___BufferOutputStream__Write(const
std::shared_ptr<arrow::io::BufferOutputStream>& stream, RawVector_ bytes);
+RcppExport SEXP _arrow_io___BufferOutputStream__Write(SEXP streamSEXP, SEXP
bytesSEXP) {
+BEGIN_RCPP
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+ Rcpp::traits::input_parameter< RawVector_ >::type bytes(bytesSEXP);
+ io___BufferOutputStream__Write(stream, bytes);
+ return R_NilValue;
+END_RCPP
+}
+// io___MockOutputStream__initialize
+std::shared_ptr<arrow::io::MockOutputStream>
io___MockOutputStream__initialize();
+RcppExport SEXP _arrow_io___MockOutputStream__initialize() {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ rcpp_result_gen = Rcpp::wrap(io___MockOutputStream__initialize());
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___MockOutputStream__GetExtentBytesWritten
+int64_t io___MockOutputStream__GetExtentBytesWritten(const
std::shared_ptr<arrow::io::MockOutputStream>& stream);
+RcppExport SEXP _arrow_io___MockOutputStream__GetExtentBytesWritten(SEXP
streamSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::MockOutputStream>& >::type stream(streamSEXP);
+ rcpp_result_gen =
Rcpp::wrap(io___MockOutputStream__GetExtentBytesWritten(stream));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___FixedSizeBufferWriter__initialize
+std::shared_ptr<arrow::io::FixedSizeBufferWriter>
io___FixedSizeBufferWriter__initialize(const std::shared_ptr<arrow::Buffer>&
buffer);
+RcppExport SEXP _arrow_io___FixedSizeBufferWriter__initialize(SEXP bufferSEXP)
{
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Buffer>&
>::type buffer(bufferSEXP);
+ rcpp_result_gen =
Rcpp::wrap(io___FixedSizeBufferWriter__initialize(buffer));
+ return rcpp_result_gen;
+END_RCPP
+}
// MemoryPool__default
std::shared_ptr<arrow::MemoryPool> MemoryPool__default();
RcppExport SEXP _arrow_MemoryPool__default() {
@@ -1234,51 +1353,6 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
-// read_record_batch_RandomAccessFile
-std::shared_ptr<arrow::RecordBatch> read_record_batch_RandomAccessFile(const
std::shared_ptr<arrow::io::RandomAccessFile>& stream);
-RcppExport SEXP _arrow_read_record_batch_RandomAccessFile(SEXP streamSEXP) {
-BEGIN_RCPP
- Rcpp::RObject rcpp_result_gen;
- Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::RandomAccessFile>& >::type stream(streamSEXP);
- rcpp_result_gen = Rcpp::wrap(read_record_batch_RandomAccessFile(stream));
- return rcpp_result_gen;
-END_RCPP
-}
-// read_record_batch_BufferReader
-std::shared_ptr<arrow::RecordBatch> read_record_batch_BufferReader(const
std::shared_ptr<arrow::io::BufferReader>& stream);
-RcppExport SEXP _arrow_read_record_batch_BufferReader(SEXP streamSEXP) {
-BEGIN_RCPP
- Rcpp::RObject rcpp_result_gen;
- Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::BufferReader>& >::type stream(streamSEXP);
- rcpp_result_gen = Rcpp::wrap(read_record_batch_BufferReader(stream));
- return rcpp_result_gen;
-END_RCPP
-}
-// RecordBatch__to_file
-int RecordBatch__to_file(const std::shared_ptr<arrow::RecordBatch>& batch,
std::string path);
-RcppExport SEXP _arrow_RecordBatch__to_file(SEXP batchSEXP, SEXP pathSEXP) {
-BEGIN_RCPP
- Rcpp::RObject rcpp_result_gen;
- Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>&
>::type batch(batchSEXP);
- Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
- rcpp_result_gen = Rcpp::wrap(RecordBatch__to_file(batch, path));
- return rcpp_result_gen;
-END_RCPP
-}
-// RecordBatch__to_stream
-RawVector RecordBatch__to_stream(const std::shared_ptr<arrow::RecordBatch>&
batch);
-RcppExport SEXP _arrow_RecordBatch__to_stream(SEXP batchSEXP) {
-BEGIN_RCPP
- Rcpp::RObject rcpp_result_gen;
- Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>&
>::type batch(batchSEXP);
- rcpp_result_gen = Rcpp::wrap(RecordBatch__to_stream(batch));
- return rcpp_result_gen;
-END_RCPP
-}
// RecordBatch__from_dataframe
std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl);
RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) {
@@ -1362,92 +1436,204 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
-// Table__from_dataframe
-std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl);
-RcppExport SEXP _arrow_Table__from_dataframe(SEXP tblSEXP) {
+// RecordBatchReader__schema
+std::shared_ptr<arrow::Schema> RecordBatchReader__schema(const
std::shared_ptr<arrow::RecordBatchReader>& reader);
+RcppExport SEXP _arrow_RecordBatchReader__schema(SEXP readerSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< DataFrame >::type tbl(tblSEXP);
- rcpp_result_gen = Rcpp::wrap(Table__from_dataframe(tbl));
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::RecordBatchReader>& >::type reader(readerSEXP);
+ rcpp_result_gen = Rcpp::wrap(RecordBatchReader__schema(reader));
return rcpp_result_gen;
END_RCPP
}
-// Table__num_columns
-int Table__num_columns(const std::shared_ptr<arrow::Table>& x);
-RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) {
+// RecordBatchReader__ReadNext
+std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(const
std::shared_ptr<arrow::RecordBatchReader>& reader);
+RcppExport SEXP _arrow_RecordBatchReader__ReadNext(SEXP readerSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type x(xSEXP);
- rcpp_result_gen = Rcpp::wrap(Table__num_columns(x));
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::RecordBatchReader>& >::type reader(readerSEXP);
+ rcpp_result_gen = Rcpp::wrap(RecordBatchReader__ReadNext(reader));
return rcpp_result_gen;
END_RCPP
}
-// Table__num_rows
-int Table__num_rows(const std::shared_ptr<arrow::Table>& x);
-RcppExport SEXP _arrow_Table__num_rows(SEXP xSEXP) {
+// ipc___RecordBatchStreamReader__Open
+std::shared_ptr<arrow::RecordBatchReader>
ipc___RecordBatchStreamReader__Open(const
std::shared_ptr<arrow::io::InputStream>& stream);
+RcppExport SEXP _arrow_ipc___RecordBatchStreamReader__Open(SEXP streamSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type x(xSEXP);
- rcpp_result_gen = Rcpp::wrap(Table__num_rows(x));
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::InputStream>& >::type stream(streamSEXP);
+ rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchStreamReader__Open(stream));
return rcpp_result_gen;
END_RCPP
}
-// Table__schema
-std::shared_ptr<arrow::Schema> Table__schema(const
std::shared_ptr<arrow::Table>& x);
-RcppExport SEXP _arrow_Table__schema(SEXP xSEXP) {
+// ipc___RecordBatchFileReader__schema
+std::shared_ptr<arrow::Schema> ipc___RecordBatchFileReader__schema(const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__schema(SEXP readerSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type x(xSEXP);
- rcpp_result_gen = Rcpp::wrap(Table__schema(x));
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+ rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchFileReader__schema(reader));
return rcpp_result_gen;
END_RCPP
}
-// Table__to_file
-int Table__to_file(const std::shared_ptr<arrow::Table>& table, std::string
path);
-RcppExport SEXP _arrow_Table__to_file(SEXP tableSEXP, SEXP pathSEXP) {
+// ipc___RecordBatchFileReader__num_record_batches
+int ipc___RecordBatchFileReader__num_record_batches(const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__num_record_batches(SEXP
readerSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type table(tableSEXP);
- Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
- rcpp_result_gen = Rcpp::wrap(Table__to_file(table, path));
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+ rcpp_result_gen =
Rcpp::wrap(ipc___RecordBatchFileReader__num_record_batches(reader));
+ return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchFileReader__ReadRecordBatch
+std::shared_ptr<arrow::RecordBatch>
ipc___RecordBatchFileReader__ReadRecordBatch(const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader, int i);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__ReadRecordBatch(SEXP
readerSEXP, SEXP iSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+ Rcpp::traits::input_parameter< int >::type i(iSEXP);
+ rcpp_result_gen =
Rcpp::wrap(ipc___RecordBatchFileReader__ReadRecordBatch(reader, i));
+ return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchFileReader__Open
+std::shared_ptr<arrow::ipc::RecordBatchFileReader>
ipc___RecordBatchFileReader__Open(const
std::shared_ptr<arrow::io::RandomAccessFile>& file);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__Open(SEXP fileSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::RandomAccessFile>& >::type file(fileSEXP);
+ rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchFileReader__Open(file));
+ return rcpp_result_gen;
+END_RCPP
+}
+// Table__from_RecordBatchFileReader
+std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+RcppExport SEXP _arrow_Table__from_RecordBatchFileReader(SEXP readerSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+ rcpp_result_gen = Rcpp::wrap(Table__from_RecordBatchFileReader(reader));
+ return rcpp_result_gen;
+END_RCPP
+}
+// Table__from_RecordBatchStreamReader
+std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(const
std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader);
+RcppExport SEXP _arrow_Table__from_RecordBatchStreamReader(SEXP readerSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& >::type
reader(readerSEXP);
+ rcpp_result_gen = Rcpp::wrap(Table__from_RecordBatchStreamReader(reader));
+ return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchFileWriter__Open
+std::shared_ptr<arrow::ipc::RecordBatchWriter>
ipc___RecordBatchFileWriter__Open(const
std::shared_ptr<arrow::io::OutputStream>& stream, const
std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP streamSEXP, SEXP
schemaSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::OutputStream>& >::type stream(streamSEXP);
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Schema>&
>::type schema(schemaSEXP);
+ rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchFileWriter__Open(stream,
schema));
return rcpp_result_gen;
END_RCPP
}
-// Table__to_stream
-RawVector Table__to_stream(const std::shared_ptr<arrow::Table>& table);
-RcppExport SEXP _arrow_Table__to_stream(SEXP tableSEXP) {
+// ipc___RecordBatchStreamWriter__Open
+std::shared_ptr<arrow::ipc::RecordBatchWriter>
ipc___RecordBatchStreamWriter__Open(const
std::shared_ptr<arrow::io::OutputStream>& stream, const
std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP streamSEXP,
SEXP schemaSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::OutputStream>& >::type stream(streamSEXP);
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Schema>&
>::type schema(schemaSEXP);
+ rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchStreamWriter__Open(stream,
schema));
+ return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchWriter__WriteRecordBatch
+void ipc___RecordBatchWriter__WriteRecordBatch(const
std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer, const
std::shared_ptr<arrow::RecordBatch>& batch, bool allow_64bit);
+RcppExport SEXP _arrow_ipc___RecordBatchWriter__WriteRecordBatch(SEXP
batch_writerSEXP, SEXP batchSEXP, SEXP allow_64bitSEXP) {
+BEGIN_RCPP
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchWriter>& >::type
batch_writer(batch_writerSEXP);
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>&
>::type batch(batchSEXP);
+ Rcpp::traits::input_parameter< bool >::type allow_64bit(allow_64bitSEXP);
+ ipc___RecordBatchWriter__WriteRecordBatch(batch_writer, batch,
allow_64bit);
+ return R_NilValue;
+END_RCPP
+}
+// ipc___RecordBatchWriter__WriteTable
+void ipc___RecordBatchWriter__WriteTable(const
std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer, const
std::shared_ptr<arrow::Table>& table);
+RcppExport SEXP _arrow_ipc___RecordBatchWriter__WriteTable(SEXP
batch_writerSEXP, SEXP tableSEXP) {
+BEGIN_RCPP
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchWriter>& >::type
batch_writer(batch_writerSEXP);
Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type table(tableSEXP);
- rcpp_result_gen = Rcpp::wrap(Table__to_stream(table));
+ ipc___RecordBatchWriter__WriteTable(batch_writer, table);
+ return R_NilValue;
+END_RCPP
+}
+// ipc___RecordBatchWriter__Close
+void ipc___RecordBatchWriter__Close(const
std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer);
+RcppExport SEXP _arrow_ipc___RecordBatchWriter__Close(SEXP batch_writerSEXP) {
+BEGIN_RCPP
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::ipc::RecordBatchWriter>& >::type
batch_writer(batch_writerSEXP);
+ ipc___RecordBatchWriter__Close(batch_writer);
+ return R_NilValue;
+END_RCPP
+}
+// Table__from_dataframe
+std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl);
+RcppExport SEXP _arrow_Table__from_dataframe(SEXP tblSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< DataFrame >::type tbl(tblSEXP);
+ rcpp_result_gen = Rcpp::wrap(Table__from_dataframe(tbl));
+ return rcpp_result_gen;
+END_RCPP
+}
+// Table__num_columns
+int Table__num_columns(const std::shared_ptr<arrow::Table>& x);
+RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type x(xSEXP);
+ rcpp_result_gen = Rcpp::wrap(Table__num_columns(x));
return rcpp_result_gen;
END_RCPP
}
-// read_table_RandomAccessFile
-std::shared_ptr<arrow::Table> read_table_RandomAccessFile(const
std::shared_ptr<arrow::io::RandomAccessFile>& stream);
-RcppExport SEXP _arrow_read_table_RandomAccessFile(SEXP streamSEXP) {
+// Table__num_rows
+int Table__num_rows(const std::shared_ptr<arrow::Table>& x);
+RcppExport SEXP _arrow_Table__num_rows(SEXP xSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::RandomAccessFile>& >::type stream(streamSEXP);
- rcpp_result_gen = Rcpp::wrap(read_table_RandomAccessFile(stream));
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type x(xSEXP);
+ rcpp_result_gen = Rcpp::wrap(Table__num_rows(x));
return rcpp_result_gen;
END_RCPP
}
-// read_table_BufferReader
-std::shared_ptr<arrow::Table> read_table_BufferReader(const
std::shared_ptr<arrow::io::BufferReader>& stream);
-RcppExport SEXP _arrow_read_table_BufferReader(SEXP streamSEXP) {
+// Table__schema
+std::shared_ptr<arrow::Schema> Table__schema(const
std::shared_ptr<arrow::Table>& x);
+RcppExport SEXP _arrow_Table__schema(SEXP xSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< const
std::shared_ptr<arrow::io::BufferReader>& >::type stream(streamSEXP);
- rcpp_result_gen = Rcpp::wrap(read_table_BufferReader(stream));
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>&
>::type x(xSEXP);
+ rcpp_result_gen = Rcpp::wrap(Table__schema(x));
return rcpp_result_gen;
END_RCPP
}
@@ -1559,6 +1745,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_TimestampType__timezone", (DL_FUNC)
&_arrow_TimestampType__timezone, 1},
{"_arrow_TimestampType__unit", (DL_FUNC) &_arrow_TimestampType__unit, 1},
{"_arrow_Object__pointer_address", (DL_FUNC)
&_arrow_Object__pointer_address, 1},
+ {"_arrow_Object__is_null", (DL_FUNC) &_arrow_Object__is_null, 1},
{"_arrow_DictionaryType__initialize", (DL_FUNC)
&_arrow_DictionaryType__initialize, 3},
{"_arrow_DictionaryType__index_type", (DL_FUNC)
&_arrow_DictionaryType__index_type, 1},
{"_arrow_DictionaryType__name", (DL_FUNC) &_arrow_DictionaryType__name, 1},
@@ -1570,6 +1757,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_Field__nullable", (DL_FUNC) &_arrow_Field__nullable, 1},
{"_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2},
{"_arrow_io___InputStream__Close", (DL_FUNC)
&_arrow_io___InputStream__Close, 1},
+ {"_arrow_io___OutputStream__Close", (DL_FUNC)
&_arrow_io___OutputStream__Close, 1},
{"_arrow_io___RandomAccessFile__GetSize", (DL_FUNC)
&_arrow_io___RandomAccessFile__GetSize, 1},
{"_arrow_io___RandomAccessFile__supports_zero_copy", (DL_FUNC)
&_arrow_io___RandomAccessFile__supports_zero_copy, 1},
{"_arrow_io___RandomAccessFile__Seek", (DL_FUNC)
&_arrow_io___RandomAccessFile__Seek, 2},
@@ -1579,6 +1767,15 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_io___MemoryMappedFile__Resize", (DL_FUNC)
&_arrow_io___MemoryMappedFile__Resize, 2},
{"_arrow_io___ReadableFile__Open", (DL_FUNC)
&_arrow_io___ReadableFile__Open, 1},
{"_arrow_io___BufferReader__initialize", (DL_FUNC)
&_arrow_io___BufferReader__initialize, 1},
+ {"_arrow_io___FileOutputStream__Open", (DL_FUNC)
&_arrow_io___FileOutputStream__Open, 1},
+ {"_arrow_io___BufferOutputStream__Create", (DL_FUNC)
&_arrow_io___BufferOutputStream__Create, 1},
+ {"_arrow_io___BufferOutputStream__capacity", (DL_FUNC)
&_arrow_io___BufferOutputStream__capacity, 1},
+ {"_arrow_io___BufferOutputStream__Finish", (DL_FUNC)
&_arrow_io___BufferOutputStream__Finish, 1},
+ {"_arrow_io___BufferOutputStream__Tell", (DL_FUNC)
&_arrow_io___BufferOutputStream__Tell, 1},
+ {"_arrow_io___BufferOutputStream__Write", (DL_FUNC)
&_arrow_io___BufferOutputStream__Write, 2},
+ {"_arrow_io___MockOutputStream__initialize", (DL_FUNC)
&_arrow_io___MockOutputStream__initialize, 0},
+ {"_arrow_io___MockOutputStream__GetExtentBytesWritten", (DL_FUNC)
&_arrow_io___MockOutputStream__GetExtentBytesWritten, 1},
+ {"_arrow_io___FixedSizeBufferWriter__initialize", (DL_FUNC)
&_arrow_io___FixedSizeBufferWriter__initialize, 1},
{"_arrow_MemoryPool__default", (DL_FUNC) &_arrow_MemoryPool__default, 0},
{"_arrow_MemoryPool__bytes_allocated", (DL_FUNC)
&_arrow_MemoryPool__bytes_allocated, 1},
{"_arrow_MemoryPool__max_memory", (DL_FUNC)
&_arrow_MemoryPool__max_memory, 1},
@@ -1587,10 +1784,6 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1},
{"_arrow_RecordBatch__column", (DL_FUNC) &_arrow_RecordBatch__column, 2},
{"_arrow_RecordBatch__to_dataframe", (DL_FUNC)
&_arrow_RecordBatch__to_dataframe, 1},
- {"_arrow_read_record_batch_RandomAccessFile", (DL_FUNC)
&_arrow_read_record_batch_RandomAccessFile, 1},
- {"_arrow_read_record_batch_BufferReader", (DL_FUNC)
&_arrow_read_record_batch_BufferReader, 1},
- {"_arrow_RecordBatch__to_file", (DL_FUNC) &_arrow_RecordBatch__to_file, 2},
- {"_arrow_RecordBatch__to_stream", (DL_FUNC)
&_arrow_RecordBatch__to_stream, 1},
{"_arrow_RecordBatch__from_dataframe", (DL_FUNC)
&_arrow_RecordBatch__from_dataframe, 1},
{"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2},
{"_arrow_RecordBatch__RemoveColumn", (DL_FUNC)
&_arrow_RecordBatch__RemoveColumn, 2},
@@ -1598,14 +1791,24 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_RecordBatch__names", (DL_FUNC) &_arrow_RecordBatch__names, 1},
{"_arrow_RecordBatch__Slice1", (DL_FUNC) &_arrow_RecordBatch__Slice1, 2},
{"_arrow_RecordBatch__Slice2", (DL_FUNC) &_arrow_RecordBatch__Slice2, 3},
+ {"_arrow_RecordBatchReader__schema", (DL_FUNC)
&_arrow_RecordBatchReader__schema, 1},
+ {"_arrow_RecordBatchReader__ReadNext", (DL_FUNC)
&_arrow_RecordBatchReader__ReadNext, 1},
+ {"_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC)
&_arrow_ipc___RecordBatchStreamReader__Open, 1},
+ {"_arrow_ipc___RecordBatchFileReader__schema", (DL_FUNC)
&_arrow_ipc___RecordBatchFileReader__schema, 1},
+ {"_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC)
&_arrow_ipc___RecordBatchFileReader__num_record_batches, 1},
+ {"_arrow_ipc___RecordBatchFileReader__ReadRecordBatch", (DL_FUNC)
&_arrow_ipc___RecordBatchFileReader__ReadRecordBatch, 2},
+ {"_arrow_ipc___RecordBatchFileReader__Open", (DL_FUNC)
&_arrow_ipc___RecordBatchFileReader__Open, 1},
+ {"_arrow_Table__from_RecordBatchFileReader", (DL_FUNC)
&_arrow_Table__from_RecordBatchFileReader, 1},
+ {"_arrow_Table__from_RecordBatchStreamReader", (DL_FUNC)
&_arrow_Table__from_RecordBatchStreamReader, 1},
+ {"_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC)
&_arrow_ipc___RecordBatchFileWriter__Open, 2},
+ {"_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC)
&_arrow_ipc___RecordBatchStreamWriter__Open, 2},
+ {"_arrow_ipc___RecordBatchWriter__WriteRecordBatch", (DL_FUNC)
&_arrow_ipc___RecordBatchWriter__WriteRecordBatch, 3},
+ {"_arrow_ipc___RecordBatchWriter__WriteTable", (DL_FUNC)
&_arrow_ipc___RecordBatchWriter__WriteTable, 2},
+ {"_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC)
&_arrow_ipc___RecordBatchWriter__Close, 1},
{"_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe,
1},
{"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1},
{"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1},
{"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1},
- {"_arrow_Table__to_file", (DL_FUNC) &_arrow_Table__to_file, 2},
- {"_arrow_Table__to_stream", (DL_FUNC) &_arrow_Table__to_stream, 1},
- {"_arrow_read_table_RandomAccessFile", (DL_FUNC)
&_arrow_read_table_RandomAccessFile, 1},
- {"_arrow_read_table_BufferReader", (DL_FUNC)
&_arrow_read_table_BufferReader, 1},
{"_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 1},
{"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2},
{NULL, NULL, 0}
diff --git a/r/src/RecordBatch.cpp b/r/src/RecordBatch.cpp
index cd05bc0..5428f21 100644
--- a/r/src/RecordBatch.cpp
+++ b/r/src/RecordBatch.cpp
@@ -63,66 +63,6 @@ List RecordBatch__to_dataframe(const
std::shared_ptr<arrow::RecordBatch>& batch)
}
// [[Rcpp::export]]
-std::shared_ptr<arrow::RecordBatch> read_record_batch_RandomAccessFile(
- const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
- std::shared_ptr<arrow::ipc::RecordBatchFileReader> rbf_reader;
- R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(stream, &rbf_reader));
-
- std::shared_ptr<arrow::RecordBatch> batch;
- R_ERROR_NOT_OK(rbf_reader->ReadRecordBatch(0, &batch));
-
- return batch;
-}
-
-// [[Rcpp::export]]
-std::shared_ptr<arrow::RecordBatch> read_record_batch_BufferReader(
- const std::shared_ptr<arrow::io::BufferReader>& stream) {
- std::shared_ptr<arrow::ipc::RecordBatchReader> rbf_reader;
- R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamReader::Open(stream,
&rbf_reader));
-
- std::shared_ptr<arrow::RecordBatch> batch;
- R_ERROR_NOT_OK(rbf_reader->ReadNext(&batch));
-
- return batch;
-}
-
-// [[Rcpp::export]]
-int RecordBatch__to_file(const std::shared_ptr<arrow::RecordBatch>& batch,
- std::string path) {
- std::shared_ptr<arrow::io::OutputStream> stream;
- std::shared_ptr<arrow::ipc::RecordBatchWriter> file_writer;
-
- R_ERROR_NOT_OK(arrow::io::FileOutputStream::Open(path, &stream));
- R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileWriter::Open(stream.get(),
batch->schema(),
- &file_writer));
- R_ERROR_NOT_OK(file_writer->WriteRecordBatch(*batch, true));
- R_ERROR_NOT_OK(file_writer->Close());
-
- int64_t offset;
- R_ERROR_NOT_OK(stream->Tell(&offset));
- R_ERROR_NOT_OK(stream->Close());
- return offset;
-}
-
-int64_t RecordBatch_size(const std::shared_ptr<arrow::RecordBatch>& batch) {
- io::MockOutputStream mock_sink;
- R_ERROR_NOT_OK(arrow::ipc::WriteRecordBatchStream({batch}, &mock_sink));
- return mock_sink.GetExtentBytesWritten();
-}
-
-// [[Rcpp::export]]
-RawVector RecordBatch__to_stream(const std::shared_ptr<arrow::RecordBatch>&
batch) {
- auto n = RecordBatch_size(batch);
-
- RawVector res(n);
- auto raw_buffer = std::make_shared<arrow::MutableBuffer>(res.begin(),
res.size());
- arrow::io::FixedSizeBufferWriter sink(raw_buffer);
- R_ERROR_NOT_OK(arrow::ipc::WriteRecordBatchStream({batch}, &sink));
-
- return res;
-}
-
-// [[Rcpp::export]]
std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl)
{
CharacterVector names = tbl.names();
diff --git a/r/src/RecordBatchReader.cpp b/r/src/RecordBatchReader.cpp
new file mode 100644
index 0000000..1ddc397
--- /dev/null
+++ b/r/src/RecordBatchReader.cpp
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow_types.h"
+
+// -------- RecordBatchReader
+
+// Schema shared by every batch `reader` produces.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Schema> RecordBatchReader__schema(
+    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  return reader->schema();
+}
+
+// Read the next batch from `reader`. Once the stream is exhausted the
+// returned pointer is null; a failed read is raised as an R error.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(
+    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  std::shared_ptr<arrow::RecordBatch> batch;
+  R_ERROR_NOT_OK(reader->ReadNext(&batch));
+  return batch;
+}
+
+// -------- RecordBatchStreamReader
+
+// Open a reader for the IPC stream format on top of `stream`.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::RecordBatchReader> ipc___RecordBatchStreamReader__Open(
+    const std::shared_ptr<arrow::io::InputStream>& stream) {
+  std::shared_ptr<arrow::RecordBatchReader> reader;
+  R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamReader::Open(stream, &reader));
+  return reader;
+}
+
+// -------- RecordBatchFileReader
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Schema> ipc___RecordBatchFileReader__schema(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+  return reader->schema();
+}
+
+// Number of batches in the file; valid indices for ReadRecordBatch are
+// 0 .. num_record_batches() - 1.
+// [[Rcpp::export]]
+int ipc___RecordBatchFileReader__num_record_batches(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+  return reader->num_record_batches();
+}
+
+// Read the batch at zero-based index `i` (random access).
+// [[Rcpp::export]]
+std::shared_ptr<arrow::RecordBatch> ipc___RecordBatchFileReader__ReadRecordBatch(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader, int i) {
+  std::shared_ptr<arrow::RecordBatch> batch;
+  R_ERROR_NOT_OK(reader->ReadRecordBatch(i, &batch));
+  return batch;
+}
+
+// Open a reader for the IPC file format on a random-access `file`.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc___RecordBatchFileReader__Open(
+    const std::shared_ptr<arrow::io::RandomAccessFile>& file) {
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
+  R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(file, &reader));
+  return reader;
+}
+
+// Read every batch of `reader` into one Table. The reader's schema is passed
+// explicitly so that a file holding zero batches yields an empty Table: the
+// schema-less FromRecordBatches overload cannot infer a schema from an empty
+// vector.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+  int num_batches = reader->num_record_batches();
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches(num_batches);
+  for (int i = 0; i < num_batches; i++) {
+    R_ERROR_NOT_OK(reader->ReadRecordBatch(i, &batches[i]));
+  }
+
+  std::shared_ptr<arrow::Table> table;
+  R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(reader->schema(), batches, &table));
+
+  return table;
+}
+
+// Drain `reader` into one Table, calling ReadNext() until it yields a null
+// batch (end of stream). The reader's schema is passed explicitly so that a
+// stream holding zero batches yields an empty Table rather than an error.
+//
+// NOTE(review): ipc___RecordBatchStreamReader__Open hands R a
+// std::shared_ptr<arrow::RecordBatchReader>, while this takes the derived
+// arrow::ipc::RecordBatchStreamReader; confirm the R-side pointer conversion
+// is sound, or take the base type (RcppExports.cpp must then be regenerated).
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(
+    const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader) {
+  std::shared_ptr<arrow::RecordBatch> batch;
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+  while (true) {
+    R_ERROR_NOT_OK(reader->ReadNext(&batch));
+    if (!batch) break;
+    batches.push_back(batch);
+  }
+
+  std::shared_ptr<arrow::Table> table;
+  R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(reader->schema(), batches, &table));
+
+  return table;
+}
diff --git a/r/src/RecordBatchWriter.cpp b/r/src/RecordBatchWriter.cpp
new file mode 100644
index 0000000..7ba9257
--- /dev/null
+++ b/r/src/RecordBatchWriter.cpp
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow_types.h"
+
+// Open a writer for the IPC *file* format on `stream`, for batches matching
+// `schema`. Only `stream.get()` (a raw pointer) reaches arrow, so the writer
+// does not keep `stream` alive: the caller must hold it until Close().
+// [[Rcpp::export]]
+std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc___RecordBatchFileWriter__Open(
+    const std::shared_ptr<arrow::io::OutputStream>& stream,
+    const std::shared_ptr<arrow::Schema>& schema) {
+  std::shared_ptr<arrow::ipc::RecordBatchWriter> file_writer;
+  R_ERROR_NOT_OK(
+      arrow::ipc::RecordBatchFileWriter::Open(stream.get(), schema, &file_writer));
+  return file_writer;
+}
+
+// Open a writer for the IPC *stream* format on `stream`, for batches matching
+// `schema`. Only `stream.get()` (a raw pointer) reaches arrow, so the writer
+// does not keep `stream` alive: the caller must hold it until Close().
+// [[Rcpp::export]]
+std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc___RecordBatchStreamWriter__Open(
+    const std::shared_ptr<arrow::io::OutputStream>& stream,
+    const std::shared_ptr<arrow::Schema>& schema) {
+  std::shared_ptr<arrow::ipc::RecordBatchWriter> stream_writer;
+  R_ERROR_NOT_OK(
+      arrow::ipc::RecordBatchStreamWriter::Open(stream.get(), schema, &stream_writer));
+  return stream_writer;
+}
+
+// Write one batch; `allow_64bit` is forwarded to arrow's WriteRecordBatch.
+// A null `batch` (such as the end-of-stream result of a reader's ReadNext)
+// raises an R error instead of being dereferenced.
+// [[Rcpp::export]]
+void ipc___RecordBatchWriter__WriteRecordBatch(
+    const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer,
+    const std::shared_ptr<arrow::RecordBatch>& batch, bool allow_64bit) {
+  if (!batch) {
+    Rcpp::stop("Cannot write a null RecordBatch");
+  }
+  R_ERROR_NOT_OK(batch_writer->WriteRecordBatch(*batch, allow_64bit));
+}
+
+// Write `table`; a null `table` raises an R error instead of being dereferenced.
+// [[Rcpp::export]]
+void ipc___RecordBatchWriter__WriteTable(
+    const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer,
+    const std::shared_ptr<arrow::Table>& table) {
+  if (!table) {
+    Rcpp::stop("Cannot write a null Table");
+  }
+  R_ERROR_NOT_OK(batch_writer->WriteTable(*table));
+}
+
+// Close `batch_writer`, raising an R error on failure.
+// [[Rcpp::export]]
+void ipc___RecordBatchWriter__Close(
+    const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer) {
+  R_ERROR_NOT_OK(batch_writer->Close());
+}
diff --git a/r/src/Table.cpp b/r/src/Table.cpp
index adc3832..79d7031 100644
--- a/r/src/Table.cpp
+++ b/r/src/Table.cpp
@@ -46,83 +46,6 @@ std::shared_ptr<arrow::Schema> Table__schema(const
std::shared_ptr<arrow::Table>
}
// [[Rcpp::export]]
-int Table__to_file(const std::shared_ptr<arrow::Table>& table, std::string
path) {
- std::shared_ptr<arrow::io::OutputStream> stream;
- std::shared_ptr<arrow::ipc::RecordBatchWriter> file_writer;
-
- R_ERROR_NOT_OK(arrow::io::FileOutputStream::Open(path, &stream));
- R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileWriter::Open(stream.get(),
table->schema(),
- &file_writer));
- R_ERROR_NOT_OK(file_writer->WriteTable(*table));
- R_ERROR_NOT_OK(file_writer->Close());
-
- int64_t offset;
- R_ERROR_NOT_OK(stream->Tell(&offset));
- R_ERROR_NOT_OK(stream->Close());
- return offset;
-}
-
-// [[Rcpp::export]]
-RawVector Table__to_stream(const std::shared_ptr<arrow::Table>& table) {
- arrow::io::MockOutputStream mock_sink;
- std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
- R_ERROR_NOT_OK(
- arrow::ipc::RecordBatchStreamWriter::Open(&mock_sink, table->schema(),
&writer));
- R_ERROR_NOT_OK(writer->WriteTable(*table));
- R_ERROR_NOT_OK(writer->Close());
- auto n = mock_sink.GetExtentBytesWritten();
-
- RawVector res(no_init(n));
- auto raw_buffer = std::make_shared<arrow::MutableBuffer>(res.begin(),
res.size());
- arrow::io::FixedSizeBufferWriter sink(raw_buffer);
-
- R_ERROR_NOT_OK(
- arrow::ipc::RecordBatchStreamWriter::Open(&sink, table->schema(),
&writer));
- R_ERROR_NOT_OK(writer->WriteTable(*table));
- R_ERROR_NOT_OK(writer->Close());
-
- return res;
-}
-
-// [[Rcpp::export]]
-std::shared_ptr<arrow::Table> read_table_RandomAccessFile(
- const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
- std::shared_ptr<arrow::ipc::RecordBatchFileReader> rbf_reader;
- R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(stream, &rbf_reader));
-
- int num_batches = rbf_reader->num_record_batches();
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches(num_batches);
- for (int i = 0; i < num_batches; i++) {
- R_ERROR_NOT_OK(rbf_reader->ReadRecordBatch(i, &batches[i]));
- }
-
- std::shared_ptr<arrow::Table> table;
- R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &table));
-
- return table;
-}
-
-// [[Rcpp::export]]
-std::shared_ptr<arrow::Table> read_table_BufferReader(
- const std::shared_ptr<arrow::io::BufferReader>& stream) {
- std::shared_ptr<arrow::ipc::RecordBatchReader> rb_reader;
- R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamReader::Open(stream,
&rb_reader));
- std::shared_ptr<arrow::RecordBatch> batch;
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- while (true) {
- R_ERROR_NOT_OK(rb_reader->ReadNext(&batch));
- if (!batch) break;
- batches.push_back(batch);
- }
-
- std::shared_ptr<arrow::Table> table;
- R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &table));
-
- return table;
-}
-
-// [[Rcpp::export]]
List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table) {
int nc = table->num_columns();
int nr = table->num_rows();
diff --git a/r/src/array.cpp b/r/src/array.cpp
index 4f9eb6c..71bdb52 100644
--- a/r/src/array.cpp
+++ b/r/src/array.cpp
@@ -282,7 +282,6 @@ inline int64_t time_cast<double>(double value) {
template <int RTYPE>
std::shared_ptr<Array> Date64Array_From_POSIXct(SEXP x) {
- using stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
Rcpp::Vector<RTYPE> vec(x);
auto p_vec = vec.begin();
auto n = vec.size();
@@ -420,8 +419,6 @@ inline SEXP StringArray_to_Vector(const
std::shared_ptr<arrow::Array>& array) {
}
Rcpp::CharacterVector res(no_init(n));
- const auto& buffers = array->data()->buffers;
-
auto p_offset = GetValuesSafely<int32_t>(array->data(), 1, array->offset());
auto p_data = GetValuesSafely<char>(array->data(), 2, *p_offset);
@@ -593,6 +590,36 @@ SEXP Date64Array_to_Vector(const std::shared_ptr<arrow::Array> array) {
   return vec;
 }
 
+// Convert a primitive array whose C value type has no direct R equivalent
+// into an R vector of type RTYPE by casting each element to RTYPE's stored
+// type. Null slots (per the validity bitmap) become RTYPE's NA. Some
+// conversions (e.g. int64 -> double) can lose precision.
+template <int RTYPE, typename Type>
+SEXP promotion_Array_to_Vector(const std::shared_ptr<Array>& array) {
+  using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
+  using value_type = typename TypeTraits<Type>::ArrayType::value_type;
+
+  auto n = array->length();
+  // the values buffer may be shared with a parent array: skip to our offset
+  auto start = reinterpret_cast<const value_type*>(array->data()->buffers[1]->data()) +
+               array->offset();
+
+  Rcpp::Vector<RTYPE> vec(no_init(n));
+  if (array->null_count()) {
+    internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(),
+                                         n);
+    for (int64_t i = 0; i < n; i++, bitmap_reader.Next()) {
+      vec[i] = bitmap_reader.IsNotSet() ? Rcpp::Vector<RTYPE>::get_na()
+                                        : static_cast<r_stored_type>(start[i]);
+    }
+  } else {
+    std::transform(start, start + n, vec.begin(),
+                   [](value_type x) { return static_cast<r_stored_type>(x); });
+  }
+
+  return vec;
+}
+
 }  // namespace r
 }  // namespace arrow
 
@@ -601,22 +628,51 @@ SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) {
   using namespace arrow::r;
   switch (array->type_id()) {
-    case Type::BOOL:
-      return BooleanArray_to_Vector(array);
+    // direct support
     case Type::INT8:
       return simple_Array_to_Vector<RAWSXP>(array);
     case Type::INT32:
       return simple_Array_to_Vector<INTSXP>(array);
     case Type::DOUBLE:
       return simple_Array_to_Vector<REALSXP>(array);
+
+    // need to handle 1-bit case
+    case Type::BOOL:
+      return BooleanArray_to_Vector(array);
+
+    // handle memory dense strings
     case Type::STRING:
       return StringArray_to_Vector(array);
     case Type::DICTIONARY:
       return DictionaryArray_to_Vector(static_cast<arrow::DictionaryArray*>(array.get()));
+
     case Type::DATE32:
       return Date32Array_to_Vector(array);
     case Type::DATE64:
       return Date64Array_to_Vector(array);
+
+    // promotions to integer vector
+    case Type::UINT8:
+      return arrow::r::promotion_Array_to_Vector<INTSXP, arrow::UInt8Type>(array);
+    case Type::INT16:
+      return arrow::r::promotion_Array_to_Vector<INTSXP, arrow::Int16Type>(array);
+    case Type::UINT16:
+      return arrow::r::promotion_Array_to_Vector<INTSXP, arrow::UInt16Type>(array);
+
+    // promotions to numeric vector (Type must be the array's physical type)
+    case Type::UINT32:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, arrow::UInt32Type>(array);
+    case Type::FLOAT:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, arrow::FloatType>(array);
+    // HALF_FLOAT holds raw 16-bit values that are not decoded here, so it is
+    // left to the `default` branch
+
+    // lossy promotions to numeric vector
+    case Type::INT64:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, arrow::Int64Type>(array);
+    case Type::UINT64:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, arrow::UInt64Type>(array);
+
     default:
       break;
   }
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 43ee41d..0d21dad 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -23,6 +23,8 @@
#include <arrow/api.h>
#include <arrow/io/file.h>
#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
#include <arrow/type.h>
#define R_ERROR_NOT_OK(s) \
@@ -104,6 +106,7 @@ using IntegerVector_ = Rcpp::Vector<INTSXP,
Rcpp::NoProtectStorage>;
using LogicalVector_ = Rcpp::Vector<LGLSXP, Rcpp::NoProtectStorage>;
using StringVector_ = Rcpp::Vector<STRSXP, Rcpp::NoProtectStorage>;
using CharacterVector_ = StringVector_;
+using RawVector_ = Rcpp::Vector<RAWSXP, Rcpp::NoProtectStorage>;
template <int RTYPE>
inline typename Rcpp::Vector<RTYPE>::stored_type default_value() {
@@ -135,11 +138,11 @@ inline const T* GetValuesSafely(const
std::shared_ptr<ArrayData>& data, int i,
}
template <int RTYPE, typename Vec = Rcpp::Vector<RTYPE>>
-class RBuffer : public Buffer {
+class RBuffer : public MutableBuffer {
public:
RBuffer(Vec vec)
- : Buffer(reinterpret_cast<const uint8_t*>(vec.begin()),
- vec.size() * sizeof(typename Vec::stored_type)),
+ : MutableBuffer(reinterpret_cast<uint8_t*>(vec.begin()),
+ vec.size() * sizeof(typename Vec::stored_type)),
vec_(vec) {}
private:
diff --git a/r/src/io.cpp b/r/src/io.cpp
index e528720..cda4065 100644
--- a/r/src/io.cpp
+++ b/r/src/io.cpp
@@ -36,6 +36,13 @@ void io___InputStream__Close(const std::shared_ptr<arrow::io::InputStream>& x) {
   R_ERROR_NOT_OK(x->Close());
 }
 
+// ------ arrow::io::OutputStream
+
+// [[Rcpp::export]]
+void io___OutputStream__Close(const std::shared_ptr<arrow::io::OutputStream>& x) {
+  R_ERROR_NOT_OK(x->Close());
+}
+
 // ------ arrow::io::RandomAccessFile
 
 // [[Rcpp::export]]
@@ -107,3 +114,88 @@ std::shared_ptr<arrow::io::BufferReader> io___BufferReader__initialize(
     const std::shared_ptr<arrow::Buffer>& buffer) {
   return std::make_shared<arrow::io::BufferReader>(buffer);
 }
+
+// ------ arrow::io::FileOutputStream
+
+// Open a FileOutputStream that writes to `path`.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::FileOutputStream> io___FileOutputStream__Open(
+    const std::string& path) {
+  std::shared_ptr<arrow::io::FileOutputStream> stream;
+  R_ERROR_NOT_OK(arrow::io::FileOutputStream::Open(path, &stream));
+  return stream;
+}
+
+// ------ arrow::io::BufferOutputStream
+
+// Create an in-memory output stream with `initial_capacity` bytes of capacity,
+// allocating from arrow's default memory pool.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::BufferOutputStream> io___BufferOutputStream__Create(
+    int64_t initial_capacity) {
+  std::shared_ptr<arrow::io::BufferOutputStream> stream;
+  R_ERROR_NOT_OK(arrow::io::BufferOutputStream::Create(
+      initial_capacity, arrow::default_memory_pool(), &stream));
+  return stream;
+}
+
+// [[Rcpp::export]]
+int64_t io___BufferOutputStream__capacity(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream) {
+  return stream->capacity();
+}
+
+// Finalize `stream` and return what was written to it as an arrow::Buffer.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Buffer> io___BufferOutputStream__Finish(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream) {
+  std::shared_ptr<arrow::Buffer> buffer;
+  R_ERROR_NOT_OK(stream->Finish(&buffer));
+  return buffer;
+}
+
+// Current write position of `stream`.
+// [[Rcpp::export]]
+int64_t io___BufferOutputStream__Tell(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream) {
+  int64_t res;
+  R_ERROR_NOT_OK(stream->Tell(&res));
+  return res;
+}
+
+// Append the contents of the R raw vector `bytes` to `stream`.
+// [[Rcpp::export]]
+void io___BufferOutputStream__Write(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream, RawVector_ bytes) {
+  R_ERROR_NOT_OK(stream->Write(bytes.begin(), bytes.size()));
+}
+
+// ------ arrow::io::MockOutputStream
+
+// Output stream used to measure how many bytes a write produces (see
+// GetExtentBytesWritten), e.g. to size a buffer before serializing into it.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::MockOutputStream> io___MockOutputStream__initialize() {
+  return std::make_shared<arrow::io::MockOutputStream>();
+}
+
+// [[Rcpp::export]]
+int64_t io___MockOutputStream__GetExtentBytesWritten(
+    const std::shared_ptr<arrow::io::MockOutputStream>& stream) {
+  return stream->GetExtentBytesWritten();
+}
+
+// ------ arrow::io::FixedSizeBufferWriter
+
+// Wrap `buffer` in a writer that writes in place into its fixed-size memory.
+// The buffer must be mutable; arrow appears to enforce this only with a
+// debug assertion (verify for the arrow version in use), so an immutable or
+// null buffer is rejected here with an R error rather than written through.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::FixedSizeBufferWriter> io___FixedSizeBufferWriter__initialize(
+    const std::shared_ptr<arrow::Buffer>& buffer) {
+  if (!buffer || !buffer->is_mutable()) {
+    Rcpp::stop("FixedSizeBufferWriter requires a non-null mutable buffer");
+  }
+  return std::make_shared<arrow::io::FixedSizeBufferWriter>(buffer);
+}
diff --git a/r/tests/testthat/test-RecordBatch.R
b/r/tests/testthat/test-RecordBatch.R
index 0d9d0d9..c6a31bb 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -108,53 +108,49 @@ test_that("RecordBatch with 0 rows are supported", {
)
)
- tf <- tempfile(); on.exit(unlink(tf))
- batch$to_file(tf)
-
+ tf <- local_tempfile()
+ write_record_batch(batch, tf)
res <- read_record_batch(tf)
expect_equal(res, batch)
})
-test_that("RecordBatch can output stream", {
- tbl <- tibble::tibble(
- int = 1:10, dbl = as.numeric(1:10),
- lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
- chr = letters[1:10]
- )
- record <- record_batch(tbl)
- stream <- record$to_stream()
-
- expect_gt(length(stream), 0)
-})
-
-test_that("read_record_batch handles ReadableFile and MemoryMappedFile
(ARROW-3450)", {
+test_that("read_record_batch handles various streams (ARROW-3450,
ARROW-3505)", {
tbl <- tibble::tibble(
int = 1:10, dbl = as.numeric(1:10),
lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
chr = letters[1:10]
)
batch <- record_batch(tbl)
- tf <- tempfile(); on.exit(unlink(tf))
- batch$to_file(tf)
+ tf <- local_tempfile()
+ write_record_batch(batch, tf)
- bytes <- batch$to_stream()
+ bytes <- write_record_batch(batch, raw())
buf_reader <- buffer_reader(bytes)
batch1 <- read_record_batch(tf)
batch2 <- read_record_batch(fs::path_abs(tf))
- readable_file <- file_open(tf); on.exit(readable_file$Close())
+ readable_file <- close_on_exit(file_open(tf))
batch3 <- read_record_batch(readable_file)
- mmap_file <- mmap_open(tf); on.exit(mmap_file$Close())
+ mmap_file <- close_on_exit(mmap_open(tf))
batch4 <- read_record_batch(mmap_file)
batch5 <- read_record_batch(bytes)
batch6 <- read_record_batch(buf_reader)
+ stream_reader <- record_batch_stream_reader(bytes)
+ batch7 <- read_record_batch(stream_reader)
+
+ file_reader <- record_batch_file_reader(tf)
+ batch8 <- read_record_batch(file_reader)
+
expect_equal(batch, batch1)
expect_equal(batch, batch2)
expect_equal(batch, batch3)
expect_equal(batch, batch4)
expect_equal(batch, batch5)
+ expect_equal(batch, batch6)
+ expect_equal(batch, batch7)
+ expect_equal(batch, batch8)
})
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index e6b2cad..681b7a9 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -17,35 +17,43 @@
context("arrow::Table")
-test_that("read_table handles various input streams (ARROW-3450)", {
+test_that("read_table handles various input streams (ARROW-3450, ARROW-3505)",
{
tbl <- tibble::tibble(
int = 1:10, dbl = as.numeric(1:10),
lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
chr = letters[1:10]
)
tab <- arrow::table(tbl)
- tf <- tempfile(); on.exit(unlink(tf))
- tab$to_file(tf)
+ tf <- local_tempfile()
+ write_table(tab, tf)
- bytes <- tab$to_stream()
+ bytes <- write_table(tab, raw())
buf_reader <- buffer_reader(bytes)
tab1 <- read_table(tf)
tab2 <- read_table(fs::path_abs(tf))
- readable_file <- file_open(tf); on.exit(readable_file$Close())
+ readable_file <- close_on_exit(file_open(tf))
tab3 <- read_table(readable_file)
- mmap_file <- mmap_open(tf); on.exit(mmap_file$Close())
+ mmap_file <- close_on_exit(mmap_open(tf))
tab4 <- read_table(mmap_file)
tab5 <- read_table(bytes)
tab6 <- read_table(buf_reader)
+ stream_reader <- record_batch_stream_reader(bytes)
+ tab7 <- read_table(stream_reader)
+
+ file_reader <- record_batch_file_reader(tf)
+ tab8 <- read_table(file_reader)
+
expect_equal(tab, tab1)
expect_equal(tab, tab2)
expect_equal(tab, tab3)
expect_equal(tab, tab4)
expect_equal(tab, tab5)
expect_equal(tab, tab6)
+ expect_equal(tab, tab7)
+ expect_equal(tab, tab8)
})
diff --git a/r/tests/testthat/test-read-write.R
b/r/tests/testthat/test-read-write.R
index c67fe72..bcf3922 100644
--- a/r/tests/testthat/test-read-write.R
+++ b/r/tests/testthat/test-read-write.R
@@ -84,8 +84,8 @@ test_that("arrow::table round trip", {
for( i in seq_along(chunks_raw)){
expect_equal(chunked_array_raw$chunk(i-1L), chunks_raw[[i]])
}
- tf <- tempfile(); on.exit(unlink(tf))
- write_arrow(tbl, path = tf)
+ tf <- local_tempfile()
+ write_arrow(tbl, tf)
res <- read_arrow(tf)
expect_identical(tbl, res)
@@ -114,8 +114,8 @@ test_that("arrow::table round trip handles NA in integer
and numeric", {
expect_equal(tab$column(1)$type(), float64())
expect_equal(tab$column(2)$type(), int8())
- tf <- tempfile(); on.exit(unlink(tf))
- write_arrow(tbl, path = tf)
+ tf <- local_tempfile()
+ write_arrow(tbl, tf)
res <- read_arrow(tf)
expect_identical(tbl, res)