This is an automated email from the ASF dual-hosted git repository.

thisisnic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 47a602dbd9 GH-34437: [R] Use FetchNode and OrderByNode (#34685)
47a602dbd9 is described below

commit 47a602dbd9b7b7f7720a5e62467e3e6c61712cf3
Author: Neal Richardson <[email protected]>
AuthorDate: Tue Apr 11 12:19:49 2023 -0400

    GH-34437: [R] Use FetchNode and OrderByNode (#34685)
    
    ### Rationale for this change
    
    See also #32991. By using the new nodes, we're closer to having all dplyr 
query business happening inside the ExecPlan. Unfortunately, there are still 
two cases where we have to apply operations in R after running a query:
    
    * #34941: Taking head/tail on unordered data. The result is 
non-deterministic, but it should still be possible for the case where the user 
just wants to see a slice of the result, any slice
    * #34942: Implementing tail in the FetchNode or similar would enable 
removing more hacks and workarounds.
    
    Once those are resolved, we can simplify further and then move to the new 
Declaration class.
    
    ### What changes are included in this PR?
    
    This removes the use of different SinkNodes and many R-specific workarounds 
to support sorting and head/tail, so *almost*
    everything we do in a query should be represented in an ExecPlan.
    
    ### Are these changes tested?
    
    Yes. This is mostly an internal refactor, but behavior changes are 
accompanied by test updates.
    
    ### Are there any user-facing changes?
    
    The `show_query()` method will print slightly different ExecPlans. In many 
cases, they will be more informative.
    
    `tail()` now actually returns the tail of the data in cases where the data 
has an implicit order (currently only in-memory tables). Previously it was 
non-deterministic (and would return the head or some other slice of the data).
    
    When printing query objects that include `summarize()` while the 
`arrow.summarise.sort = TRUE` option is set, the sorting is now printed correctly.
    
    It's unclear if there should be changes in performance; running benchmarks 
would be good but it's also not clear that our benchmarks cover all affected 
scenarios.
    
    * Closes: #34437
    * Closes: #31980
    * Closes: #31982
    
    Authored-by: Neal Richardson <[email protected]>
    Signed-off-by: Nic Crane <[email protected]>
---
 r/R/arrowExports.R                      |  16 +++-
 r/R/dplyr-summarize.R                   |   5 +
 r/R/dplyr.R                             |  35 ++++---
 r/R/query-engine.R                      | 158 ++++++++++++++++----------------
 r/src/arrowExports.cpp                  |  56 +++++++++--
 r/src/compute-exec.cpp                  |  50 +++++-----
 r/tests/testthat/test-dataset-dplyr.R   |  13 +--
 r/tests/testthat/test-dplyr-collapse.R  |   1 +
 r/tests/testthat/test-dplyr-query.R     | 114 ++++++++---------------
 r/tests/testthat/test-dplyr-summarize.R |  26 +++---
 10 files changed, 246 insertions(+), 228 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a318c7a4f3..b3dd3a9601 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -444,8 +444,8 @@ ExecPlanReader__PlanStatus <- function(reader) {
   .Call(`_arrow_ExecPlanReader__PlanStatus`, reader)
 }
 
-ExecPlan_run <- function(plan, final_node, sort_options, metadata, head) {
-  .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, metadata, head)
+ExecPlan_run <- function(plan, final_node, metadata) {
+  .Call(`_arrow_ExecPlan_run`, plan, final_node, metadata)
 }
 
 ExecPlan_ToString <- function(plan) {
@@ -460,6 +460,10 @@ ExecNode_output_schema <- function(node) {
   .Call(`_arrow_ExecNode_output_schema`, node)
 }
 
+ExecNode_has_ordered_batches <- function(node) {
+  .Call(`_arrow_ExecNode_has_ordered_batches`, node)
+}
+
 ExecNode_Scan <- function(plan, dataset, filter, projection) {
   .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
 }
@@ -488,6 +492,14 @@ ExecNode_Union <- function(input, right_data) {
   .Call(`_arrow_ExecNode_Union`, input, right_data)
 }
 
+ExecNode_Fetch <- function(input, offset, limit) {
+  .Call(`_arrow_ExecNode_Fetch`, input, offset, limit)
+}
+
+ExecNode_OrderBy <- function(input, sort_options) {
+  .Call(`_arrow_ExecNode_OrderBy`, input, sort_options)
+}
+
 ExecNode_SourceNode <- function(plan, reader) {
   .Call(`_arrow_ExecNode_SourceNode`, plan, reader)
 }
diff --git a/r/R/dplyr-summarize.R b/r/R/dplyr-summarize.R
index 184c0aade4..5d943633a8 100644
--- a/r/R/dplyr-summarize.R
+++ b/r/R/dplyr-summarize.R
@@ -287,6 +287,11 @@ do_arrow_summarize <- function(.data, ..., .groups = NULL) 
{
       stop(paste("Invalid .groups argument:", .groups))
     }
     out$drop_empty_groups <- .data$drop_empty_groups
+    if (getOption("arrow.summarise.sort", FALSE)) {
+      # Add sorting instructions for the rows to match dplyr
+      out$arrange_vars <- .data$selected_columns[.data$group_by_vars]
+      out$arrange_desc <- rep(FALSE, length(.data$group_by_vars))
+    }
   }
   out
 }
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index 72e7480968..54ecc80aad 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -284,17 +284,7 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) {
 #'   mutate(x = gear / carb) %>%
 #'   show_exec_plan()
 show_exec_plan <- function(x) {
-  adq <- as_adq(x)
-
-  # do not show the plan if we have a nested query (as this will force the
-  # evaluation of the inner query/queries)
-  # TODO see if we can remove after ARROW-16628
-  if (is_collapsed(x) && has_head_tail(x$.data)) {
-    warn("The `ExecPlan` cannot be printed for a nested query.")
-    return(invisible(x))
-  }
-
-  result <- as_record_batch_reader(adq)
+  result <- as_record_batch_reader(as_adq(x))
   plan <- result$Plan()
   on.exit({
     plan$.unsafe_delete()
@@ -419,6 +409,25 @@ query_can_stream <- function(x) {
 
 is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query")
 
-has_head_tail <- function(x) {
-  !is.null(x$head) || !is.null(x$tail) || (is_collapsed(x) && 
has_head_tail(x$.data))
+has_unordered_head <- function(x) {
+  if (is.null(x$head %||% x$tail)) {
+    # no head/tail
+    return(FALSE)
+  }
+  !has_order(x)
+}
+
+has_order <- function(x) {
+  length(x$arrange_vars) > 0 ||
+    has_implicit_order(x) ||
+    (is_collapsed(x) && has_order(x$.data))
+}
+
+has_implicit_order <- function(x) {
+  # Approximate what ExecNode$has_ordered_batches() would return (w/o building 
ExecPlan)
+  # An in-memory table has an implicit order
+  # TODO(GH-34698): FileSystemDataset and RecordBatchReader will have implicit 
order
+  inherits(x$.data, "ArrowTabular") &&
+    # But joins, aggregations, etc. will result in non-deterministic order
+    is.null(x$aggregations) && is.null(x$join) && is.null(x$union_all)
 }
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 4b9b7ac459..79227546dd 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -74,11 +74,11 @@ ExecPlan <- R6Class("ExecPlan",
 
       if (is_collapsed(.data)) {
         # We have a nested query.
-        if (has_head_tail(.data$.data)) {
-          # head and tail are not ExecNodes; at best we can handle them via
-          # SinkNode, so if there are any steps done after head/tail, we need 
to
-          # evaluate the query up to then and then do a new query for the rest.
-          # as_record_batch_reader() will build and run an ExecPlan
+        if (has_unordered_head(.data$.data)) {
+          # TODO(GH-34941): FetchNode should do non-deterministic fetch
+          # Instead, we need to evaluate the query up to here,
+          # and then do a new query for the rest.
+          # as_record_batch_reader() will build and run an ExecPlan and do 
head() on it
           reader <- as_record_batch_reader(.data$.data)
           on.exit(reader$.unsafe_delete())
           node <- self$SourceNode(reader)
@@ -126,15 +126,6 @@ ExecPlan <- R6Class("ExecPlan",
           options = .data$aggregations,
           key_names = group_vars
         )
-
-        if (grouped && getOption("arrow.summarise.sort", FALSE)) {
-          # Add sorting instructions for the rows too to match dplyr
-          # (see below about why sorting isn't itself a Node)
-          node$extras$sort <- list(
-            names = group_vars,
-            orders = rep(0L, length(group_vars))
-          )
-        }
       } else {
         # If any columns are derived, reordered, or renamed we need to Project
         # If there are aggregations, the projection was already handled above.
@@ -166,82 +157,81 @@ ExecPlan <- R6Class("ExecPlan",
         }
       }
 
-      # Apply sorting: this is currently not an ExecNode itself, it is a
-      # sink node option.
-      # TODO: handle some cases:
-      # (1) arrange > summarize > arrange
-      # (2) ARROW-13779: arrange then operation where order matters (e.g. 
cumsum)
+      # Apply sorting and head/tail
+      head_or_tail <- .data$head %||% .data$tail
       if (length(.data$arrange_vars)) {
-        node$extras$sort <- list(
+        if (!is.null(.data$tail)) {
+          # Handle tail first: Reverse sort, take head
+          # TODO(GH-34942): FetchNode support for tail
+          node <- node$OrderBy(list(
+            names = names(.data$arrange_vars),
+            orders = as.integer(!.data$arrange_desc)
+          ))
+          node <- node$Fetch(.data$tail)
+        }
+        # Apply sorting
+        node <- node$OrderBy(list(
           names = names(.data$arrange_vars),
-          orders = .data$arrange_desc,
-          temp_columns = names(.data$temp_columns)
-        )
-      }
-      # This is only safe because we are going to evaluate queries that end
-      # with head/tail first, then evaluate any subsequent query as a new query
-      if (!is.null(.data$head)) {
-        node$extras$head <- .data$head
-      }
-      if (!is.null(.data$tail)) {
-        node$extras$tail <- .data$tail
+          orders = as.integer(.data$arrange_desc)
+        ))
+
+        if (length(.data$temp_columns)) {
+          # If we sorted on ad-hoc derived columns, Project to drop them
+          temp_schema <- node$schema
+          cols_to_keep <- setdiff(names(temp_schema), 
names(.data$temp_columns))
+          node <- node$Project(make_field_refs(cols_to_keep))
+        }
+
+        if (!is.null(.data$head)) {
+          # Take the head now
+          node <- node$Fetch(.data$head)
+        }
+      } else if (!is.null(head_or_tail)) {
+        # Unsorted head/tail
+        # Handle a couple of special cases here:
+        if (node$has_ordered_batches()) {
+          # Data that has order, even implicit order from an in-memory table, 
is supported
+          # in FetchNode
+          if (!is.null(.data$head)) {
+            node <- node$Fetch(.data$head)
+          } else {
+            # TODO(GH-34942): FetchNode support for tail
+            # FetchNode currently doesn't support tail, but it has limit + 
offset
+            # So if we know how many rows the query will result in, we can 
offset
+            data_without_tail <- .data
+            data_without_tail$tail <- NULL
+            row_count <- nrow(data_without_tail)
+            if (!is.na(row_count)) {
+              node <- node$Fetch(.data$tail, offset = row_count - .data$tail)
+            } else {
+              # Workaround: non-deterministic tail
+              node$extras$slice_size <- head_or_tail
+            }
+          }
+        } else {
+          # TODO(GH-34941): non-deterministic FetchNode
+          # Data has non-deterministic order, so head/tail means "just show me 
any N rows"
+          # FetchNode does not support non-deterministic scans, so we have to 
handle outside
+          node$extras$slice_size <- head_or_tail
+        }
       }
       node
     },
     Run = function(node) {
       assert_is(node, "ExecNode")
-
-      # Sorting and head/tail (if sorted) are handled in the SinkNode,
-      # created in ExecPlan_build
-      sorting <- node$extras$sort %||% list()
-      select_k <- node$extras$head %||% -1L
-      has_sorting <- length(sorting) > 0
-      if (has_sorting) {
-        if (!is.null(node$extras$tail)) {
-          # Reverse the sort order and take the top K, then after we'll reverse
-          # the resulting rows so that it is ordered as expected
-          sorting$orders <- !sorting$orders
-          select_k <- node$extras$tail
-        }
-        sorting$orders <- as.integer(sorting$orders)
-      }
-
       out <- ExecPlan_run(
         self,
         node,
-        sorting,
-        prepare_key_value_metadata(node$final_metadata()),
-        select_k
+        prepare_key_value_metadata(node$final_metadata())
       )
 
-      if (!has_sorting) {
-        # Since ExecPlans don't scan in deterministic order, head/tail are both
+      if (!is.null(node$extras$slice_size)) {
+        # For non-deterministic scans, head/tail are
         # essentially taking a random slice from somewhere in the dataset.
         # And since the head() implementation is way more efficient than 
tail(),
         # just use it to take the random slice
-        # TODO(ARROW-16628): handle limit in ExecNode
-        slice_size <- node$extras$head %||% node$extras$tail
-        if (!is.null(slice_size)) {
-          out <- head(out, slice_size)
-        }
-      } else if (!is.null(node$extras$tail)) {
-        # TODO(ARROW-16630): proper BottomK support
-        # Reverse the row order to get back what we expect
-        out <- as_arrow_table(out)
-        out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
-        out <- as_record_batch_reader(out)
-      }
-
-      # If arrange() created $temp_columns, make sure to omit them from the 
result
-      # We can't currently handle this in ExecPlan_run itself because sorting
-      # happens in the end (SinkNode) so nothing comes after it.
-      # TODO(ARROW-16631): move into ExecPlan
-      if (length(node$extras$sort$temp_columns) > 0) {
-        tab <- as_arrow_table(out)
-        tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop 
= FALSE]
-        out <- as_record_batch_reader(tab)
+        out <- head(out, node$extras$slice_size)
       }
-
       out
     },
     Write = function(node, ...) {
@@ -272,13 +262,8 @@ ExecNode <- R6Class("ExecNode",
   inherit = ArrowObject,
   public = list(
     extras = list(
-      # `sort` is a slight hack to be able to keep around arrange() params,
-      # which don't currently yield their own ExecNode but rather are consumed
-      # in the SinkNode (in ExecPlan$run())
-      sort = NULL,
-      # Similar hacks for head and tail
-      head = NULL,
-      tail = NULL,
+      # Workaround for non-deterministic head/tail
+      slice_size = NULL,
       # `source_schema` is put here in Scan() so that at Run/Write, we can
       # extract the relevant metadata and keep it in the result
       source_schema = NULL
@@ -295,6 +280,7 @@ ExecNode <- R6Class("ExecNode",
       old_meta$r <- get_r_metadata_from_old_schema(self$schema, old_schema)
       old_meta
     },
+    has_ordered_batches = function() ExecNode_has_ordered_batches(self),
     Project = function(cols) {
       if (length(cols)) {
         assert_is_list_of(cols, "Expression")
@@ -336,6 +322,16 @@ ExecNode <- R6Class("ExecNode",
     },
     Union = function(right_node) {
       self$preserve_extras(ExecNode_Union(self, right_node))
+    },
+    Fetch = function(limit, offset = 0L) {
+      self$preserve_extras(
+        ExecNode_Fetch(self, offset, limit)
+      )
+    },
+    OrderBy = function(sorting) {
+      self$preserve_extras(
+        ExecNode_OrderBy(self, sorting)
+      )
     }
   ),
   active = list(
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index dc4d0e9c70..65adcb764c 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -988,19 +988,17 @@ extern "C" SEXP _arrow_ExecPlanReader__PlanStatus(SEXP 
reader_sexp){
 
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_ACERO)
-std::shared_ptr<ExecPlanReader> ExecPlan_run(const 
std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& 
final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head);
-extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP 
sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){
+std::shared_ptr<ExecPlanReader> ExecPlan_run(const 
std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& 
final_node, cpp11::strings metadata);
+extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP 
metadata_sexp){
 BEGIN_CPP11
        arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type 
plan(plan_sexp);
        arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type 
final_node(final_node_sexp);
-       arrow::r::Input<cpp11::list>::type sort_options(sort_options_sexp);
        arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
-       arrow::r::Input<int64_t>::type head(head_sexp);
-       return cpp11::as_sexp(ExecPlan_run(plan, final_node, sort_options, 
metadata, head));
+       return cpp11::as_sexp(ExecPlan_run(plan, final_node, metadata));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP 
sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){
+extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP 
metadata_sexp){
        Rf_error("Cannot call ExecPlan_run(). See 
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow 
C++ libraries. ");
 }
 #endif
@@ -1051,6 +1049,14 @@ extern "C" SEXP _arrow_ExecNode_output_schema(SEXP 
node_sexp){
 }
 #endif
 
+// compute-exec.cpp
+bool ExecNode_has_ordered_batches(const std::shared_ptr<acero::ExecNode>& 
node);
+extern "C" SEXP _arrow_ExecNode_has_ordered_batches(SEXP node_sexp){
+BEGIN_CPP11
+       arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type 
node(node_sexp);
+       return cpp11::as_sexp(ExecNode_has_ordered_batches(node));
+END_CPP11
+}
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_DATASET)
 std::shared_ptr<acero::ExecNode> ExecNode_Scan(const 
std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& 
dataset, const std::shared_ptr<compute::Expression>& filter, cpp11::list 
projection);
@@ -1187,6 +1193,39 @@ extern "C" SEXP _arrow_ExecNode_Union(SEXP input_sexp, 
SEXP right_data_sexp){
 }
 #endif
 
+// compute-exec.cpp
+#if defined(ARROW_R_WITH_ACERO)
+std::shared_ptr<acero::ExecNode> ExecNode_Fetch(const 
std::shared_ptr<acero::ExecNode>& input, int64_t offset, int64_t limit);
+extern "C" SEXP _arrow_ExecNode_Fetch(SEXP input_sexp, SEXP offset_sexp, SEXP 
limit_sexp){
+BEGIN_CPP11
+       arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type 
input(input_sexp);
+       arrow::r::Input<int64_t>::type offset(offset_sexp);
+       arrow::r::Input<int64_t>::type limit(limit_sexp);
+       return cpp11::as_sexp(ExecNode_Fetch(input, offset, limit));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ExecNode_Fetch(SEXP input_sexp, SEXP offset_sexp, SEXP 
limit_sexp){
+       Rf_error("Cannot call ExecNode_Fetch(). See 
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow 
C++ libraries. ");
+}
+#endif
+
+// compute-exec.cpp
+#if defined(ARROW_R_WITH_ACERO)
+std::shared_ptr<acero::ExecNode> ExecNode_OrderBy(const 
std::shared_ptr<acero::ExecNode>& input, cpp11::list sort_options);
+extern "C" SEXP _arrow_ExecNode_OrderBy(SEXP input_sexp, SEXP 
sort_options_sexp){
+BEGIN_CPP11
+       arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type 
input(input_sexp);
+       arrow::r::Input<cpp11::list>::type sort_options(sort_options_sexp);
+       return cpp11::as_sexp(ExecNode_OrderBy(input, sort_options));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ExecNode_OrderBy(SEXP input_sexp, SEXP 
sort_options_sexp){
+       Rf_error("Cannot call ExecNode_OrderBy(). See 
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow 
C++ libraries. ");
+}
+#endif
+
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_ACERO)
 std::shared_ptr<acero::ExecNode> ExecNode_SourceNode(const 
std::shared_ptr<acero::ExecPlan>& plan, const 
std::shared_ptr<arrow::RecordBatchReader>& reader);
@@ -5555,10 +5594,11 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_Table__from_ExecPlanReader", (DL_FUNC) 
&_arrow_Table__from_ExecPlanReader, 1}, 
                { "_arrow_ExecPlanReader__Plan", (DL_FUNC) 
&_arrow_ExecPlanReader__Plan, 1}, 
                { "_arrow_ExecPlanReader__PlanStatus", (DL_FUNC) 
&_arrow_ExecPlanReader__PlanStatus, 1}, 
-               { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5}, 
+               { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 3}, 
                { "_arrow_ExecPlan_ToString", (DL_FUNC) 
&_arrow_ExecPlan_ToString, 1}, 
                { "_arrow_ExecPlan_UnsafeDelete", (DL_FUNC) 
&_arrow_ExecPlan_UnsafeDelete, 1}, 
                { "_arrow_ExecNode_output_schema", (DL_FUNC) 
&_arrow_ExecNode_output_schema, 1}, 
+               { "_arrow_ExecNode_has_ordered_batches", (DL_FUNC) 
&_arrow_ExecNode_has_ordered_batches, 1}, 
                { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, 
                { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 
14}, 
                { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 
2}, 
@@ -5566,6 +5606,8 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_ExecNode_Aggregate", (DL_FUNC) 
&_arrow_ExecNode_Aggregate, 3}, 
                { "_arrow_ExecNode_Join", (DL_FUNC) &_arrow_ExecNode_Join, 9}, 
                { "_arrow_ExecNode_Union", (DL_FUNC) &_arrow_ExecNode_Union, 
2}, 
+               { "_arrow_ExecNode_Fetch", (DL_FUNC) &_arrow_ExecNode_Fetch, 
3}, 
+               { "_arrow_ExecNode_OrderBy", (DL_FUNC) 
&_arrow_ExecNode_OrderBy, 2}, 
                { "_arrow_ExecNode_SourceNode", (DL_FUNC) 
&_arrow_ExecNode_SourceNode, 2}, 
                { "_arrow_ExecNode_TableSourceNode", (DL_FUNC) 
&_arrow_ExecNode_TableSourceNode, 2}, 
                { "_arrow_substrait__internal__SubstraitToJSON", (DL_FUNC) 
&_arrow_substrait__internal__SubstraitToJSON, 1}, 
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 9c7de915fa..347d4787fd 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -224,35 +224,13 @@ std::string ExecPlanReader__PlanStatus(const 
std::shared_ptr<ExecPlanReader>& re
 // [[acero::export]]
 std::shared_ptr<ExecPlanReader> ExecPlan_run(
     const std::shared_ptr<acero::ExecPlan>& plan,
-    const std::shared_ptr<acero::ExecNode>& final_node, cpp11::list 
sort_options,
-    cpp11::strings metadata, int64_t head = -1) {
+    const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings 
metadata) {
   // For now, don't require R to construct SinkNodes.
   // Instead, just pass the node we should collect as an argument.
   arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen;
 
-  // Sorting uses a different sink node; there is no general sort yet
-  if (sort_options.size() > 0) {
-    if (head >= 0) {
-      // Use the SelectK node to take only what we need
-      MakeExecNodeOrStop(
-          "select_k_sink", plan.get(), {final_node.get()},
-          acero::SelectKSinkNodeOptions{
-              arrow::compute::SelectKOptions(
-                  head, std::dynamic_pointer_cast<compute::SortOptions>(
-                            make_compute_options("sort_indices", sort_options))
-                            ->sort_keys),
-              &sink_gen});
-    } else {
-      MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
-                         acero::OrderBySinkNodeOptions{
-                             *std::dynamic_pointer_cast<compute::SortOptions>(
-                                 make_compute_options("sort_indices", 
sort_options)),
-                             &sink_gen});
-    }
-  } else {
-    MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
-                       acero::SinkNodeOptions{&sink_gen});
-  }
+  MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
+                     acero::SinkNodeOptions{&sink_gen});
 
   StopIfNotOk(plan->Validate());
 
@@ -283,6 +261,11 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema(
   return node->output_schema();
 }
 
+// [[arrow::export]]
+bool ExecNode_has_ordered_batches(const std::shared_ptr<acero::ExecNode>& 
node) {
+  return !node->ordering().is_unordered();
+}
+
 #if defined(ARROW_R_WITH_DATASET)
 
 #include <arrow/dataset/file_base.h>
@@ -460,6 +443,23 @@ std::shared_ptr<acero::ExecNode> ExecNode_Union(
   return MakeExecNodeOrStop("union", input->plan(), {input.get(), 
right_data.get()}, {});
 }
 
+// [[acero::export]]
+std::shared_ptr<acero::ExecNode> ExecNode_Fetch(
+    const std::shared_ptr<acero::ExecNode>& input, int64_t offset, int64_t 
limit) {
+  return MakeExecNodeOrStop("fetch", input->plan(), {input.get()},
+                            acero::FetchNodeOptions{offset, limit});
+}
+
+// [[acero::export]]
+std::shared_ptr<acero::ExecNode> ExecNode_OrderBy(
+    const std::shared_ptr<acero::ExecNode>& input, cpp11::list sort_options) {
+  return MakeExecNodeOrStop(
+      "order_by", input->plan(), {input.get()},
+      
acero::OrderByNodeOptions{std::dynamic_pointer_cast<compute::SortOptions>(
+                                    make_compute_options("sort_indices", 
sort_options))
+                                    ->AsOrdering()});
+}
+
 // [[acero::export]]
 std::shared_ptr<acero::ExecNode> ExecNode_SourceNode(
     const std::shared_ptr<acero::ExecPlan>& plan,
diff --git a/r/tests/testthat/test-dataset-dplyr.R 
b/r/tests/testthat/test-dataset-dplyr.R
index e20a6262b7..b8d9384192 100644
--- a/r/tests/testthat/test-dataset-dplyr.R
+++ b/r/tests/testthat/test-dataset-dplyr.R
@@ -397,22 +397,11 @@ test_that("show_exec_plan(), show_query() and explain() 
with datasets", {
       show_exec_plan(),
     regexp = paste0(
       "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
-      "OrderBySinkNode.*chr.*ASC.*", # arrange goes via the OrderBy sink node
+      "OrderByNode.*chr.*ASC.*", # arrange goes via the OrderBy node
       "ProjectNode.*", # output columns
       "FilterNode.*", # filter node
       "filter=lgl.*", # filtering expression
       "SourceNode" # entry point
     )
   )
-
-  # printing the ExecPlan for a nested query would currently force the
-  # evaluation of the inner one(s), which we want to avoid => no output
-  expect_warning(
-    ds %>%
-      filter(lgl) %>%
-      arrange(chr) %>%
-      head() %>%
-      show_exec_plan(),
-    "The `ExecPlan` cannot be printed for a nested query."
-  )
 })
diff --git a/r/tests/testthat/test-dplyr-collapse.R 
b/r/tests/testthat/test-dplyr-collapse.R
index 198827e235..a8aa5556f1 100644
--- a/r/tests/testthat/test-dplyr-collapse.R
+++ b/r/tests/testthat/test-dplyr-collapse.R
@@ -167,6 +167,7 @@ lgl: bool
 total: int64
 extra: int64 (multiply_checked(total, 5))
 
+* Sorted by lgl [asc]
 See $.data for the source Arrow object",
     fixed = TRUE
   )
diff --git a/r/tests/testthat/test-dplyr-query.R 
b/r/tests/testthat/test-dplyr-query.R
index 5dbdb0e522..e478d0e4c4 100644
--- a/r/tests/testthat/test-dplyr-query.R
+++ b/r/tests/testthat/test-dplyr-query.R
@@ -180,58 +180,26 @@ test_that("compute()", {
 })
 
 test_that("head", {
-  batch <- record_batch(tbl)
-
-  b2 <- batch %>%
-    select(int, chr) %>%
-    filter(int > 5) %>%
-    head(2)
-  expect_s3_class(b2, "arrow_dplyr_query")
-  expected <- tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")][1:2, ]
-  expect_equal(collect(b2), expected)
-
-  b3 <- batch %>%
-    select(int, strng = chr) %>%
-    filter(int > 5) %>%
-    head(2)
-  expect_s3_class(b3, "arrow_dplyr_query")
-  expect_equal(as.data.frame(b3), set_names(expected, c("int", "strng")))
-
-  b4 <- batch %>%
-    select(int, strng = chr) %>%
-    filter(int > 5) %>%
-    group_by(int) %>%
-    head(2)
-  expect_s3_class(b4, "arrow_dplyr_query")
-  expect_equal(
-    as.data.frame(b4),
-    expected %>%
-      rename(strng = chr) %>%
-      group_by(int)
-  )
-
-  expect_equal(
-    batch %>%
+  compare_dplyr_binding(
+    .input %>%
       select(int, strng = chr) %>%
       filter(int > 5) %>%
+      group_by(int) %>%
       head(2) %>%
-      mutate(twice = int * 2) %>%
       collect(),
-    expected %>%
-      rename(strng = chr) %>%
-      mutate(twice = int * 2)
+    tbl
   )
 
   # This would fail if we evaluated head() after filter()
-  expect_equal(
-    batch %>%
+  compare_dplyr_binding(
+    .input %>%
       select(int, strng = chr) %>%
+      arrange(int) %>%
       head(2) %>%
       filter(int > 5) %>%
+      mutate(twice = int * 2) %>%
       collect(),
-    expected %>%
-      rename(strng = chr) %>%
-      filter(FALSE)
+    tbl
   )
 })
 
@@ -260,38 +228,25 @@ test_that("arrange then tail returns the right data", {
 })
 
 test_that("tail", {
-  batch <- record_batch(tbl)
-
-  b2 <- batch %>%
-    select(int, chr) %>%
-    filter(int > 5) %>%
-    arrange(int) %>%
-    tail(2)
-
-  expect_s3_class(b2, "arrow_dplyr_query")
-  expected <- tail(tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")], 2)
-  expect_equal(as.data.frame(b2), expected)
-
-  b3 <- batch %>%
-    select(int, strng = chr) %>%
-    filter(int > 5) %>%
-    arrange(int) %>%
-    tail(2)
-  expect_s3_class(b3, "arrow_dplyr_query")
-  expect_equal(as.data.frame(b3), set_names(expected, c("int", "strng")))
-
-  b4 <- batch %>%
-    select(int, strng = chr) %>%
-    filter(int > 5) %>%
-    group_by(int) %>%
-    arrange(int) %>%
-    tail(2)
-  expect_s3_class(b4, "arrow_dplyr_query")
-  expect_equal(
-    as.data.frame(b4),
-    expected %>%
-      rename(strng = chr) %>%
-      group_by(int)
+  # With sorting
+  compare_dplyr_binding(
+    .input %>%
+      select(int, chr) %>%
+      filter(int < 5) %>%
+      arrange(int) %>%
+      tail(2) %>%
+      collect(),
+    tbl
+  )
+  # Without sorting: table order is implicit, and we can compute the filter
+  # row length, so the query can use Fetch with offset
+  compare_dplyr_binding(
+    .input %>%
+      select(int, chr) %>%
+      filter(int < 5) %>%
+      tail(2) %>%
+      collect(),
+    tbl
   )
 })
 
@@ -550,7 +505,7 @@ test_that("show_exec_plan(), show_query() and explain()", {
       show_exec_plan(),
     regexp = paste0(
       "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
-      "OrderBySinkNode.*wt.*DESC.*", # arrange goes via the OrderBy sink node
+      "OrderBy.*wt.*DESC.*", # arrange goes via the OrderBy node
       "FilterNode.*", # filter node
       "TableSourceNode.*" # entry point
     )
@@ -558,14 +513,19 @@ test_that("show_exec_plan(), show_query() and explain()", 
{
 
   # printing the ExecPlan for a nested query would currently force the
   # evaluation of the inner one(s), which we want to avoid => no output
-  expect_warning(
+  expect_output(
     mtcars %>%
       arrow_table() %>%
       filter(mpg > 20) %>%
-      arrange(desc(wt)) %>%
       head(3) %>%
       show_exec_plan(),
-    "The `ExecPlan` cannot be printed for a nested query."
+    paste0(
+      "ExecPlan with 4 nodes:.*",
+      "3:SinkNode.*",
+      "2:FetchNode.offset=0 count=3.*",
+      "1:FilterNode.filter=.mpg > 20.*",
+      "0:TableSourceNode.*"
+    )
   )
 })
 
diff --git a/r/tests/testthat/test-dplyr-summarize.R 
b/r/tests/testthat/test-dplyr-summarize.R
index 1b834df19f..3eb1a6ed2b 100644
--- a/r/tests/testthat/test-dplyr-summarize.R
+++ b/r/tests/testthat/test-dplyr-summarize.R
@@ -616,17 +616,21 @@ test_that("min() and max() on character strings", {
       collect(),
     tbl,
   )
-  compare_dplyr_binding(
-    .input %>%
-      group_by(fct) %>%
-      summarize(
-        min_chr = min(chr, na.rm = TRUE),
-        max_chr = max(chr, na.rm = TRUE)
-      ) %>%
-      arrange(min_chr) %>%
-      collect(),
-    tbl,
-  )
+  withr::with_options(list(arrow.summarise.sort = FALSE), {
+    # TODO(#29887 / ARROW-14313) sorting on dictionary columns not supported
+    # so turn off arrow.summarise.sort so that we don't order_by fct after 
summarize
+    compare_dplyr_binding(
+      .input %>%
+        group_by(fct) %>%
+        summarize(
+          min_chr = min(chr, na.rm = TRUE),
+          max_chr = max(chr, na.rm = TRUE)
+        ) %>%
+        arrange(min_chr) %>%
+        collect(),
+      tbl,
+    )
+  })
 })
 
 test_that("summarise() with !!sym()", {

Reply via email to