jonkeane commented on a change in pull request #10992: URL: https://github.com/apache/arrow/pull/10992#discussion_r698714337
########## File path: r/R/query-engine.R ########## @@ -42,11 +55,73 @@ ExecPlan <- R6Class("ExecPlan", } # ScanNode needs the filter to do predicate pushdown and skip partitions, # and it needs to know which fields to materialize (and which are unnecessary) - ExecNode_Scan(self, dataset, filter, colnames) + ExecNode_Scan(self, dataset, filter, colnames %||% character(0)) + }, + Build = function(.data) { + # This method takes an arrow_dplyr_query and chains together the + # ExecNodes that they produce. It does not evaluate them--that is Run(). + group_vars <- dplyr::group_vars(.data) + grouped <- length(group_vars) > 0 + + # Collect the target names first because we have to add back the group vars + target_names <- names(.data) + .data <- ensure_group_vars(.data) + .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns + + node <- self$Scan(.data) + # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again + if (inherits(.data$filtered_rows, "Expression")) { + node <- node$Filter(.data$filtered_rows) + } + # If any columns are derived we need to Project (otherwise this may be no-op) + node <- node$Project(c(.data$selected_columns, .data$temp_columns)) + + if (length(.data$aggregations)) { + if (grouped) { + # We need to prefix all of the aggregation function names with "hash_" + .data$aggregations <- lapply(.data$aggregations, function(x) { + x[["fun"]] <- paste0("hash_", x[["fun"]]) + x + }) + } + + node <- node$Aggregate( + options = .data$aggregations, + target_names = target_names, + out_field_names = names(.data$aggregations), + key_names = group_vars + ) + + if (grouped) { + # The result will have result columns first then the grouping cols. + # dplyr orders group cols first, so adapt the result to meet that expectation. 
Review comment: The code on lines 109-111 reads to me about the order of the columns, not the order of the rows — maybe I'm missing something or there was code that did change the order of the rows at some point? ########## File path: r/R/query-engine.R ########## @@ -42,11 +55,73 @@ ExecPlan <- R6Class("ExecPlan", } # ScanNode needs the filter to do predicate pushdown and skip partitions, # and it needs to know which fields to materialize (and which are unnecessary) - ExecNode_Scan(self, dataset, filter, colnames) + ExecNode_Scan(self, dataset, filter, colnames %||% character(0)) + }, + Build = function(.data) { + # This method takes an arrow_dplyr_query and chains together the + # ExecNodes that they produce. It does not evaluate them--that is Run(). + group_vars <- dplyr::group_vars(.data) + grouped <- length(group_vars) > 0 + + # Collect the target names first because we have to add back the group vars + target_names <- names(.data) + .data <- ensure_group_vars(.data) + .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns + + node <- self$Scan(.data) + # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again + if (inherits(.data$filtered_rows, "Expression")) { + node <- node$Filter(.data$filtered_rows) + } + # If any columns are derived we need to Project (otherwise this may be no-op) + node <- node$Project(c(.data$selected_columns, .data$temp_columns)) + + if (length(.data$aggregations)) { + if (grouped) { + # We need to prefix all of the aggregation function names with "hash_" + .data$aggregations <- lapply(.data$aggregations, function(x) { + x[["fun"]] <- paste0("hash_", x[["fun"]]) + x + }) + } + + node <- node$Aggregate( + options = .data$aggregations, + target_names = target_names, + out_field_names = names(.data$aggregations), + key_names = group_vars + ) + + if (grouped) { + # The result will have result columns first then the grouping cols. 
+ # dplyr orders group cols first, so adapt the result to meet that expectation. Review comment: Oh, nevermind I see lines 114-117 do that sorting. I should have read lower and not stopped at the comment 🤦 ########## File path: r/R/query-engine.R ########## @@ -42,11 +55,73 @@ ExecPlan <- R6Class("ExecPlan", } # ScanNode needs the filter to do predicate pushdown and skip partitions, # and it needs to know which fields to materialize (and which are unnecessary) - ExecNode_Scan(self, dataset, filter, colnames) + ExecNode_Scan(self, dataset, filter, colnames %||% character(0)) + }, + Build = function(.data) { + # This method takes an arrow_dplyr_query and chains together the + # ExecNodes that they produce. It does not evaluate them--that is Run(). + group_vars <- dplyr::group_vars(.data) + grouped <- length(group_vars) > 0 + + # Collect the target names first because we have to add back the group vars + target_names <- names(.data) + .data <- ensure_group_vars(.data) + .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns + + node <- self$Scan(.data) + # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again + if (inherits(.data$filtered_rows, "Expression")) { + node <- node$Filter(.data$filtered_rows) + } + # If any columns are derived we need to Project (otherwise this may be no-op) + node <- node$Project(c(.data$selected_columns, .data$temp_columns)) + + if (length(.data$aggregations)) { + if (grouped) { + # We need to prefix all of the aggregation function names with "hash_" + .data$aggregations <- lapply(.data$aggregations, function(x) { + x[["fun"]] <- paste0("hash_", x[["fun"]]) + x + }) + } + + node <- node$Aggregate( + options = .data$aggregations, + target_names = target_names, + out_field_names = names(.data$aggregations), + key_names = group_vars + ) + + if (grouped) { + # The result will have result columns first then the grouping cols. 
+ # dplyr orders group cols first, so adapt the result to meet that expectation. Review comment: I agree with Ian that an option (with the default being no sort) would be good here. There's also some (very recent) discussion about removing this in dplyr (if there's a new edition) https://github.com/tidyverse/dplyr/issues/5664#issuecomment-907232443 ########## File path: r/R/query-engine.R ########## @@ -42,11 +55,73 @@ ExecPlan <- R6Class("ExecPlan", } # ScanNode needs the filter to do predicate pushdown and skip partitions, # and it needs to know which fields to materialize (and which are unnecessary) - ExecNode_Scan(self, dataset, filter, colnames) + ExecNode_Scan(self, dataset, filter, colnames %||% character(0)) + }, + Build = function(.data) { + # This method takes an arrow_dplyr_query and chains together the + # ExecNodes that they produce. It does not evaluate them--that is Run(). + group_vars <- dplyr::group_vars(.data) + grouped <- length(group_vars) > 0 + + # Collect the target names first because we have to add back the group vars + target_names <- names(.data) + .data <- ensure_group_vars(.data) + .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns + + node <- self$Scan(.data) + # ARROW-13498: Even though Scan takes the filter, apparently we have to do it again + if (inherits(.data$filtered_rows, "Expression")) { + node <- node$Filter(.data$filtered_rows) + } + # If any columns are derived we need to Project (otherwise this may be no-op) + node <- node$Project(c(.data$selected_columns, .data$temp_columns)) + + if (length(.data$aggregations)) { + if (grouped) { + # We need to prefix all of the aggregation function names with "hash_" + .data$aggregations <- lapply(.data$aggregations, function(x) { + x[["fun"]] <- paste0("hash_", x[["fun"]]) + x + }) + } + + node <- node$Aggregate( + options = .data$aggregations, + target_names = target_names, + out_field_names = names(.data$aggregations), + key_names = group_vars + ) + + if (grouped) { + # 
The result will have result columns first then the grouping cols. + # dplyr orders group cols first, so adapt the result to meet that expectation. Review comment: I agree with Ian that an option (with the default being no sort) would be good here. There's also some (very recent) discussion about changing this behavior in dplyr (if there's a new edition) https://github.com/tidyverse/dplyr/issues/5664#issuecomment-907232443 ########## File path: r/R/dplyr-collect.R ########## @@ -47,16 +36,69 @@ collect.ArrowTabular <- function(x, as_data_frame = TRUE, ...) { x } } -collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...) +collect.Dataset <- function(x, ...) dplyr::collect(as_adq(x), ...) compute.arrow_dplyr_query <- function(x, ...) dplyr::collect(x, as_data_frame = FALSE) compute.ArrowTabular <- function(x, ...) x compute.Dataset <- compute.arrow_dplyr_query pull.arrow_dplyr_query <- function(.data, var = -1) { - .data <- arrow_dplyr_query(.data) + .data <- as_adq(.data) var <- vars_pull(names(.data), !!enquo(var)) .data$selected_columns <- set_names(.data$selected_columns[var], var) dplyr::collect(.data)[[1]] } pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query + +# TODO: Correctly handle group_vars after summarize; also in collapse() +restore_dplyr_features <- function(df, query) { + # An arrow_dplyr_query holds some attributes that Arrow doesn't know about + # After calling collect(), make sure these features are carried over + + if (length(query$group_by_vars) > 0) { + # Preserve groupings, if present + if (is.data.frame(df)) { + df <- dplyr::grouped_df( + df, + dplyr::group_vars(query), + drop = dplyr::group_by_drop_default(query) + ) + } else { + # This is a Table, via compute() or collect(as_data_frame = FALSE) + df <- as_adq(df) + df$group_by_vars <- query$group_by_vars + df$drop_empty_groups <- query$drop_empty_groups + } + } + df +} + +collapse.arrow_dplyr_query <- function(x, ...) 
{ + # Figure out what schema will result from the query + x$schema <- implicit_schema(x) + # Nest inside a new arrow_dplyr_query + arrow_dplyr_query(x) +} Review comment: Am I reading this right that what this does is figures out the resultant schema, and then wrap another `arrow_dplyr_query` layer around the resultant object? Am I missing something else that it's doing? Are we thinking about expanding this more in the future? ########## File path: r/R/dplyr-collect.R ########## @@ -47,16 +36,69 @@ collect.ArrowTabular <- function(x, as_data_frame = TRUE, ...) { x } } -collect.Dataset <- function(x, ...) dplyr::collect(arrow_dplyr_query(x), ...) +collect.Dataset <- function(x, ...) dplyr::collect(as_adq(x), ...) compute.arrow_dplyr_query <- function(x, ...) dplyr::collect(x, as_data_frame = FALSE) compute.ArrowTabular <- function(x, ...) x compute.Dataset <- compute.arrow_dplyr_query pull.arrow_dplyr_query <- function(.data, var = -1) { - .data <- arrow_dplyr_query(.data) + .data <- as_adq(.data) var <- vars_pull(names(.data), !!enquo(var)) .data$selected_columns <- set_names(.data$selected_columns[var], var) dplyr::collect(.data)[[1]] } pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query + +# TODO: Correctly handle group_vars after summarize; also in collapse() +restore_dplyr_features <- function(df, query) { + # An arrow_dplyr_query holds some attributes that Arrow doesn't know about + # After calling collect(), make sure these features are carried over + + if (length(query$group_by_vars) > 0) { + # Preserve groupings, if present + if (is.data.frame(df)) { + df <- dplyr::grouped_df( + df, + dplyr::group_vars(query), + drop = dplyr::group_by_drop_default(query) + ) + } else { + # This is a Table, via compute() or collect(as_data_frame = FALSE) + df <- as_adq(df) + df$group_by_vars <- query$group_by_vars + df$drop_empty_groups <- query$drop_empty_groups + } + } + df +} + +collapse.arrow_dplyr_query <- function(x, ...) 
{ + # Figure out what schema will result from the query + x$schema <- implicit_schema(x) + # Nest inside a new arrow_dplyr_query + arrow_dplyr_query(x) +} Review comment: > It's really about making sure the order of ExecNodes stays faithful to the user's request. That's the bit I was missing / didn't connect in. That makes sense — and now I see how this is similar to the d(b)plyr version that forces the sql generation + makes a subquery from that (though I had to RTFS across both packages to put all that together). On first read I was expecting that `collapse` might do some sort of optimization step/blend together things inside of it — but that's definitely not what the `dplyr` versions do either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org