This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ccfa1105 ARROW-16700: [C++][R][Datasets] aggregates on partitioning 
columns (#13518)
e0ccfa1105 is described below

commit e0ccfa110544e65b4e7ef2f9d39376a3dfb8420d
Author: Jeroen van Straten <[email protected]>
AuthorDate: Fri Jul 22 18:24:37 2022 +0200

    ARROW-16700: [C++][R][Datasets] aggregates on partitioning columns (#13518)
    
    This updates the Scanner node so that it uses the guarantee 
expression to fill in columns that are missing from the dataset but are 
guaranteed to be constant, using the appropriate scalars rather than just 
inserting a null placeholder column. If a column is available from both the 
guarantee and the data, the ExecBatch constructor (MakeExecBatch) prefers the 
scalar from the guarantee expression over the actual data, since the latter 
would probably be an array that unnecessarily repeats the constant value.
    
    This is the other part of what was uncovered while analyzing ARROW-16700, 
the more direct cause being a duplicate of ARROW-16904 (see also 
https://github.com/apache/arrow/pull/13509 for my fix for that).
    
    Lead-authored-by: Jeroen van Straten <[email protected]>
    Co-authored-by: Aldrin M <[email protected]>
    Co-authored-by: octalene <[email protected]>
    Co-authored-by: Aldrin Montana <[email protected]>
    Co-authored-by: Weston Pace <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/compute/exec/expression.cc | 25 +++++++--
 cpp/src/arrow/compute/exec/expression.h  |  3 +-
 cpp/src/arrow/dataset/scanner.cc         | 10 ++--
 cpp/src/arrow/dataset/scanner_test.cc    | 88 ++++++++++++++++----------------
 4 files changed, 73 insertions(+), 53 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/expression.cc 
b/cpp/src/arrow/compute/exec/expression.cc
index c890b3c593..06f36c7f5a 100644
--- a/cpp/src/arrow/compute/exec/expression.cc
+++ b/cpp/src/arrow/compute/exec/expression.cc
@@ -456,17 +456,31 @@ Result<Expression> Expression::Bind(const Schema& 
in_schema,
   return BindImpl(*this, in_schema, exec_context);
 }
 
-Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& 
partial) {
+Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& 
partial,
+                                Expression guarantee) {
   ExecBatch out;
 
   if (partial.kind() == Datum::RECORD_BATCH) {
     const auto& partial_batch = *partial.record_batch();
+    out.guarantee = std::move(guarantee);
     out.length = partial_batch.num_rows();
 
+    ARROW_ASSIGN_OR_RAISE(auto known_field_values,
+                          ExtractKnownFieldValues(out.guarantee));
+
     for (const auto& field : full_schema.fields()) {
-      ARROW_ASSIGN_OR_RAISE(auto column,
-                            
FieldRef(field->name()).GetOneOrNone(partial_batch));
+      auto field_ref = FieldRef(field->name());
+
+      // If we know what the value must be from the guarantee, prefer to use 
that value
+      // than the data from the record batch (if it exists at all -- probably 
it doesn't),
+      // because this way it will be a scalar.
+      auto known_field_value = known_field_values.map.find(field_ref);
+      if (known_field_value != known_field_values.map.end()) {
+        out.values.emplace_back(known_field_value->second);
+        continue;
+      }
 
+      ARROW_ASSIGN_OR_RAISE(auto column, 
field_ref.GetOneOrNone(partial_batch));
       if (column) {
         if (!column->type()->Equals(field->type())) {
           // Referenced field was present but didn't have the expected type.
@@ -490,13 +504,14 @@ Result<ExecBatch> MakeExecBatch(const Schema& 
full_schema, const Datum& partial)
       ARROW_ASSIGN_OR_RAISE(auto partial_batch,
                             
RecordBatch::FromStructArray(partial.make_array()));
 
-      return MakeExecBatch(full_schema, partial_batch);
+      return MakeExecBatch(full_schema, partial_batch, std::move(guarantee));
     }
 
     if (partial.is_scalar()) {
       ARROW_ASSIGN_OR_RAISE(auto partial_array,
                             MakeArrayFromScalar(*partial.scalar(), 1));
-      ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, 
partial_array));
+      ARROW_ASSIGN_OR_RAISE(
+          auto out, MakeExecBatch(full_schema, partial_array, 
std::move(guarantee)));
 
       for (Datum& value : out.values) {
         if (value.is_scalar()) continue;
diff --git a/cpp/src/arrow/compute/exec/expression.h 
b/cpp/src/arrow/compute/exec/expression.h
index e9026961aa..a872e79959 100644
--- a/cpp/src/arrow/compute/exec/expression.h
+++ b/cpp/src/arrow/compute/exec/expression.h
@@ -226,7 +226,8 @@ Result<Expression> SimplifyWithGuarantee(Expression,
 /// RecordBatch which may have missing or incorrectly ordered columns.
 /// Missing fields will be replaced with null scalars.
 ARROW_EXPORT Result<ExecBatch> MakeExecBatch(const Schema& full_schema,
-                                             const Datum& partial);
+                                             const Datum& partial,
+                                             Expression guarantee = 
literal(true));
 
 /// Execute a scalar expression against the provided state and input 
ExecBatch. This
 /// expression must be bound.
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index d2d0923d03..c3e5b2a477 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -900,9 +900,6 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* 
plan,
       std::move(batch_gen),
       [scan_options](const EnumeratedRecordBatch& partial)
           -> Result<util::optional<compute::ExecBatch>> {
-        ARROW_ASSIGN_OR_RAISE(util::optional<compute::ExecBatch> batch,
-                              
compute::MakeExecBatch(*scan_options->dataset_schema,
-                                                     
partial.record_batch.value));
         // TODO(ARROW-13263) fragments may be able to attach more guarantees 
to batches
         // than this, for example parquet's row group stats. Failing to do 
this leaves
         // perf on the table because row group stats could be used to skip 
kernel execs in
@@ -911,7 +908,12 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* 
plan,
         // Additionally, if a fragment failed to perform projection pushdown 
there may be
         // unnecessarily materialized columns in batch. We could drop them now 
instead of
         // letting them coast through the rest of the plan.
-        batch->guarantee = partial.fragment.value->partition_expression();
+        auto guarantee = partial.fragment.value->partition_expression();
+
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*scan_options->dataset_schema,
+                                   partial.record_batch.value, guarantee));
 
         // tag rows with fragment- and batch-of-origin
         batch->values.emplace_back(partial.fragment.index);
diff --git a/cpp/src/arrow/dataset/scanner_test.cc 
b/cpp/src/arrow/dataset/scanner_test.cc
index f5db16f694..26a2353332 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -27,6 +27,7 @@
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/cast.h"
 #include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression_internal.h"
 #include "arrow/dataset/plan.h"
 #include "arrow/dataset/test_util.h"
 #include "arrow/record_batch.h"
@@ -1371,16 +1372,19 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
     const std::shared_ptr<Schema>& dataset_schema,
     const std::shared_ptr<Schema>& physical_schema,
     const std::vector<std::vector<std::string>>& fragment_batch_strs,
-    const std::vector<compute::Expression>& guarantees,
-    std::function<void(compute::ExecBatch*, const RecordBatch&)> 
make_exec_batch = {}) {
+    const std::vector<compute::Expression>& guarantees) {
+  // If guarantees are provided we must have one for each batch
   if (!guarantees.empty()) {
     EXPECT_EQ(fragment_batch_strs.size(), guarantees.size());
   }
+
   RecordBatchVector record_batches;
   FragmentVector fragments;
   fragments.reserve(fragment_batch_strs.size());
-  for (size_t i = 0; i < fragment_batch_strs.size(); i++) {
-    const auto& batch_strs = fragment_batch_strs[i];
+
+  // construct fragments first
+  for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); frag_ndx++) 
{
+    const auto& batch_strs = fragment_batch_strs[frag_ndx];
     RecordBatchVector fragment_batches;
     fragment_batches.reserve(batch_strs.size());
     for (const auto& batch_str : batch_strs) {
@@ -1390,37 +1394,40 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
                           fragment_batches.end());
     fragments.push_back(std::make_shared<InMemoryFragment>(
         physical_schema, std::move(fragment_batches),
-        guarantees.empty() ? literal(true) : guarantees[i]));
+        guarantees.empty() ? literal(true) : guarantees[frag_ndx]));
   }
 
+  // then construct ExecBatches that can reference fields from constructed 
Fragments
   std::vector<compute::ExecBatch> batches;
   auto batch_it = record_batches.begin();
-  for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size();
-       ++fragment_index) {
-    for (size_t batch_index = 0; batch_index < 
fragment_batch_strs[fragment_index].size();
-         ++batch_index) {
+  for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); ++frag_ndx) 
{
+    size_t frag_batch_count = fragment_batch_strs[frag_ndx].size();
+
+    for (size_t batch_index = 0; batch_index < frag_batch_count; 
++batch_index) {
       const auto& batch = *batch_it++;
 
       // the scanned ExecBatches will begin with physical columns
       batches.emplace_back(*batch);
 
-      // allow customizing the ExecBatch (e.g. to fill in placeholders for 
partition
-      // fields)
-      if (make_exec_batch) {
-        make_exec_batch(&batches.back(), *batch);
+      // augment scanned ExecBatch with columns for this fragment's guarantee
+      if (!guarantees.empty()) {
+        EXPECT_OK_AND_ASSIGN(auto known_fields,
+                             ExtractKnownFieldValues(guarantees[frag_ndx]));
+        for (const auto& known_field : known_fields.map) {
+          batches.back().values.emplace_back(known_field.second);
+        }
       }
 
       // scanned batches will be augmented with fragment and batch indices
-      batches.back().values.emplace_back(static_cast<int>(fragment_index));
+      batches.back().values.emplace_back(static_cast<int>(frag_ndx));
       batches.back().values.emplace_back(static_cast<int>(batch_index));
 
       // ... and with the last-in-fragment flag
-      batches.back().values.emplace_back(batch_index ==
-                                         
fragment_batch_strs[fragment_index].size() - 1);
-      
batches.back().values.emplace_back(fragments[fragment_index]->ToString());
+      batches.back().values.emplace_back(batch_index == frag_batch_count - 1);
+      batches.back().values.emplace_back(fragments[frag_ndx]->ToString());
 
       // each batch carries a guarantee inherited from its Fragment's 
partition expression
-      batches.back().guarantee = 
fragments[fragment_index]->partition_expression();
+      batches.back().guarantee = fragments[frag_ndx]->partition_expression();
     }
   }
 
@@ -1437,31 +1444,26 @@ DatasetAndBatches MakeBasicDataset() {
 
   const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", 
"b"});
 
-  return DatasetAndBatchesFromJSON(
-      dataset_schema, physical_schema,
-      {
-          {
-              R"([{"a": 1,    "b": null},
-                  {"a": 2,    "b": true}])",
-              R"([{"a": null, "b": true},
-                  {"a": 3,    "b": false}])",
-          },
-          {
-              R"([{"a": null, "b": true},
-                  {"a": 4,    "b": false}])",
-              R"([{"a": 5,    "b": null},
-                  {"a": 6,    "b": false},
-                  {"a": 7,    "b": false}])",
-          },
-      },
-      {
-          equal(field_ref("c"), literal(23)),
-          equal(field_ref("c"), literal(47)),
-      },
-      [](compute::ExecBatch* batch, const RecordBatch&) {
-        // a placeholder will be inserted for partition field "c"
-        batch->values.emplace_back(std::make_shared<Int32Scalar>());
-      });
+  return DatasetAndBatchesFromJSON(dataset_schema, physical_schema,
+                                   {
+                                       {
+                                           R"([{"a": 1,    "b": null},
+                                               {"a": 2,    "b": true}])",
+                                           R"([{"a": null, "b": true},
+                                               {"a": 3,    "b": false}])",
+                                       },
+                                       {
+                                           R"([{"a": null, "b": true},
+                                               {"a": 4,    "b": false}])",
+                                           R"([{"a": 5,    "b": null},
+                                               {"a": 6,    "b": false},
+                                               {"a": 7,    "b": false}])",
+                                       },
+                                   },
+                                   {
+                                       equal(field_ref("c"), literal(23)),
+                                       equal(field_ref("c"), literal(47)),
+                                   });
 }
 
 DatasetAndBatches MakeNestedDataset() {

Reply via email to