[ 
https://issues.apache.org/jira/browse/ARROW-15405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dewey Dunnington updated ARROW-15405:
-------------------------------------
    Description: 
After ARROW-15040 we will be able to use a RecordBatchReader as input to a CSV 
file writer. It would be similarly helpful to be able to use a 
RecordBatchReader as input to write Parquet, IPC, and Feather files. The 
RecordBatchReader is important because it can be imported from 
C/Python/database results (e.g., DuckDB).

There is a workaround (at least for Parquet), but it's quite verbose:

{code:R}
tf <- tempfile(fileext = ".parquet")
df <- data.frame(a = 1:1e6)
arrow::write_parquet(df, tf)

dbdir <- ":memory:"
sink <- tempfile(fileext = ".parquet")

con <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir))
res <- DBI::dbSendQuery(con, glue::glue_sql("SELECT * FROM { tf }", .con = 
con), arrow = TRUE)

reader <- duckdb::duckdb_fetch_record_batch(res)

# write_parquet() doesn't *quite* support a record batch reader yet
# and we want to stream to the writer to support possibly
# bigger-than-memory results
fs <- arrow::LocalFileSystem$create()
stream <- fs$OpenOutputStream(sink)

chunk_size <- asNamespace("arrow")$calculate_chunk_size(1e6, 1)
batch <- arrow::Table$create(reader$read_next_batch())
writer <- arrow::ParquetFileWriter$create(
  reader$schema,
  stream,
  properties = arrow::ParquetWriterProperties$create(batch)
)
writer$WriteTable(batch, chunk_size = chunk_size)

while (!is.null(batch <- reader$read_next_batch())) {
  writer$WriteTable(arrow::Table$create(batch), chunk_size = chunk_size)
}

writer$Close()
stream$close()
DBI::dbDisconnect(con, shutdown = TRUE)

df2 <- dplyr::arrange(arrow::read_parquet(sink), a)
identical(df2$a, df$a)
#> [1] TRUE
{code}


  was:
After ARROW-14741 we can use a RecordBatchReader as input to a CSV file writer. 
It would be similarly helpful to be able to use a RecordBatchReader as input to 
write Parquet, IPC, and Feather files. The RecordBatchReader is important 
because it can be imported from C/Python/database results (e.g., DuckDB).

There is a workaround (at least for Parquet), but it's quite verbose:

{code:R}
tf <- tempfile(fileext = ".parquet")
df <- data.frame(a = 1:1e6)
arrow::write_parquet(df, tf)

dbdir <- ":memory:"
sink <- tempfile(fileext = ".parquet")

con <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir))
res <- DBI::dbSendQuery(con, glue::glue_sql("SELECT * FROM { tf }", .con = 
con), arrow = TRUE)

reader <- duckdb::duckdb_fetch_record_batch(res)

# write_parquet() doesn't *quite* support a record batch reader yet
# and we want to stream to the writer to support possibly
# bigger-than-memory results
fs <- arrow::LocalFileSystem$create()
stream <- fs$OpenOutputStream(sink)

chunk_size <- asNamespace("arrow")$calculate_chunk_size(1e6, 1)
batch <- arrow::Table$create(reader$read_next_batch())
writer <- arrow::ParquetFileWriter$create(
  reader$schema,
  stream,
  properties = arrow::ParquetWriterProperties$create(batch)
)
writer$WriteTable(batch, chunk_size = chunk_size)

while (!is.null(batch <- reader$read_next_batch())) {
  writer$WriteTable(arrow::Table$create(batch), chunk_size = chunk_size)
}

writer$Close()
stream$close()
DBI::dbDisconnect(con, shutdown = TRUE)

df2 <- dplyr::arrange(arrow::read_parquet(sink), a)
identical(df2$a, df$a)
#> [1] TRUE
{code}



> [R] Allow all file writers to write from a RecordBatchReader
> ------------------------------------------------------------
>
>                 Key: ARROW-15405
>                 URL: https://issues.apache.org/jira/browse/ARROW-15405
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: R
>            Reporter: Dewey Dunnington
>            Priority: Major
>
> After ARROW-15040 we will be able to use a RecordBatchReader as input to a 
> CSV file writer. It would be similarly helpful to be able to use a 
> RecordBatchReader as input to write Parquet, IPC, and Feather files. The 
> RecordBatchReader is important because it can be imported from 
> C/Python/database results (e.g., DuckDB).
>
> There is a workaround (at least for Parquet), but it's quite verbose:
> {code:R}
> tf <- tempfile(fileext = ".parquet")
> df <- data.frame(a = 1:1e6)
> arrow::write_parquet(df, tf)
> dbdir <- ":memory:"
> sink <- tempfile(fileext = ".parquet")
> con <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir))
> res <- DBI::dbSendQuery(con, glue::glue_sql("SELECT * FROM { tf }", .con = 
> con), arrow = TRUE)
> reader <- duckdb::duckdb_fetch_record_batch(res)
> # write_parquet() doesn't *quite* support a record batch reader yet
> # and we want to stream to the writer to support possibly
> # bigger-than-memory results
> fs <- arrow::LocalFileSystem$create()
> stream <- fs$OpenOutputStream(sink)
> chunk_size <- asNamespace("arrow")$calculate_chunk_size(1e6, 1)
> batch <- arrow::Table$create(reader$read_next_batch())
> writer <- arrow::ParquetFileWriter$create(
>   reader$schema,
>   stream,
>   properties = arrow::ParquetWriterProperties$create(batch)
> )
> writer$WriteTable(batch, chunk_size = chunk_size)
> while (!is.null(batch <- reader$read_next_batch())) {
>   writer$WriteTable(arrow::Table$create(batch), chunk_size = chunk_size)
> }
> writer$Close()
> stream$close()
> DBI::dbDisconnect(con, shutdown = TRUE)
> df2 <- dplyr::arrange(arrow::read_parquet(sink), a)
> identical(df2$a, df$a)
> #> [1] TRUE
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to