paleolimbot commented on code in PR #13650:
URL: https://github.com/apache/arrow/pull/13650#discussion_r925075067


##########
r/R/dataset-scan.R:
##########
@@ -197,25 +199,50 @@ map_batches <- function(X, FUN, ..., .data.frame = NULL) {
   }
   FUN <- as_mapper(FUN)
   reader <- as_record_batch_reader(X)
+  dots <- rlang::list2(...)
 
-  # TODO: for future consideration
-  # * Move eval to C++ and make it a generator so it can stream, not block
-  # * Accept an output schema argument: with that, we could make this lazy (via collapse)
-  batch <- reader$read_next_batch()
-  res <- vector("list", 1024)
-  i <- 0L
-  while (!is.null(batch)) {
-    i <- i + 1L
-    res[[i]] <- as_record_batch(FUN(batch, ...))
+  # If no schema is supplied, we have to evaluate the first batch here
+  if (is.null(.schema)) {
     batch <- reader$read_next_batch()
-  }
+    if (is.null(batch)) {
+      abort("Can't infer schema from a RecordBatchReader with zero batches")
+    }
+
+    first_result <- as_record_batch(do.call(FUN, c(list(batch, dots))))
+    .schema <- first_result$schema
+    fun <- function() {
+      if (!is.null(first_result)) {
+        result <- first_result
+        first_result <<- NULL
+        result
+      } else {
+        batch <- reader$read_next_batch()
+        if (is.null(batch)) {
+          NULL
+        } else {
+          as_record_batch(
+            do.call(FUN, c(list(batch, dots))),

Review Comment:
   That was a really good catch (it needed to be `c(list(batch), dots)`)!
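Why the original form was wrong: `list(batch, dots)` builds a two-element list whose second element is the whole `dots` list. Wrapping it in `c()` is a no-op, so `do.call()` passes `dots` to `FUN` as one positional argument. `c(list(batch), dots)` instead splices each element of `dots` in as its own named argument. Below is a minimal base-R sketch of the difference. It assumes a hypothetical `scale_batch()` standing in for `FUN`, a numeric vector standing in for a `RecordBatch`, and a plain `list()` standing in for what `rlang::list2(...)` captures.

```r
# Hypothetical stand-in for a user-supplied FUN that takes extra arguments via ...
scale_batch <- function(batch, factor = 1, offset = 0) {
  batch * factor + offset
}

batch <- c(1, 2, 3)                    # stands in for a RecordBatch
dots <- list(factor = 10, offset = 1)  # stands in for rlang::list2(...)

# Buggy: list(batch, dots) has two elements (the outer c() changes nothing),
# so do.call() passes the whole dots list as the second positional
# argument, which binds to `factor`.
buggy_args <- c(list(batch, dots))
buggy <- tryCatch(
  do.call(scale_batch, buggy_args),
  error = function(e) conditionMessage(e)  # numeric * list is an error
)

# Fixed: c() splices each element of dots in as its own named argument.
fixed_args <- c(list(batch), dots)
fixed <- do.call(scale_batch, fixed_args)  # 11 21 31
```

The same splicing applies at both places in the diff where `FUN` is called through `do.call()`.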

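The `fun` closure in the diff uses a peek-and-replay pattern. It evaluates the first batch eagerly to get a schema, caches that result, hands the cached result back on the first call, and only then pulls further batches from the reader. Below is a base-R sketch of that pattern. It assumes a hypothetical list-backed `make_reader()` in place of a real `RecordBatchReader`, plain vectors in place of record batches, and a hypothetical `map_stream()` wrapper. It also leaves out the schema and `as_record_batch()` steps.

```r
# Hypothetical stand-in for a RecordBatchReader: read_next_batch() returns
# the next element of `batches`, or NULL once exhausted.
make_reader <- function(batches) {
  i <- 0L
  list(read_next_batch = function() {
    if (i >= length(batches)) return(NULL)
    i <<- i + 1L
    batches[[i]]
  })
}

# Hypothetical helper mirroring the structure of the diff's `fun` closure.
map_stream <- function(reader, FUN, ...) {
  dots <- list(...)
  # Evaluate the first batch eagerly (in the diff, this yields the schema).
  batch <- reader$read_next_batch()
  if (is.null(batch)) stop("Can't infer schema from a reader with zero batches")
  first_result <- do.call(FUN, c(list(batch), dots))

  # Generator: replay the cached first result once, then read lazily.
  function() {
    if (!is.null(first_result)) {
      result <- first_result
      first_result <<- NULL  # clear the cache in the enclosing frame
      result
    } else {
      batch <- reader$read_next_batch()
      if (is.null(batch)) NULL else do.call(FUN, c(list(batch), dots))
    }
  }
}

# Usage: drain the generator until it returns NULL.
next_batch <- map_stream(make_reader(list(1:2, 3:4, 5:6)), function(b, k) b * k, k = 10L)
out <- list()
while (!is.null(b <- next_batch())) out[[length(out) + 1L]] <- b
```

Caching the first result means the reader is read exactly once per batch, even though the first batch has to be evaluated before the caller asks for anything.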


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.