This is an automated email from the ASF dual-hosted git repository.
thisisnic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 47a602dbd9 GH-34437: [R] Use FetchNode and OrderByNode (#34685)
47a602dbd9 is described below
commit 47a602dbd9b7b7f7720a5e62467e3e6c61712cf3
Author: Neal Richardson <[email protected]>
AuthorDate: Tue Apr 11 12:19:49 2023 -0400
GH-34437: [R] Use FetchNode and OrderByNode (#34685)
### Rationale for this change
See also #32991. By using the new nodes, we're closer to having all dplyr
query business happening inside the ExecPlan. Unfortunately, there are still
two cases where we have to apply operations in R after running a query:
* #34941: Taking head/tail on unordered data, which has non-deterministic
results but that should be possible, in the case where the user wants to see a
slice of the result, any slice
* #34942: Implementing tail in the FetchNode or similar would enable
removing more hacks and workarounds.
Once those are resolved, we can simplify further and then move to the new
Declaration class.
### What changes are included in this PR?
This removes the use of different SinkNodes and many R-specific workarounds
to support sorting and head/tail, so *almost*
everything we do in a query should be represented in an ExecPlan.
### Are these changes tested?
Yes. This is mostly an internal refactor, but behavior changes are
accompanied by test updates.
### Are there any user-facing changes?
The `show_query()` method will print slightly different ExecPlans. In many
cases, they will be more informative.
`tail()` now actually returns the tail of the data in cases where the data
has an implicit order (currently only in-memory tables). Previously it was
non-deterministic (and would return the head or some other slice of the data).
When printing query objects that include `summarize()` when the
`arrow.summarise.sort = TRUE` option is set, the sorting is correctly printed.
It's unclear if there should be changes in performance; running benchmarks
would be good but it's also not clear that our benchmarks cover all affected
scenarios.
* Closes: #34437
* Closes: #31980
* Closes: #31982
Authored-by: Neal Richardson <[email protected]>
Signed-off-by: Nic Crane <[email protected]>
---
r/R/arrowExports.R | 16 +++-
r/R/dplyr-summarize.R | 5 +
r/R/dplyr.R | 35 ++++---
r/R/query-engine.R | 158 ++++++++++++++++----------------
r/src/arrowExports.cpp | 56 +++++++++--
r/src/compute-exec.cpp | 50 +++++-----
r/tests/testthat/test-dataset-dplyr.R | 13 +--
r/tests/testthat/test-dplyr-collapse.R | 1 +
r/tests/testthat/test-dplyr-query.R | 114 ++++++++---------------
r/tests/testthat/test-dplyr-summarize.R | 26 +++---
10 files changed, 246 insertions(+), 228 deletions(-)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a318c7a4f3..b3dd3a9601 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -444,8 +444,8 @@ ExecPlanReader__PlanStatus <- function(reader) {
.Call(`_arrow_ExecPlanReader__PlanStatus`, reader)
}
-ExecPlan_run <- function(plan, final_node, sort_options, metadata, head) {
- .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, metadata, head)
+ExecPlan_run <- function(plan, final_node, metadata) {
+ .Call(`_arrow_ExecPlan_run`, plan, final_node, metadata)
}
ExecPlan_ToString <- function(plan) {
@@ -460,6 +460,10 @@ ExecNode_output_schema <- function(node) {
.Call(`_arrow_ExecNode_output_schema`, node)
}
+ExecNode_has_ordered_batches <- function(node) {
+ .Call(`_arrow_ExecNode_has_ordered_batches`, node)
+}
+
ExecNode_Scan <- function(plan, dataset, filter, projection) {
.Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
}
@@ -488,6 +492,14 @@ ExecNode_Union <- function(input, right_data) {
.Call(`_arrow_ExecNode_Union`, input, right_data)
}
+ExecNode_Fetch <- function(input, offset, limit) {
+ .Call(`_arrow_ExecNode_Fetch`, input, offset, limit)
+}
+
+ExecNode_OrderBy <- function(input, sort_options) {
+ .Call(`_arrow_ExecNode_OrderBy`, input, sort_options)
+}
+
ExecNode_SourceNode <- function(plan, reader) {
.Call(`_arrow_ExecNode_SourceNode`, plan, reader)
}
diff --git a/r/R/dplyr-summarize.R b/r/R/dplyr-summarize.R
index 184c0aade4..5d943633a8 100644
--- a/r/R/dplyr-summarize.R
+++ b/r/R/dplyr-summarize.R
@@ -287,6 +287,11 @@ do_arrow_summarize <- function(.data, ..., .groups = NULL)
{
stop(paste("Invalid .groups argument:", .groups))
}
out$drop_empty_groups <- .data$drop_empty_groups
+ if (getOption("arrow.summarise.sort", FALSE)) {
+ # Add sorting instructions for the rows to match dplyr
+ out$arrange_vars <- .data$selected_columns[.data$group_by_vars]
+ out$arrange_desc <- rep(FALSE, length(.data$group_by_vars))
+ }
}
out
}
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index 72e7480968..54ecc80aad 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -284,17 +284,7 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) {
#' mutate(x = gear / carb) %>%
#' show_exec_plan()
show_exec_plan <- function(x) {
- adq <- as_adq(x)
-
- # do not show the plan if we have a nested query (as this will force the
- # evaluation of the inner query/queries)
- # TODO see if we can remove after ARROW-16628
- if (is_collapsed(x) && has_head_tail(x$.data)) {
- warn("The `ExecPlan` cannot be printed for a nested query.")
- return(invisible(x))
- }
-
- result <- as_record_batch_reader(adq)
+ result <- as_record_batch_reader(as_adq(x))
plan <- result$Plan()
on.exit({
plan$.unsafe_delete()
@@ -419,6 +409,25 @@ query_can_stream <- function(x) {
is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query")
-has_head_tail <- function(x) {
- !is.null(x$head) || !is.null(x$tail) || (is_collapsed(x) &&
has_head_tail(x$.data))
+has_unordered_head <- function(x) {
+ if (is.null(x$head %||% x$tail)) {
+ # no head/tail
+ return(FALSE)
+ }
+ !has_order(x)
+}
+
+has_order <- function(x) {
+ length(x$arrange_vars) > 0 ||
+ has_implicit_order(x) ||
+ (is_collapsed(x) && has_order(x$.data))
+}
+
+has_implicit_order <- function(x) {
+ # Approximate what ExecNode$has_ordered_batches() would return (w/o building
ExecPlan)
+ # An in-memory table has an implicit order
+ # TODO(GH-34698): FileSystemDataset and RecordBatchReader will have implicit
order
+ inherits(x$.data, "ArrowTabular") &&
+ # But joins, aggregations, etc. will result in non-deterministic order
+ is.null(x$aggregations) && is.null(x$join) && is.null(x$union_all)
}
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 4b9b7ac459..79227546dd 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -74,11 +74,11 @@ ExecPlan <- R6Class("ExecPlan",
if (is_collapsed(.data)) {
# We have a nested query.
- if (has_head_tail(.data$.data)) {
- # head and tail are not ExecNodes; at best we can handle them via
- # SinkNode, so if there are any steps done after head/tail, we need
to
- # evaluate the query up to then and then do a new query for the rest.
- # as_record_batch_reader() will build and run an ExecPlan
+ if (has_unordered_head(.data$.data)) {
+ # TODO(GH-34941): FetchNode should do non-deterministic fetch
+ # Instead, we need to evaluate the query up to here,
+ # and then do a new query for the rest.
+ # as_record_batch_reader() will build and run an ExecPlan and do
head() on it
reader <- as_record_batch_reader(.data$.data)
on.exit(reader$.unsafe_delete())
node <- self$SourceNode(reader)
@@ -126,15 +126,6 @@ ExecPlan <- R6Class("ExecPlan",
options = .data$aggregations,
key_names = group_vars
)
-
- if (grouped && getOption("arrow.summarise.sort", FALSE)) {
- # Add sorting instructions for the rows too to match dplyr
- # (see below about why sorting isn't itself a Node)
- node$extras$sort <- list(
- names = group_vars,
- orders = rep(0L, length(group_vars))
- )
- }
} else {
# If any columns are derived, reordered, or renamed we need to Project
# If there are aggregations, the projection was already handled above.
@@ -166,82 +157,81 @@ ExecPlan <- R6Class("ExecPlan",
}
}
- # Apply sorting: this is currently not an ExecNode itself, it is a
- # sink node option.
- # TODO: handle some cases:
- # (1) arrange > summarize > arrange
- # (2) ARROW-13779: arrange then operation where order matters (e.g.
cumsum)
+ # Apply sorting and head/tail
+ head_or_tail <- .data$head %||% .data$tail
if (length(.data$arrange_vars)) {
- node$extras$sort <- list(
+ if (!is.null(.data$tail)) {
+ # Handle tail first: Reverse sort, take head
+ # TODO(GH-34942): FetchNode support for tail
+ node <- node$OrderBy(list(
+ names = names(.data$arrange_vars),
+ orders = as.integer(!.data$arrange_desc)
+ ))
+ node <- node$Fetch(.data$tail)
+ }
+ # Apply sorting
+ node <- node$OrderBy(list(
names = names(.data$arrange_vars),
- orders = .data$arrange_desc,
- temp_columns = names(.data$temp_columns)
- )
- }
- # This is only safe because we are going to evaluate queries that end
- # with head/tail first, then evaluate any subsequent query as a new query
- if (!is.null(.data$head)) {
- node$extras$head <- .data$head
- }
- if (!is.null(.data$tail)) {
- node$extras$tail <- .data$tail
+ orders = as.integer(.data$arrange_desc)
+ ))
+
+ if (length(.data$temp_columns)) {
+ # If we sorted on ad-hoc derived columns, Project to drop them
+ temp_schema <- node$schema
+ cols_to_keep <- setdiff(names(temp_schema),
names(.data$temp_columns))
+ node <- node$Project(make_field_refs(cols_to_keep))
+ }
+
+ if (!is.null(.data$head)) {
+ # Take the head now
+ node <- node$Fetch(.data$head)
+ }
+ } else if (!is.null(head_or_tail)) {
+ # Unsorted head/tail
+ # Handle a couple of special cases here:
+ if (node$has_ordered_batches()) {
+ # Data that has order, even implicit order from an in-memory table,
is supported
+ # in FetchNode
+ if (!is.null(.data$head)) {
+ node <- node$Fetch(.data$head)
+ } else {
+ # TODO(GH-34942): FetchNode support for tail
+ # FetchNode currently doesn't support tail, but it has limit +
offset
+ # So if we know how many rows the query will result in, we can
offset
+ data_without_tail <- .data
+ data_without_tail$tail <- NULL
+ row_count <- nrow(data_without_tail)
+ if (!is.na(row_count)) {
+ node <- node$Fetch(.data$tail, offset = row_count - .data$tail)
+ } else {
+ # Workaround: non-deterministic tail
+ node$extras$slice_size <- head_or_tail
+ }
+ }
+ } else {
+ # TODO(GH-34941): non-deterministic FetchNode
+ # Data has non-deterministic order, so head/tail means "just show me
any N rows"
+ # FetchNode does not support non-deterministic scans, so we have to
handle outside
+ node$extras$slice_size <- head_or_tail
+ }
}
node
},
Run = function(node) {
assert_is(node, "ExecNode")
-
- # Sorting and head/tail (if sorted) are handled in the SinkNode,
- # created in ExecPlan_build
- sorting <- node$extras$sort %||% list()
- select_k <- node$extras$head %||% -1L
- has_sorting <- length(sorting) > 0
- if (has_sorting) {
- if (!is.null(node$extras$tail)) {
- # Reverse the sort order and take the top K, then after we'll reverse
- # the resulting rows so that it is ordered as expected
- sorting$orders <- !sorting$orders
- select_k <- node$extras$tail
- }
- sorting$orders <- as.integer(sorting$orders)
- }
-
out <- ExecPlan_run(
self,
node,
- sorting,
- prepare_key_value_metadata(node$final_metadata()),
- select_k
+ prepare_key_value_metadata(node$final_metadata())
)
- if (!has_sorting) {
- # Since ExecPlans don't scan in deterministic order, head/tail are both
+ if (!is.null(node$extras$slice_size)) {
+ # For non-deterministic scans, head/tail are
# essentially taking a random slice from somewhere in the dataset.
# And since the head() implementation is way more efficient than
tail(),
# just use it to take the random slice
- # TODO(ARROW-16628): handle limit in ExecNode
- slice_size <- node$extras$head %||% node$extras$tail
- if (!is.null(slice_size)) {
- out <- head(out, slice_size)
- }
- } else if (!is.null(node$extras$tail)) {
- # TODO(ARROW-16630): proper BottomK support
- # Reverse the row order to get back what we expect
- out <- as_arrow_table(out)
- out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
- out <- as_record_batch_reader(out)
- }
-
- # If arrange() created $temp_columns, make sure to omit them from the
result
- # We can't currently handle this in ExecPlan_run itself because sorting
- # happens in the end (SinkNode) so nothing comes after it.
- # TODO(ARROW-16631): move into ExecPlan
- if (length(node$extras$sort$temp_columns) > 0) {
- tab <- as_arrow_table(out)
- tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop
= FALSE]
- out <- as_record_batch_reader(tab)
+ out <- head(out, node$extras$slice_size)
}
-
out
},
Write = function(node, ...) {
@@ -272,13 +262,8 @@ ExecNode <- R6Class("ExecNode",
inherit = ArrowObject,
public = list(
extras = list(
- # `sort` is a slight hack to be able to keep around arrange() params,
- # which don't currently yield their own ExecNode but rather are consumed
- # in the SinkNode (in ExecPlan$run())
- sort = NULL,
- # Similar hacks for head and tail
- head = NULL,
- tail = NULL,
+ # Workaround for non-deterministic head/tail
+ slice_size = NULL,
# `source_schema` is put here in Scan() so that at Run/Write, we can
# extract the relevant metadata and keep it in the result
source_schema = NULL
@@ -295,6 +280,7 @@ ExecNode <- R6Class("ExecNode",
old_meta$r <- get_r_metadata_from_old_schema(self$schema, old_schema)
old_meta
},
+ has_ordered_batches = function() ExecNode_has_ordered_batches(self),
Project = function(cols) {
if (length(cols)) {
assert_is_list_of(cols, "Expression")
@@ -336,6 +322,16 @@ ExecNode <- R6Class("ExecNode",
},
Union = function(right_node) {
self$preserve_extras(ExecNode_Union(self, right_node))
+ },
+ Fetch = function(limit, offset = 0L) {
+ self$preserve_extras(
+ ExecNode_Fetch(self, offset, limit)
+ )
+ },
+ OrderBy = function(sorting) {
+ self$preserve_extras(
+ ExecNode_OrderBy(self, sorting)
+ )
}
),
active = list(
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index dc4d0e9c70..65adcb764c 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -988,19 +988,17 @@ extern "C" SEXP _arrow_ExecPlanReader__PlanStatus(SEXP
reader_sexp){
// compute-exec.cpp
#if defined(ARROW_R_WITH_ACERO)
-std::shared_ptr<ExecPlanReader> ExecPlan_run(const
std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>&
final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head);
-extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP
sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){
+std::shared_ptr<ExecPlanReader> ExecPlan_run(const
std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>&
final_node, cpp11::strings metadata);
+extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP
metadata_sexp){
BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type
plan(plan_sexp);
arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type
final_node(final_node_sexp);
- arrow::r::Input<cpp11::list>::type sort_options(sort_options_sexp);
arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
- arrow::r::Input<int64_t>::type head(head_sexp);
- return cpp11::as_sexp(ExecPlan_run(plan, final_node, sort_options,
metadata, head));
+ return cpp11::as_sexp(ExecPlan_run(plan, final_node, metadata));
END_CPP11
}
#else
-extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP
sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){
+extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP
metadata_sexp){
Rf_error("Cannot call ExecPlan_run(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
}
#endif
@@ -1051,6 +1049,14 @@ extern "C" SEXP _arrow_ExecNode_output_schema(SEXP
node_sexp){
}
#endif
+// compute-exec.cpp
+bool ExecNode_has_ordered_batches(const std::shared_ptr<acero::ExecNode>&
node);
+extern "C" SEXP _arrow_ExecNode_has_ordered_batches(SEXP node_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type
node(node_sexp);
+ return cpp11::as_sexp(ExecNode_has_ordered_batches(node));
+END_CPP11
+}
// compute-exec.cpp
#if defined(ARROW_R_WITH_DATASET)
std::shared_ptr<acero::ExecNode> ExecNode_Scan(const
std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>&
dataset, const std::shared_ptr<compute::Expression>& filter, cpp11::list
projection);
@@ -1187,6 +1193,39 @@ extern "C" SEXP _arrow_ExecNode_Union(SEXP input_sexp,
SEXP right_data_sexp){
}
#endif
+// compute-exec.cpp
+#if defined(ARROW_R_WITH_ACERO)
+std::shared_ptr<acero::ExecNode> ExecNode_Fetch(const
std::shared_ptr<acero::ExecNode>& input, int64_t offset, int64_t limit);
+extern "C" SEXP _arrow_ExecNode_Fetch(SEXP input_sexp, SEXP offset_sexp, SEXP
limit_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type
input(input_sexp);
+ arrow::r::Input<int64_t>::type offset(offset_sexp);
+ arrow::r::Input<int64_t>::type limit(limit_sexp);
+ return cpp11::as_sexp(ExecNode_Fetch(input, offset, limit));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ExecNode_Fetch(SEXP input_sexp, SEXP offset_sexp, SEXP
limit_sexp){
+ Rf_error("Cannot call ExecNode_Fetch(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
+}
+#endif
+
+// compute-exec.cpp
+#if defined(ARROW_R_WITH_ACERO)
+std::shared_ptr<acero::ExecNode> ExecNode_OrderBy(const
std::shared_ptr<acero::ExecNode>& input, cpp11::list sort_options);
+extern "C" SEXP _arrow_ExecNode_OrderBy(SEXP input_sexp, SEXP
sort_options_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type
input(input_sexp);
+ arrow::r::Input<cpp11::list>::type sort_options(sort_options_sexp);
+ return cpp11::as_sexp(ExecNode_OrderBy(input, sort_options));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ExecNode_OrderBy(SEXP input_sexp, SEXP
sort_options_sexp){
+ Rf_error("Cannot call ExecNode_OrderBy(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
+}
+#endif
+
// compute-exec.cpp
#if defined(ARROW_R_WITH_ACERO)
std::shared_ptr<acero::ExecNode> ExecNode_SourceNode(const
std::shared_ptr<acero::ExecPlan>& plan, const
std::shared_ptr<arrow::RecordBatchReader>& reader);
@@ -5555,10 +5594,11 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_Table__from_ExecPlanReader", (DL_FUNC)
&_arrow_Table__from_ExecPlanReader, 1},
{ "_arrow_ExecPlanReader__Plan", (DL_FUNC)
&_arrow_ExecPlanReader__Plan, 1},
{ "_arrow_ExecPlanReader__PlanStatus", (DL_FUNC)
&_arrow_ExecPlanReader__PlanStatus, 1},
- { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5},
+ { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 3},
{ "_arrow_ExecPlan_ToString", (DL_FUNC)
&_arrow_ExecPlan_ToString, 1},
{ "_arrow_ExecPlan_UnsafeDelete", (DL_FUNC)
&_arrow_ExecPlan_UnsafeDelete, 1},
{ "_arrow_ExecNode_output_schema", (DL_FUNC)
&_arrow_ExecNode_output_schema, 1},
+ { "_arrow_ExecNode_has_ordered_batches", (DL_FUNC)
&_arrow_ExecNode_has_ordered_batches, 1},
{ "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4},
{ "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write,
14},
{ "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter,
2},
@@ -5566,6 +5606,8 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ExecNode_Aggregate", (DL_FUNC)
&_arrow_ExecNode_Aggregate, 3},
{ "_arrow_ExecNode_Join", (DL_FUNC) &_arrow_ExecNode_Join, 9},
{ "_arrow_ExecNode_Union", (DL_FUNC) &_arrow_ExecNode_Union,
2},
+ { "_arrow_ExecNode_Fetch", (DL_FUNC) &_arrow_ExecNode_Fetch,
3},
+ { "_arrow_ExecNode_OrderBy", (DL_FUNC)
&_arrow_ExecNode_OrderBy, 2},
{ "_arrow_ExecNode_SourceNode", (DL_FUNC)
&_arrow_ExecNode_SourceNode, 2},
{ "_arrow_ExecNode_TableSourceNode", (DL_FUNC)
&_arrow_ExecNode_TableSourceNode, 2},
{ "_arrow_substrait__internal__SubstraitToJSON", (DL_FUNC)
&_arrow_substrait__internal__SubstraitToJSON, 1},
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 9c7de915fa..347d4787fd 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -224,35 +224,13 @@ std::string ExecPlanReader__PlanStatus(const
std::shared_ptr<ExecPlanReader>& re
// [[acero::export]]
std::shared_ptr<ExecPlanReader> ExecPlan_run(
const std::shared_ptr<acero::ExecPlan>& plan,
- const std::shared_ptr<acero::ExecNode>& final_node, cpp11::list
sort_options,
- cpp11::strings metadata, int64_t head = -1) {
+ const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings
metadata) {
// For now, don't require R to construct SinkNodes.
// Instead, just pass the node we should collect as an argument.
arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen;
- // Sorting uses a different sink node; there is no general sort yet
- if (sort_options.size() > 0) {
- if (head >= 0) {
- // Use the SelectK node to take only what we need
- MakeExecNodeOrStop(
- "select_k_sink", plan.get(), {final_node.get()},
- acero::SelectKSinkNodeOptions{
- arrow::compute::SelectKOptions(
- head, std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices", sort_options))
- ->sort_keys),
- &sink_gen});
- } else {
- MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
- acero::OrderBySinkNodeOptions{
- *std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices",
sort_options)),
- &sink_gen});
- }
- } else {
- MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
- acero::SinkNodeOptions{&sink_gen});
- }
+ MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
+ acero::SinkNodeOptions{&sink_gen});
StopIfNotOk(plan->Validate());
@@ -283,6 +261,11 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema(
return node->output_schema();
}
+// [[arrow::export]]
+bool ExecNode_has_ordered_batches(const std::shared_ptr<acero::ExecNode>&
node) {
+ return !node->ordering().is_unordered();
+}
+
#if defined(ARROW_R_WITH_DATASET)
#include <arrow/dataset/file_base.h>
@@ -460,6 +443,23 @@ std::shared_ptr<acero::ExecNode> ExecNode_Union(
return MakeExecNodeOrStop("union", input->plan(), {input.get(),
right_data.get()}, {});
}
+// [[acero::export]]
+std::shared_ptr<acero::ExecNode> ExecNode_Fetch(
+ const std::shared_ptr<acero::ExecNode>& input, int64_t offset, int64_t
limit) {
+ return MakeExecNodeOrStop("fetch", input->plan(), {input.get()},
+ acero::FetchNodeOptions{offset, limit});
+}
+
+// [[acero::export]]
+std::shared_ptr<acero::ExecNode> ExecNode_OrderBy(
+ const std::shared_ptr<acero::ExecNode>& input, cpp11::list sort_options) {
+ return MakeExecNodeOrStop(
+ "order_by", input->plan(), {input.get()},
+
acero::OrderByNodeOptions{std::dynamic_pointer_cast<compute::SortOptions>(
+ make_compute_options("sort_indices",
sort_options))
+ ->AsOrdering()});
+}
+
// [[acero::export]]
std::shared_ptr<acero::ExecNode> ExecNode_SourceNode(
const std::shared_ptr<acero::ExecPlan>& plan,
diff --git a/r/tests/testthat/test-dataset-dplyr.R
b/r/tests/testthat/test-dataset-dplyr.R
index e20a6262b7..b8d9384192 100644
--- a/r/tests/testthat/test-dataset-dplyr.R
+++ b/r/tests/testthat/test-dataset-dplyr.R
@@ -397,22 +397,11 @@ test_that("show_exec_plan(), show_query() and explain()
with datasets", {
show_exec_plan(),
regexp = paste0(
"ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
- "OrderBySinkNode.*chr.*ASC.*", # arrange goes via the OrderBy sink node
+ "OrderByNode.*chr.*ASC.*", # arrange goes via the OrderBy node
"ProjectNode.*", # output columns
"FilterNode.*", # filter node
"filter=lgl.*", # filtering expression
"SourceNode" # entry point
)
)
-
- # printing the ExecPlan for a nested query would currently force the
- # evaluation of the inner one(s), which we want to avoid => no output
- expect_warning(
- ds %>%
- filter(lgl) %>%
- arrange(chr) %>%
- head() %>%
- show_exec_plan(),
- "The `ExecPlan` cannot be printed for a nested query."
- )
})
diff --git a/r/tests/testthat/test-dplyr-collapse.R
b/r/tests/testthat/test-dplyr-collapse.R
index 198827e235..a8aa5556f1 100644
--- a/r/tests/testthat/test-dplyr-collapse.R
+++ b/r/tests/testthat/test-dplyr-collapse.R
@@ -167,6 +167,7 @@ lgl: bool
total: int64
extra: int64 (multiply_checked(total, 5))
+* Sorted by lgl [asc]
See $.data for the source Arrow object",
fixed = TRUE
)
diff --git a/r/tests/testthat/test-dplyr-query.R
b/r/tests/testthat/test-dplyr-query.R
index 5dbdb0e522..e478d0e4c4 100644
--- a/r/tests/testthat/test-dplyr-query.R
+++ b/r/tests/testthat/test-dplyr-query.R
@@ -180,58 +180,26 @@ test_that("compute()", {
})
test_that("head", {
- batch <- record_batch(tbl)
-
- b2 <- batch %>%
- select(int, chr) %>%
- filter(int > 5) %>%
- head(2)
- expect_s3_class(b2, "arrow_dplyr_query")
- expected <- tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")][1:2, ]
- expect_equal(collect(b2), expected)
-
- b3 <- batch %>%
- select(int, strng = chr) %>%
- filter(int > 5) %>%
- head(2)
- expect_s3_class(b3, "arrow_dplyr_query")
- expect_equal(as.data.frame(b3), set_names(expected, c("int", "strng")))
-
- b4 <- batch %>%
- select(int, strng = chr) %>%
- filter(int > 5) %>%
- group_by(int) %>%
- head(2)
- expect_s3_class(b4, "arrow_dplyr_query")
- expect_equal(
- as.data.frame(b4),
- expected %>%
- rename(strng = chr) %>%
- group_by(int)
- )
-
- expect_equal(
- batch %>%
+ compare_dplyr_binding(
+ .input %>%
select(int, strng = chr) %>%
filter(int > 5) %>%
+ group_by(int) %>%
head(2) %>%
- mutate(twice = int * 2) %>%
collect(),
- expected %>%
- rename(strng = chr) %>%
- mutate(twice = int * 2)
+ tbl
)
# This would fail if we evaluated head() after filter()
- expect_equal(
- batch %>%
+ compare_dplyr_binding(
+ .input %>%
select(int, strng = chr) %>%
+ arrange(int) %>%
head(2) %>%
filter(int > 5) %>%
+ mutate(twice = int * 2) %>%
collect(),
- expected %>%
- rename(strng = chr) %>%
- filter(FALSE)
+ tbl
)
})
@@ -260,38 +228,25 @@ test_that("arrange then tail returns the right data", {
})
test_that("tail", {
- batch <- record_batch(tbl)
-
- b2 <- batch %>%
- select(int, chr) %>%
- filter(int > 5) %>%
- arrange(int) %>%
- tail(2)
-
- expect_s3_class(b2, "arrow_dplyr_query")
- expected <- tail(tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")], 2)
- expect_equal(as.data.frame(b2), expected)
-
- b3 <- batch %>%
- select(int, strng = chr) %>%
- filter(int > 5) %>%
- arrange(int) %>%
- tail(2)
- expect_s3_class(b3, "arrow_dplyr_query")
- expect_equal(as.data.frame(b3), set_names(expected, c("int", "strng")))
-
- b4 <- batch %>%
- select(int, strng = chr) %>%
- filter(int > 5) %>%
- group_by(int) %>%
- arrange(int) %>%
- tail(2)
- expect_s3_class(b4, "arrow_dplyr_query")
- expect_equal(
- as.data.frame(b4),
- expected %>%
- rename(strng = chr) %>%
- group_by(int)
+ # With sorting
+ compare_dplyr_binding(
+ .input %>%
+ select(int, chr) %>%
+ filter(int < 5) %>%
+ arrange(int) %>%
+ tail(2) %>%
+ collect(),
+ tbl
+ )
+ # Without sorting: table order is implicit, and we can compute the filter
+ # row length, so the query can use Fetch with offset
+ compare_dplyr_binding(
+ .input %>%
+ select(int, chr) %>%
+ filter(int < 5) %>%
+ tail(2) %>%
+ collect(),
+ tbl
)
})
@@ -550,7 +505,7 @@ test_that("show_exec_plan(), show_query() and explain()", {
show_exec_plan(),
regexp = paste0(
"ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
- "OrderBySinkNode.*wt.*DESC.*", # arrange goes via the OrderBy sink node
+ "OrderBy.*wt.*DESC.*", # arrange goes via the OrderBy node
"FilterNode.*", # filter node
"TableSourceNode.*" # entry point
)
@@ -558,14 +513,19 @@ test_that("show_exec_plan(), show_query() and explain()",
{
# printing the ExecPlan for a nested query would currently force the
# evaluation of the inner one(s), which we want to avoid => no output
- expect_warning(
+ expect_output(
mtcars %>%
arrow_table() %>%
filter(mpg > 20) %>%
- arrange(desc(wt)) %>%
head(3) %>%
show_exec_plan(),
- "The `ExecPlan` cannot be printed for a nested query."
+ paste0(
+ "ExecPlan with 4 nodes:.*",
+ "3:SinkNode.*",
+ "2:FetchNode.offset=0 count=3.*",
+ "1:FilterNode.filter=.mpg > 20.*",
+ "0:TableSourceNode.*"
+ )
)
})
diff --git a/r/tests/testthat/test-dplyr-summarize.R
b/r/tests/testthat/test-dplyr-summarize.R
index 1b834df19f..3eb1a6ed2b 100644
--- a/r/tests/testthat/test-dplyr-summarize.R
+++ b/r/tests/testthat/test-dplyr-summarize.R
@@ -616,17 +616,21 @@ test_that("min() and max() on character strings", {
collect(),
tbl,
)
- compare_dplyr_binding(
- .input %>%
- group_by(fct) %>%
- summarize(
- min_chr = min(chr, na.rm = TRUE),
- max_chr = max(chr, na.rm = TRUE)
- ) %>%
- arrange(min_chr) %>%
- collect(),
- tbl,
- )
+ withr::with_options(list(arrow.summarise.sort = FALSE), {
+ # TODO(#29887 / ARROW-14313) sorting on dictionary columns not supported
+ # so turn off arrow.summarise.sort so that we don't order_by fct after
summarize
+ compare_dplyr_binding(
+ .input %>%
+ group_by(fct) %>%
+ summarize(
+ min_chr = min(chr, na.rm = TRUE),
+ max_chr = max(chr, na.rm = TRUE)
+ ) %>%
+ arrange(min_chr) %>%
+ collect(),
+ tbl,
+ )
+ })
})
test_that("summarise() with !!sym()", {