paleolimbot commented on issue #38828:
URL: https://github.com/apache/arrow/issues/38828#issuecomment-1828386637
I'll work on the PR that enables this usage more directly, but I'd like to
point out that if you don't use dictionary encoding (i.e., no factors in your
input data frame), you can serialize the schema and batches directly and send
them to the connection. This might even be faster than doing it directly from
Arrow (since Arrow C++ under the hood is just calling `writeBin()` anyway):
``` r
tmp <- tempfile()

proc <- callr::r_bg(function() {
  server <- function() {
    library(arrow)

    while (TRUE) {
      writeLines("Listening...")
      con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                              server = TRUE, open = "r+b")
      socketTimeout(con, 3600)
      data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
      print(head(as.data.frame(data)))
    }
  }

  server()
}, stdout = tmp)
Sys.sleep(0.5)
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
# Have to pick something without factors for this to work currently:
# dictionary encoding doesn't quite work without the stream writer
rb <- arrow::record_batch(nycflights13::flights)
socketDriver <- socketConnection(host = "localhost",
                                 port = 6011,
                                 blocking = TRUE,
                                 server = FALSE,
                                 open = "w+b")
reader <- as_record_batch_reader(rb)
writeBin(reader$schema$serialize(), socketDriver)
while (!is.null(batch <- reader$read_next_batch())) {
  writeBin(batch$serialize(), socketDriver)
}
# Write end-of-stream
writeBin(as.raw(c(0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00)), socketDriver)
close(socketDriver)
Sys.sleep(0.5)
cat(brio::read_file(tmp))
#> Listening...
#>   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013     1   1      517            515         2      830            819
#> 2 2013     1   1      533            529         4      850            830
#> 3 2013     1   1      542            540         2      923            850
#> 4 2013     1   1      544            545        -1     1004           1022
#> 5 2013     1   1      554            600        -6      812            837
#> 6 2013     1   1      554            558        -4      740            728
#>   arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1        11      UA   1545  N14228    EWR  IAH      227     1400    5     15
#> 2        20      UA   1714  N24211    LGA  IAH      227     1416    5     29
#> 3        33      AA   1141  N619AA    JFK  MIA      160     1089    5     40
#> 4       -18      B6    725  N804JB    JFK  BQN      183     1576    5     45
#> 5       -25      DL    461  N668DN    LGA  ATL      116      762    6      0
#> 6        12      UA   1696  N39463    EWR  ORD      150      719    5     58
#>             time_hour
#> 1 2013-01-01 05:00:00
#> 2 2013-01-01 05:00:00
#> 3 2013-01-01 05:00:00
#> 4 2013-01-01 05:00:00
#> 5 2013-01-01 06:00:00
#> 6 2013-01-01 05:00:00
#> Listening...
# Shut down the server
proc$interrupt()
#> [1] TRUE
Sys.sleep(0.5)
proc$is_alive()
#> [1] FALSE
```
<sup>Created on 2023-11-27 with [reprex
v2.0.2](https://reprex.tidyverse.org)</sup>
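
For reference, the eight bytes written as the end-of-stream marker in the reprex are the Arrow IPC continuation marker (`0xFFFFFFFF`) followed by a zero metadata length, both 32-bit little-endian integers. Below is a minimal sketch that builds the marker from those two fields instead of hard-coding the bytes. The variable names are illustrative only, and the in-memory round trip at the end is an assumption-laden sanity check: it needs the arrow package and relies on the same no-dictionary restriction as the example above.

``` r
# Build the 8-byte end-of-stream marker from its two little-endian int32 fields.
continuation <- writeBin(-1L, raw(), size = 4, endian = "little")  # 0xFFFFFFFF
zero_length  <- writeBin(0L, raw(), size = 4, endian = "little")   # 0x00000000
end_of_stream <- c(continuation, zero_length)

# Same bytes as the hard-coded vector in the reprex.
stopifnot(identical(
  end_of_stream,
  as.raw(c(0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00))
))

# Optional in-memory check (assumes arrow is installed; no factor columns):
# concatenate schema + batch + marker and read it back as an IPC stream.
if (requireNamespace("arrow", quietly = TRUE)) {
  rb <- arrow::record_batch(x = 1:3, y = c("a", "b", "c"))
  stream <- c(rb$schema$serialize(), rb$serialize(), end_of_stream)
  tab <- arrow::read_ipc_stream(stream, as_data_frame = FALSE)
  stopifnot(tab$num_rows == 3L)
}
```

Assembling the stream as a raw vector first is a convenient way to test the framing without sockets; writing the same pieces to the connection with `writeBin()` should behave the same way.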