paleolimbot commented on issue #38828:
URL: https://github.com/apache/arrow/issues/38828#issuecomment-1828386637

   I'll work on the PR that enables this usage more directly, but I'd like to 
point out that if you don't use dictionary encoding (i.e., no factors in your 
input data frame), you can serialize the schema and batches directly and send 
them to the connection. This might even be faster than doing it directly from 
Arrow (since Arrow C++ under the hood is just calling `writeBin()` anyway):
   
   ``` r
   # Temporary file that collects whatever the background server prints
   tmp <- tempfile()

   # Launch the receiving end in a separate R process; its stdout goes to `tmp`
   proc <- callr::r_bg(
     function() {
       # On every iteration: open a server socket on localhost:6011, read one
       # Arrow IPC stream from it and print the first rows that arrived
       serve_forever <- function() {
         library(arrow)

         repeat {
           writeLines("Listening...")
           conn <- socketConnection(
             host = "localhost",
             port = 6011,
             blocking = TRUE,
             server = TRUE,
             open = "r+b"
           )
           socketTimeout(conn, 3600)

           # as_data_frame = FALSE keeps the result as an Arrow object; it is
           # only converted to a data.frame for printing
           incoming <- arrow::read_ipc_stream(conn, as_data_frame = FALSE)
           preview <- head(as.data.frame(incoming))
           print(preview)
         }
       }

       serve_forever()
     },
     stdout = tmp
   )

   # Pause briefly before connecting (presumably so the server has time to
   # start listening)
   Sys.sleep(0.5)
   
   library(arrow, warn.conflicts = FALSE)
   #> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

   # Have to pick something without factors for this to work currently:
   # dictionary encoding doesn't quite work without the stream writer
   rb <- arrow::record_batch(nycflights13::flights)

   # Client side of the socket; the port is numeric to match the server side
   socketDriver <- socketConnection(
     host = "localhost",
     port = 6011,
     blocking = TRUE,
     server = FALSE,
     open = "w+b"
   )

   reader <- as_record_batch_reader(rb)

   # An IPC stream is the serialized schema followed by each serialized batch
   writeBin(reader$schema$serialize(), socketDriver)
   while (!is.null(batch <- reader$read_next_batch())) {
     writeBin(batch$serialize(), socketDriver)
   }

   # Write end-of-stream: the 0xFFFFFFFF continuation marker followed by a
   # zero 4-byte length
   writeBin(
     as.raw(c(0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00)),
     socketDriver
   )

   close(socketDriver)
   
   # Wait briefly, then show what the background server wrote to its log file
   Sys.sleep(0.5)
   cat(brio::read_file(tmp))
   #> Listening...
   #>   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
   #> 1 2013     1   1      517            515         2      830            819
   #> 2 2013     1   1      533            529         4      850            830
   #> 3 2013     1   1      542            540         2      923            850
   #> 4 2013     1   1      544            545        -1     1004           1022
   #> 5 2013     1   1      554            600        -6      812            837
   #> 6 2013     1   1      554            558        -4      740            728
   #>   arr_delay carrier flight tailnum origin dest air_time distance hour minute
   #> 1        11      UA   1545  N14228    EWR  IAH      227     1400    5     15
   #> 2        20      UA   1714  N24211    LGA  IAH      227     1416    5     29
   #> 3        33      AA   1141  N619AA    JFK  MIA      160     1089    5     40
   #> 4       -18      B6    725  N804JB    JFK  BQN      183     1576    5     45
   #> 5       -25      DL    461  N668DN    LGA  ATL      116      762    6      0
   #> 6        12      UA   1696  N39463    EWR  ORD      150      719    5     58
   #>             time_hour
   #> 1 2013-01-01 05:00:00
   #> 2 2013-01-01 05:00:00
   #> 3 2013-01-01 05:00:00
   #> 4 2013-01-01 05:00:00
   #> 5 2013-01-01 06:00:00
   #> 6 2013-01-01 05:00:00
   #> Listening...

   # Shutdown server
   proc$interrupt()
   #> [1] TRUE
   # Wait briefly, then confirm the background process has exited
   Sys.sleep(0.5)
   proc$is_alive()
   #> [1] FALSE
   ```
   
   <sup>Created on 2023-11-27 with [reprex 
v2.0.2](https://reprex.tidyverse.org)</sup>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to