jonkeane commented on a change in pull request #10992:
URL: https://github.com/apache/arrow/pull/10992#discussion_r698714337



##########
File path: r/R/query-engine.R
##########
@@ -42,11 +55,73 @@ ExecPlan <- R6Class("ExecPlan",
       }
       # ScanNode needs the filter to do predicate pushdown and skip partitions,
       # and it needs to know which fields to materialize (and which are unnecessary)
-      ExecNode_Scan(self, dataset, filter, colnames)
+      ExecNode_Scan(self, dataset, filter, colnames %||% character(0))
+    },
+    Build = function(.data) {
+      # This method takes an arrow_dplyr_query and chains together the
+      # ExecNodes that they produce. It does not evaluate them--that is Run().
+      group_vars <- dplyr::group_vars(.data)
+      grouped <- length(group_vars) > 0
+
+      # Collect the target names first because we have to add back the group vars
+      target_names <- names(.data)
+      .data <- ensure_group_vars(.data)
+      .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns
+
+      node <- self$Scan(.data)
+      # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again
+      if (inherits(.data$filtered_rows, "Expression")) {
+        node <- node$Filter(.data$filtered_rows)
+      }
+      # If any columns are derived we need to Project (otherwise this may be no-op)
+      node <- node$Project(c(.data$selected_columns, .data$temp_columns))
+
+      if (length(.data$aggregations)) {
+        if (grouped) {
+          # We need to prefix all of the aggregation function names with "hash_"
+          .data$aggregations <- lapply(.data$aggregations, function(x) {
+            x[["fun"]] <- paste0("hash_", x[["fun"]])
+            x
+          })
+        }
+
+        node <- node$Aggregate(
+          options = .data$aggregations,
+          target_names = target_names,
+          out_field_names = names(.data$aggregations),
+          key_names = group_vars
+        )
+
+        if (grouped) {
+          # The result will have result columns first then the grouping cols.
+          # dplyr orders group cols first, so adapt the result to meet that expectation.

Review comment:
       The code on lines 109-111 reads to me as being about the order of the columns, not the order of the rows — maybe I'm missing something, or was there code that changed the order of the rows at some point?
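   
   For concreteness, here is a minimal sketch (my own illustration, not the PR's code) of the column reordering in question: the hash aggregation emits the aggregate columns first and the group keys last, while dplyr puts the group keys first.
   
       # Aggregation output in the order Arrow produces it: result cols, then group cols
       agg_result <- data.frame(total = c(3, 3), g = c("a", "b"))
       group_vars <- "g"
       # Reorder the *columns* (not the rows) to match dplyr's convention
       agg_result <- agg_result[, c(group_vars, setdiff(names(agg_result), group_vars))]
       names(agg_result)  # "g" "total"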

##########
File path: r/R/query-engine.R
##########
@@ -42,11 +55,73 @@ ExecPlan <- R6Class("ExecPlan",
       }
       # ScanNode needs the filter to do predicate pushdown and skip partitions,
       # and it needs to know which fields to materialize (and which are unnecessary)
-      ExecNode_Scan(self, dataset, filter, colnames)
+      ExecNode_Scan(self, dataset, filter, colnames %||% character(0))
+    },
+    Build = function(.data) {
+      # This method takes an arrow_dplyr_query and chains together the
+      # ExecNodes that they produce. It does not evaluate them--that is Run().
+      group_vars <- dplyr::group_vars(.data)
+      grouped <- length(group_vars) > 0
+
+      # Collect the target names first because we have to add back the group vars
+      target_names <- names(.data)
+      .data <- ensure_group_vars(.data)
+      .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns
+
+      node <- self$Scan(.data)
+      # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again
+      if (inherits(.data$filtered_rows, "Expression")) {
+        node <- node$Filter(.data$filtered_rows)
+      }
+      # If any columns are derived we need to Project (otherwise this may be no-op)
+      node <- node$Project(c(.data$selected_columns, .data$temp_columns))
+
+      if (length(.data$aggregations)) {
+        if (grouped) {
+          # We need to prefix all of the aggregation function names with "hash_"
+          .data$aggregations <- lapply(.data$aggregations, function(x) {
+            x[["fun"]] <- paste0("hash_", x[["fun"]])
+            x
+          })
+        }
+
+        node <- node$Aggregate(
+          options = .data$aggregations,
+          target_names = target_names,
+          out_field_names = names(.data$aggregations),
+          key_names = group_vars
+        )
+
+        if (grouped) {
+          # The result will have result columns first then the grouping cols.
+          # dplyr orders group cols first, so adapt the result to meet that expectation.

Review comment:
       Oh, never mind: I see that lines 114-117 do that sorting. I should have read further down and not stopped at the comment 🤦
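   
   (A hedged aside, since lines 114-117 themselves aren't shown in this hunk: purely as my own illustration of the kind of row sort being referred to, dplyr::summarise() returns its result sorted by the grouping columns.)
   
       # Sort the aggregation result rows by the group key(s)
       res <- data.frame(g = c("b", "a"), total = c(3, 3))
       res <- res[do.call(order, res["g"]), , drop = FALSE]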

##########
File path: r/R/query-engine.R
##########
@@ -42,11 +55,73 @@ ExecPlan <- R6Class("ExecPlan",
       }
       # ScanNode needs the filter to do predicate pushdown and skip partitions,
       # and it needs to know which fields to materialize (and which are unnecessary)
-      ExecNode_Scan(self, dataset, filter, colnames)
+      ExecNode_Scan(self, dataset, filter, colnames %||% character(0))
+    },
+    Build = function(.data) {
+      # This method takes an arrow_dplyr_query and chains together the
+      # ExecNodes that they produce. It does not evaluate them--that is Run().
+      group_vars <- dplyr::group_vars(.data)
+      grouped <- length(group_vars) > 0
+
+      # Collect the target names first because we have to add back the group vars
+      target_names <- names(.data)
+      .data <- ensure_group_vars(.data)
+      .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns
+
+      node <- self$Scan(.data)
+      # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again
+      if (inherits(.data$filtered_rows, "Expression")) {
+        node <- node$Filter(.data$filtered_rows)
+      }
+      # If any columns are derived we need to Project (otherwise this may be no-op)
+      node <- node$Project(c(.data$selected_columns, .data$temp_columns))
+
+      if (length(.data$aggregations)) {
+        if (grouped) {
+          # We need to prefix all of the aggregation function names with "hash_"
+          .data$aggregations <- lapply(.data$aggregations, function(x) {
+            x[["fun"]] <- paste0("hash_", x[["fun"]])
+            x
+          })
+        }
+
+        node <- node$Aggregate(
+          options = .data$aggregations,
+          target_names = target_names,
+          out_field_names = names(.data$aggregations),
+          key_names = group_vars
+        )
+
+        if (grouped) {
+          # The result will have result columns first then the grouping cols.
+          # dplyr orders group cols first, so adapt the result to meet that expectation.

Review comment:
       I agree with Ian that an option (with the default being no sort) would be good here.
   
   There's also some (very recent) discussion about changing this behavior in dplyr (if there's a new edition): https://github.com/tidyverse/dplyr/issues/5664#issuecomment-907232443
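   
   A rough sketch of what such an option could look like (hedged: the option name `arrow.summarise.sort` and the helper below are hypothetical, invented here only to illustrate the idea):
   
       maybe_sort_groups <- function(df, group_vars) {
         # Hypothetical opt-in: only sort rows by the grouping columns on request
         if (isTRUE(getOption("arrow.summarise.sort", FALSE))) {
           df <- dplyr::arrange(df, dplyr::across(dplyr::all_of(group_vars)))
         }
         df
       }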

##########
File path: r/R/dplyr-collect.R
##########
@@ -47,16 +36,69 @@ collect.ArrowTabular <- function(x, as_data_frame = TRUE, ...) {
     x
   }
 }
-collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
+collect.Dataset <- function(x, ...) dplyr::collect(as_adq(x), ...)
 
 compute.arrow_dplyr_query <- function(x, ...) dplyr::collect(x, as_data_frame = FALSE)
 compute.ArrowTabular <- function(x, ...) x
 compute.Dataset <- compute.arrow_dplyr_query
 
 pull.arrow_dplyr_query <- function(.data, var = -1) {
-  .data <- arrow_dplyr_query(.data)
+  .data <- as_adq(.data)
   var <- vars_pull(names(.data), !!enquo(var))
   .data$selected_columns <- set_names(.data$selected_columns[var], var)
   dplyr::collect(.data)[[1]]
 }
 pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query
+
+# TODO: Correctly handle group_vars after summarize; also in collapse()
+restore_dplyr_features <- function(df, query) {
+  # An arrow_dplyr_query holds some attributes that Arrow doesn't know about
+  # After calling collect(), make sure these features are carried over
+
+  if (length(query$group_by_vars) > 0) {
+    # Preserve groupings, if present
+    if (is.data.frame(df)) {
+      df <- dplyr::grouped_df(
+        df,
+        dplyr::group_vars(query),
+        drop = dplyr::group_by_drop_default(query)
+      )
+    } else {
+      # This is a Table, via compute() or collect(as_data_frame = FALSE)
+      df <- as_adq(df)
+      df$group_by_vars <- query$group_by_vars
+      df$drop_empty_groups <- query$drop_empty_groups
+    }
+  }
+  df
+}
+
+collapse.arrow_dplyr_query <- function(x, ...) {
+  # Figure out what schema will result from the query
+  x$schema <- implicit_schema(x)
+  # Nest inside a new arrow_dplyr_query
+  arrow_dplyr_query(x)
+}

Review comment:
       Am I reading this right that what this does is figure out the resultant schema and then wrap another `arrow_dplyr_query` layer around the resultant object? Am I missing something else that it's doing? Are we thinking about expanding this more in the future?
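   
   For what it's worth, a user-facing sketch of the effect as I read the diff (hedged: this assumes the PR's grouped summarize support works end to end):
   
       library(dplyr)
       tab <- arrow::Table$create(g = c("a", "a", "b"), v = c(1, 2, 3))
       q <- tab %>% group_by(g) %>% summarize(total = sum(v)) %>% collapse()
       # q is a fresh arrow_dplyr_query whose schema is the summarize result,
       # so later verbs apply after that boundary instead of being merged into it
       q %>% filter(total > 1) %>% collect()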

##########
File path: r/R/dplyr-collect.R
##########
@@ -47,16 +36,69 @@ collect.ArrowTabular <- function(x, as_data_frame = TRUE, ...) {
     x
   }
 }
-collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...)
+collect.Dataset <- function(x, ...) dplyr::collect(as_adq(x), ...)
 
 compute.arrow_dplyr_query <- function(x, ...) dplyr::collect(x, as_data_frame = FALSE)
 compute.ArrowTabular <- function(x, ...) x
 compute.Dataset <- compute.arrow_dplyr_query
 
 pull.arrow_dplyr_query <- function(.data, var = -1) {
-  .data <- arrow_dplyr_query(.data)
+  .data <- as_adq(.data)
   var <- vars_pull(names(.data), !!enquo(var))
   .data$selected_columns <- set_names(.data$selected_columns[var], var)
   dplyr::collect(.data)[[1]]
 }
 pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query
+
+# TODO: Correctly handle group_vars after summarize; also in collapse()
+restore_dplyr_features <- function(df, query) {
+  # An arrow_dplyr_query holds some attributes that Arrow doesn't know about
+  # After calling collect(), make sure these features are carried over
+
+  if (length(query$group_by_vars) > 0) {
+    # Preserve groupings, if present
+    if (is.data.frame(df)) {
+      df <- dplyr::grouped_df(
+        df,
+        dplyr::group_vars(query),
+        drop = dplyr::group_by_drop_default(query)
+      )
+    } else {
+      # This is a Table, via compute() or collect(as_data_frame = FALSE)
+      df <- as_adq(df)
+      df$group_by_vars <- query$group_by_vars
+      df$drop_empty_groups <- query$drop_empty_groups
+    }
+  }
+  df
+}
+
+collapse.arrow_dplyr_query <- function(x, ...) {
+  # Figure out what schema will result from the query
+  x$schema <- implicit_schema(x)
+  # Nest inside a new arrow_dplyr_query
+  arrow_dplyr_query(x)
+}

Review comment:
       > It's really about making sure the order of ExecNodes stays faithful to the user's request.
   
   That's the bit I was missing / didn't connect. That makes sense — and now I see how this is similar to the d(b)plyr version that forces the SQL generation and makes a subquery from that (though I had to RTFS across both packages to put all that together).
   
   On first read I was expecting that `collapse` might do some sort of optimization step or blend things together inside of it — but that's definitely not what the `dplyr` versions do either.
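   
   For anyone else connecting the dots, the dbplyr analogue (a sketch; assumes dbplyr and RSQLite are installed):
   
       library(dplyr)
       con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
       DBI::dbWriteTable(con, "mtcars", mtcars)
       q <- tbl(con, "mtcars") %>% filter(cyl > 4)
       # collapse() renders the SQL accumulated so far and wraps it in a
       # subquery, pinning the order of operations; same idea as above
       show_query(collapse(q))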




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

