This is an automated email from the ASF dual-hosted git repository. npr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new d0a7fb9403 GH-33760: [R][C++] Handle nested field refs in scanner (#33770) d0a7fb9403 is described below commit d0a7fb9403a904b7850517c745c3925695d8658d Author: Neal Richardson <neal.p.richard...@gmail.com> AuthorDate: Tue Jan 24 11:56:30 2023 -0500 GH-33760: [R][C++] Handle nested field refs in scanner (#33770) ### Rationale for this change Followup to https://github.com/apache/arrow/pull/19706/files#r1073391100 with the goal of deleting and simplifying some code. As it turned out, it was more about moving code from the R bindings to the C++ library. ### Are there any user-facing changes? Not for R users, but this fixes a bug in the dataset C++ library where nested field refs could not be handled by the scanner. * Closes: #33760 Authored-by: Neal Richardson <neal.p.richard...@gmail.com> Signed-off-by: Neal Richardson <neal.p.richard...@gmail.com> --- cpp/src/arrow/dataset/scanner.cc | 23 ++++++++++++++++------- r/R/arrowExports.R | 9 +++------ r/R/query-engine.R | 10 +++------- r/src/arrowExports.cpp | 19 +++++-------------- r/src/compute-exec.cpp | 23 +++++++++++++---------- r/src/expression.cpp | 19 ------------------- 6 files changed, 40 insertions(+), 63 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index f307787357..bc8feec96d 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -22,6 +22,7 @@ #include <memory> #include <mutex> #include <numeric> +#include <set> #include <sstream> #include "arrow/array/array_primitive.h" @@ -135,6 +136,7 @@ Result<std::shared_ptr<Schema>> GetProjectedSchemaFromExpression( const std::shared_ptr<Schema>& dataset_schema) { // process resultant dataset_schema after projection FieldVector project_fields; + std::set<std::string> field_names; if (auto call = projection.call()) { if (call->function_name != "make_struct") { return Status::Invalid("Top level projection expression call must be make_struct"); @@ -142,13 +144,11 @@ Result<std::shared_ptr<Schema>> GetProjectedSchemaFromExpression( for (const compute::Expression& arg : call->arguments) { if (auto field_ref = arg.field_ref()) { if (field_ref->IsName()) { - auto field = dataset_schema->GetFieldByName(*field_ref->name()); - if (field) { - project_fields.push_back(std::move(field)); - } - // if the field is not present in the schema we ignore it. - // the case is if kAugmentedFields are present in the expression - // and if they are not present in the provided schema, we ignore them. + field_names.emplace(*field_ref->name()); + } else if (field_ref->IsNested()) { + // We keep the top-level field name. + auto nested_field_refs = *field_ref->nested_refs(); + field_names.emplace(*nested_field_refs[0].name()); } else { return Status::Invalid( "No projected schema was supplied and we could not infer the projected " @@ -157,6 +157,15 @@ Result<std::shared_ptr<Schema>> GetProjectedSchemaFromExpression( } } } + for (auto f : field_names) { + auto field = dataset_schema->GetFieldByName(f); + if (field) { + // if the field is not present in the schema we ignore it. + // the case is if kAugmentedFields are present in the expression + // and if they are not present in the provided schema, we ignore them. + project_fields.push_back(std::move(field)); + } + } return schema(project_fields); } diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 2eeca24dbd..5e807fbab1 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -460,8 +460,8 @@ ExecNode_output_schema <- function(node) { .Call(`_arrow_ExecNode_output_schema`, node) } -ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) { - .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names) +ExecNode_Scan <- function(plan, dataset, filter, projection) { + .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection) } ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { @@ -1088,10 +1088,6 @@ compute___expr__is_field_ref <- function(x) { .Call(`_arrow_compute___expr__is_field_ref`, x) } -field_names_in_expression <- function(x) { - .Call(`_arrow_field_names_in_expression`, x) -} - compute___expr__get_field_ref_name <- function(x) { .Call(`_arrow_compute___expr__get_field_ref_name`, x) } @@ -2095,3 +2091,4 @@ SetIOThreadPoolCapacity <- function(threads) { Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } + diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 2f0b421fae..7a336b7a07 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -34,11 +34,7 @@ ExecPlan <- R6Class("ExecPlan", if (isTRUE(filter)) { filter <- Expression$scalar(TRUE) } - # Use FieldsInExpression to find all from dataset$selected_columns - colnames <- unique(unlist(map( - dataset$selected_columns, - field_names_in_expression - ))) + projection <- dataset$selected_columns dataset <- dataset$.data assert_is(dataset, "Dataset") } else { @@ -46,10 +42,10 @@ ExecPlan <- R6Class("ExecPlan", # Just a dataset, not a query, so there's no predicates to push down # so set some defaults filter <- Expression$scalar(TRUE) - colnames <- names(dataset) + projection <- make_field_refs(colnames) } - out <- ExecNode_Scan(self, dataset, filter, colnames %||% character(0)) + out <- ExecNode_Scan(self, dataset, filter, projection) # Hold onto the source data's schema so we can preserve schema metadata # in the resulting Scan/Write out$extras$source_schema <- dataset$schema diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index e918390e26..dade762683 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -990,18 +990,18 @@ END_CPP11 } // compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names); -extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ +std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, cpp11::list projection); +extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP projection_sexp){ BEGIN_CPP11 arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp); arrow::r::Input<const std::shared_ptr<ds::Dataset>&>::type dataset(dataset_sexp); arrow::r::Input<const std::shared_ptr<compute::Expression>&>::type filter(filter_sexp); - arrow::r::Input<std::vector<std::string>>::type materialized_field_names(materialized_field_names_sexp); - return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, materialized_field_names)); + arrow::r::Input<cpp11::list>::type projection(projection_sexp); + return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, projection)); END_CPP11 } #else -extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ +extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP projection_sexp){ Rf_error("Cannot call ExecNode_Scan(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -2740,14 +2740,6 @@ BEGIN_CPP11 END_CPP11 } // expression.cpp -std::vector<std::string> field_names_in_expression(const std::shared_ptr<compute::Expression>& x); -extern "C" SEXP _arrow_field_names_in_expression(SEXP x_sexp){ -BEGIN_CPP11 - arrow::r::Input<const std::shared_ptr<compute::Expression>&>::type x(x_sexp); - return cpp11::as_sexp(field_names_in_expression(x)); -END_CPP11 -} -// expression.cpp std::string compute___expr__get_field_ref_name(const std::shared_ptr<compute::Expression>& x); extern "C" SEXP _arrow_compute___expr__get_field_ref_name(SEXP x_sexp){ BEGIN_CPP11 @@ -5587,7 +5579,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_compute___expr__equals", (DL_FUNC) &_arrow_compute___expr__equals, 2}, { "_arrow_compute___expr__call", (DL_FUNC) &_arrow_compute___expr__call, 3}, { "_arrow_compute___expr__is_field_ref", (DL_FUNC) &_arrow_compute___expr__is_field_ref, 1}, - { "_arrow_field_names_in_expression", (DL_FUNC) &_arrow_field_names_in_expression, 1}, { "_arrow_compute___expr__get_field_ref_name", (DL_FUNC) &_arrow_compute___expr__get_field_ref_name, 1}, { "_arrow_compute___expr__field_ref", (DL_FUNC) &_arrow_compute___expr__field_ref, 1}, { "_arrow_compute___expr__nested_field_ref", (DL_FUNC) &_arrow_compute___expr__nested_field_ref, 2}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 64ea6f5b5e..2a2e509c23 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -288,28 +288,31 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema( std::shared_ptr<compute::ExecNode> ExecNode_Scan( const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& dataset, - const std::shared_ptr<compute::Expression>& filter, - std::vector<std::string> materialized_field_names) { + const std::shared_ptr<compute::Expression>& filter, cpp11::list projection) { arrow::dataset::internal::Initialize(); // TODO: pass in FragmentScanOptions auto options = std::make_shared<ds::ScanOptions>(); options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true); - options->dataset_schema = dataset->schema(); + // This filter is only used for predicate pushdown; + // you still need to pass it to a FilterNode after to handle any other components options->filter = *filter; - // ScanNode needs to know which fields to materialize (and which are unnecessary) + // ScanNode needs to know which fields to materialize. + // It will pull them from this projection to prune the scan, + // but you still need to Project after std::vector<compute::Expression> exprs; - for (const auto& name : materialized_field_names) { - exprs.push_back(compute::field_ref(name)); + for (SEXP expr : projection) { + auto expr_ptr = cpp11::as_cpp<std::shared_ptr<compute::Expression>>(expr); + exprs.push_back(*expr_ptr); } - - options->projection = - call("make_struct", std::move(exprs), - compute::MakeStructOptions{std::move(materialized_field_names)}); + cpp11::strings field_names(projection.attr(R_NamesSymbol)); + options->projection = call( + "make_struct", std::move(exprs), + compute::MakeStructOptions{cpp11::as_cpp<std::vector<std::string>>(field_names)}); return MakeExecNodeOrStop("scan", plan.get(), {}, ds::ScanNodeOptions{dataset, options}); diff --git a/r/src/expression.cpp b/r/src/expression.cpp index fbed6b5ee3..35ef34ee81 100644 --- a/r/src/expression.cpp +++ b/r/src/expression.cpp @@ -51,25 +51,6 @@ bool compute___expr__is_field_ref(const std::shared_ptr<compute::Expression>& x) return x->field_ref() != nullptr; } -// [[arrow::export]] -std::vector<std::string> field_names_in_expression( - const std::shared_ptr<compute::Expression>& x) { - std::vector<std::string> out; - std::vector<arrow::FieldRef> nested; - - auto field_refs = FieldsInExpression(*x); - for (auto f : field_refs) { - if (f.IsNested()) { - // We keep the top-level field name. - nested = *f.nested_refs(); - out.push_back(*nested[0].name()); - } else { - out.push_back(*f.name()); - } - } - return out; -} - // [[arrow::export]] std::string compute___expr__get_field_ref_name( const std::shared_ptr<compute::Expression>& x) {