This is an automated email from the ASF dual-hosted git repository.

npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d0a7fb9403 GH-33760: [R][C++] Handle nested field refs in scanner  
(#33770)
d0a7fb9403 is described below

commit d0a7fb9403a904b7850517c745c3925695d8658d
Author: Neal Richardson <neal.p.richard...@gmail.com>
AuthorDate: Tue Jan 24 11:56:30 2023 -0500

    GH-33760: [R][C++] Handle nested field refs in scanner  (#33770)
    
    ### Rationale for this change
    
    Follow-up to https://github.com/apache/arrow/pull/19706/files#r1073391100 
with the goal of deleting and simplifying some code. As it turned out, the change 
was more about moving code from the R bindings to the C++ library.
    
    ### Are there any user-facing changes?
    
    Not for R users, but this fixes a bug in the dataset C++ library where 
nested field refs could not be handled by the scanner.
    
    * Closes: #33760
    
    Authored-by: Neal Richardson <neal.p.richard...@gmail.com>
    Signed-off-by: Neal Richardson <neal.p.richard...@gmail.com>
---
 cpp/src/arrow/dataset/scanner.cc | 23 ++++++++++++++++-------
 r/R/arrowExports.R               |  9 +++------
 r/R/query-engine.R               | 10 +++-------
 r/src/arrowExports.cpp           | 19 +++++--------------
 r/src/compute-exec.cpp           | 23 +++++++++++++----------
 r/src/expression.cpp             | 19 -------------------
 6 files changed, 40 insertions(+), 63 deletions(-)

diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index f307787357..bc8feec96d 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -22,6 +22,7 @@
 #include <memory>
 #include <mutex>
 #include <numeric>
+#include <set>
 #include <sstream>
 
 #include "arrow/array/array_primitive.h"
@@ -135,6 +136,7 @@ Result<std::shared_ptr<Schema>> 
GetProjectedSchemaFromExpression(
     const std::shared_ptr<Schema>& dataset_schema) {
   // process resultant dataset_schema after projection
   FieldVector project_fields;
+  std::set<std::string> field_names;
   if (auto call = projection.call()) {
     if (call->function_name != "make_struct") {
       return Status::Invalid("Top level projection expression call must be 
make_struct");
@@ -142,13 +144,11 @@ Result<std::shared_ptr<Schema>> 
GetProjectedSchemaFromExpression(
     for (const compute::Expression& arg : call->arguments) {
       if (auto field_ref = arg.field_ref()) {
         if (field_ref->IsName()) {
-          auto field = dataset_schema->GetFieldByName(*field_ref->name());
-          if (field) {
-            project_fields.push_back(std::move(field));
-          }
-          // if the field is not present in the schema we ignore it.
-          // the case is if kAugmentedFields are present in the expression
-          // and if they are not present in the provided schema, we ignore 
them.
+          field_names.emplace(*field_ref->name());
+        } else if (field_ref->IsNested()) {
+          // We keep the top-level field name.
+          auto nested_field_refs = *field_ref->nested_refs();
+          field_names.emplace(*nested_field_refs[0].name());
         } else {
           return Status::Invalid(
               "No projected schema was supplied and we could not infer the 
projected "
@@ -157,6 +157,15 @@ Result<std::shared_ptr<Schema>> 
GetProjectedSchemaFromExpression(
       }
     }
   }
+  for (auto f : field_names) {
+    auto field = dataset_schema->GetFieldByName(f);
+    if (field) {
+      // if the field is not present in the schema we ignore it.
+      // the case is if kAugmentedFields are present in the expression
+      // and if they are not present in the provided schema, we ignore them.
+      project_fields.push_back(std::move(field));
+    }
+  }
   return schema(project_fields);
 }
 
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 2eeca24dbd..5e807fbab1 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -460,8 +460,8 @@ ExecNode_output_schema <- function(node) {
   .Call(`_arrow_ExecNode_output_schema`, node)
 }
 
-ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) {
-  .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, 
materialized_field_names)
+ExecNode_Scan <- function(plan, dataset, filter, projection) {
+  .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
 }
 
 ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, 
filesystem, base_dir, partitioning, basename_template, existing_data_behavior, 
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, 
max_rows_per_group) {
@@ -1088,10 +1088,6 @@ compute___expr__is_field_ref <- function(x) {
   .Call(`_arrow_compute___expr__is_field_ref`, x)
 }
 
-field_names_in_expression <- function(x) {
-  .Call(`_arrow_field_names_in_expression`, x)
-}
-
 compute___expr__get_field_ref_name <- function(x) {
   .Call(`_arrow_compute___expr__get_field_ref_name`, x)
 }
@@ -2095,3 +2091,4 @@ SetIOThreadPoolCapacity <- function(threads) {
 Array__infer_type <- function(x) {
   .Call(`_arrow_Array__infer_type`, x)
 }
+
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 2f0b421fae..7a336b7a07 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -34,11 +34,7 @@ ExecPlan <- R6Class("ExecPlan",
         if (isTRUE(filter)) {
           filter <- Expression$scalar(TRUE)
         }
-        # Use FieldsInExpression to find all from dataset$selected_columns
-        colnames <- unique(unlist(map(
-          dataset$selected_columns,
-          field_names_in_expression
-        )))
+        projection <- dataset$selected_columns
         dataset <- dataset$.data
         assert_is(dataset, "Dataset")
       } else {
@@ -46,10 +42,10 @@ ExecPlan <- R6Class("ExecPlan",
         # Just a dataset, not a query, so there's no predicates to push down
         # so set some defaults
         filter <- Expression$scalar(TRUE)
-        colnames <- names(dataset)
+        projection <- make_field_refs(colnames)
       }
 
-      out <- ExecNode_Scan(self, dataset, filter, colnames %||% character(0))
+      out <- ExecNode_Scan(self, dataset, filter, projection)
       # Hold onto the source data's schema so we can preserve schema metadata
       # in the resulting Scan/Write
       out$extras$source_schema <- dataset$schema
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index e918390e26..dade762683 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -990,18 +990,18 @@ END_CPP11
 }
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<compute::ExecNode> ExecNode_Scan(const 
std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& 
dataset, const std::shared_ptr<compute::Expression>& filter, 
std::vector<std::string> materialized_field_names);
-extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP 
filter_sexp, SEXP materialized_field_names_sexp){
+std::shared_ptr<compute::ExecNode> ExecNode_Scan(const 
std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& 
dataset, const std::shared_ptr<compute::Expression>& filter, cpp11::list 
projection);
+extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP 
filter_sexp, SEXP projection_sexp){
 BEGIN_CPP11
        arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type 
plan(plan_sexp);
        arrow::r::Input<const std::shared_ptr<ds::Dataset>&>::type 
dataset(dataset_sexp);
        arrow::r::Input<const std::shared_ptr<compute::Expression>&>::type 
filter(filter_sexp);
-       arrow::r::Input<std::vector<std::string>>::type 
materialized_field_names(materialized_field_names_sexp);
-       return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, 
materialized_field_names));
+       arrow::r::Input<cpp11::list>::type projection(projection_sexp);
+       return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, projection));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP 
filter_sexp, SEXP materialized_field_names_sexp){
+extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP 
filter_sexp, SEXP projection_sexp){
        Rf_error("Cannot call ExecNode_Scan(). See 
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow 
C++ libraries. ");
 }
 #endif
@@ -2740,14 +2740,6 @@ BEGIN_CPP11
 END_CPP11
 }
 // expression.cpp
-std::vector<std::string> field_names_in_expression(const 
std::shared_ptr<compute::Expression>& x);
-extern "C" SEXP _arrow_field_names_in_expression(SEXP x_sexp){
-BEGIN_CPP11
-       arrow::r::Input<const std::shared_ptr<compute::Expression>&>::type 
x(x_sexp);
-       return cpp11::as_sexp(field_names_in_expression(x));
-END_CPP11
-}
-// expression.cpp
 std::string compute___expr__get_field_ref_name(const 
std::shared_ptr<compute::Expression>& x);
 extern "C" SEXP _arrow_compute___expr__get_field_ref_name(SEXP x_sexp){
 BEGIN_CPP11
@@ -5587,7 +5579,6 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_compute___expr__equals", (DL_FUNC) 
&_arrow_compute___expr__equals, 2}, 
                { "_arrow_compute___expr__call", (DL_FUNC) 
&_arrow_compute___expr__call, 3}, 
                { "_arrow_compute___expr__is_field_ref", (DL_FUNC) 
&_arrow_compute___expr__is_field_ref, 1}, 
-               { "_arrow_field_names_in_expression", (DL_FUNC) 
&_arrow_field_names_in_expression, 1}, 
                { "_arrow_compute___expr__get_field_ref_name", (DL_FUNC) 
&_arrow_compute___expr__get_field_ref_name, 1}, 
                { "_arrow_compute___expr__field_ref", (DL_FUNC) 
&_arrow_compute___expr__field_ref, 1}, 
                { "_arrow_compute___expr__nested_field_ref", (DL_FUNC) 
&_arrow_compute___expr__nested_field_ref, 2}, 
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 64ea6f5b5e..2a2e509c23 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -288,28 +288,31 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema(
 std::shared_ptr<compute::ExecNode> ExecNode_Scan(
     const std::shared_ptr<compute::ExecPlan>& plan,
     const std::shared_ptr<ds::Dataset>& dataset,
-    const std::shared_ptr<compute::Expression>& filter,
-    std::vector<std::string> materialized_field_names) {
+    const std::shared_ptr<compute::Expression>& filter, cpp11::list 
projection) {
   arrow::dataset::internal::Initialize();
 
   // TODO: pass in FragmentScanOptions
   auto options = std::make_shared<ds::ScanOptions>();
 
   options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true);
-
   options->dataset_schema = dataset->schema();
 
+  // This filter is only used for predicate pushdown;
+  // you still need to pass it to a FilterNode after to handle any other 
components
   options->filter = *filter;
 
-  // ScanNode needs to know which fields to materialize (and which are 
unnecessary)
+  // ScanNode needs to know which fields to materialize.
+  // It will pull them from this projection to prune the scan,
+  // but you still need to Project after
   std::vector<compute::Expression> exprs;
-  for (const auto& name : materialized_field_names) {
-    exprs.push_back(compute::field_ref(name));
+  for (SEXP expr : projection) {
+    auto expr_ptr = cpp11::as_cpp<std::shared_ptr<compute::Expression>>(expr);
+    exprs.push_back(*expr_ptr);
   }
-
-  options->projection =
-      call("make_struct", std::move(exprs),
-           compute::MakeStructOptions{std::move(materialized_field_names)});
+  cpp11::strings field_names(projection.attr(R_NamesSymbol));
+  options->projection = call(
+      "make_struct", std::move(exprs),
+      
compute::MakeStructOptions{cpp11::as_cpp<std::vector<std::string>>(field_names)});
 
   return MakeExecNodeOrStop("scan", plan.get(), {},
                             ds::ScanNodeOptions{dataset, options});
diff --git a/r/src/expression.cpp b/r/src/expression.cpp
index fbed6b5ee3..35ef34ee81 100644
--- a/r/src/expression.cpp
+++ b/r/src/expression.cpp
@@ -51,25 +51,6 @@ bool compute___expr__is_field_ref(const 
std::shared_ptr<compute::Expression>& x)
   return x->field_ref() != nullptr;
 }
 
-// [[arrow::export]]
-std::vector<std::string> field_names_in_expression(
-    const std::shared_ptr<compute::Expression>& x) {
-  std::vector<std::string> out;
-  std::vector<arrow::FieldRef> nested;
-
-  auto field_refs = FieldsInExpression(*x);
-  for (auto f : field_refs) {
-    if (f.IsNested()) {
-      // We keep the top-level field name.
-      nested = *f.nested_refs();
-      out.push_back(*nested[0].name());
-    } else {
-      out.push_back(*f.name());
-    }
-  }
-  return out;
-}
-
 // [[arrow::export]]
 std::string compute___expr__get_field_ref_name(
     const std::shared_ptr<compute::Expression>& x) {

Reply via email to