This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e0ccfa1105 ARROW-16700: [C++][R][Datasets] aggregates on partitioning
columns (#13518)
e0ccfa1105 is described below
commit e0ccfa110544e65b4e7ef2f9d39376a3dfb8420d
Author: Jeroen van Straten <[email protected]>
AuthorDate: Fri Jul 22 18:24:37 2022 +0200
ARROW-16700: [C++][R][Datasets] aggregates on partitioning columns (#13518)
This updates the Scanner node such that it will use the guarantee
expression to fill out columns missing from the dataset but guaranteed to be
some constant with appropriate scalars, rather than just inserting a null
placeholder column. In case both are available, the dataset constructor prefers
using the scalar from the guarantee expression over the actual data, since the
latter would probably be an array that unnecessarily repeats the constant value.
This is the other part of what was uncovered while analyzing ARROW-16700,
the more direct cause being a duplicate of ARROW-16904 (see also
https://github.com/apache/arrow/pull/13509 for my fix for that).
Lead-authored-by: Jeroen van Straten <[email protected]>
Co-authored-by: Aldrin M <[email protected]>
Co-authored-by: octalene <[email protected]>
Co-authored-by: Aldrin Montana <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/compute/exec/expression.cc | 25 +++++++--
cpp/src/arrow/compute/exec/expression.h | 3 +-
cpp/src/arrow/dataset/scanner.cc | 10 ++--
cpp/src/arrow/dataset/scanner_test.cc | 88 ++++++++++++++++----------------
4 files changed, 73 insertions(+), 53 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/expression.cc
b/cpp/src/arrow/compute/exec/expression.cc
index c890b3c593..06f36c7f5a 100644
--- a/cpp/src/arrow/compute/exec/expression.cc
+++ b/cpp/src/arrow/compute/exec/expression.cc
@@ -456,17 +456,31 @@ Result<Expression> Expression::Bind(const Schema&
in_schema,
return BindImpl(*this, in_schema, exec_context);
}
-Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum&
partial) {
+Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum&
partial,
+ Expression guarantee) {
ExecBatch out;
if (partial.kind() == Datum::RECORD_BATCH) {
const auto& partial_batch = *partial.record_batch();
+ out.guarantee = std::move(guarantee);
out.length = partial_batch.num_rows();
+ ARROW_ASSIGN_OR_RAISE(auto known_field_values,
+ ExtractKnownFieldValues(out.guarantee));
+
for (const auto& field : full_schema.fields()) {
- ARROW_ASSIGN_OR_RAISE(auto column,
-
FieldRef(field->name()).GetOneOrNone(partial_batch));
+ auto field_ref = FieldRef(field->name());
+
+ // If we know what the value must be from the guarantee, prefer to use
that value
+ // than the data from the record batch (if it exists at all -- probably
it doesn't),
+ // because this way it will be a scalar.
+ auto known_field_value = known_field_values.map.find(field_ref);
+ if (known_field_value != known_field_values.map.end()) {
+ out.values.emplace_back(known_field_value->second);
+ continue;
+ }
+ ARROW_ASSIGN_OR_RAISE(auto column,
field_ref.GetOneOrNone(partial_batch));
if (column) {
if (!column->type()->Equals(field->type())) {
// Referenced field was present but didn't have the expected type.
@@ -490,13 +504,14 @@ Result<ExecBatch> MakeExecBatch(const Schema&
full_schema, const Datum& partial)
ARROW_ASSIGN_OR_RAISE(auto partial_batch,
RecordBatch::FromStructArray(partial.make_array()));
- return MakeExecBatch(full_schema, partial_batch);
+ return MakeExecBatch(full_schema, partial_batch, std::move(guarantee));
}
if (partial.is_scalar()) {
ARROW_ASSIGN_OR_RAISE(auto partial_array,
MakeArrayFromScalar(*partial.scalar(), 1));
- ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema,
partial_array));
+ ARROW_ASSIGN_OR_RAISE(
+ auto out, MakeExecBatch(full_schema, partial_array,
std::move(guarantee)));
for (Datum& value : out.values) {
if (value.is_scalar()) continue;
diff --git a/cpp/src/arrow/compute/exec/expression.h
b/cpp/src/arrow/compute/exec/expression.h
index e9026961aa..a872e79959 100644
--- a/cpp/src/arrow/compute/exec/expression.h
+++ b/cpp/src/arrow/compute/exec/expression.h
@@ -226,7 +226,8 @@ Result<Expression> SimplifyWithGuarantee(Expression,
/// RecordBatch which may have missing or incorrectly ordered columns.
/// Missing fields will be replaced with null scalars.
ARROW_EXPORT Result<ExecBatch> MakeExecBatch(const Schema& full_schema,
- const Datum& partial);
+ const Datum& partial,
+ Expression guarantee =
literal(true));
/// Execute a scalar expression against the provided state and input
ExecBatch. This
/// expression must be bound.
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index d2d0923d03..c3e5b2a477 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -900,9 +900,6 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan*
plan,
std::move(batch_gen),
[scan_options](const EnumeratedRecordBatch& partial)
-> Result<util::optional<compute::ExecBatch>> {
- ARROW_ASSIGN_OR_RAISE(util::optional<compute::ExecBatch> batch,
-
compute::MakeExecBatch(*scan_options->dataset_schema,
-
partial.record_batch.value));
// TODO(ARROW-13263) fragments may be able to attach more guarantees
to batches
// than this, for example parquet's row group stats. Failing to do
this leaves
// perf on the table because row group stats could be used to skip
kernel execs in
@@ -911,7 +908,12 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan*
plan,
// Additionally, if a fragment failed to perform projection pushdown
there may be
// unnecessarily materialized columns in batch. We could drop them now
instead of
// letting them coast through the rest of the plan.
- batch->guarantee = partial.fragment.value->partition_expression();
+ auto guarantee = partial.fragment.value->partition_expression();
+
+ ARROW_ASSIGN_OR_RAISE(
+ util::optional<compute::ExecBatch> batch,
+ compute::MakeExecBatch(*scan_options->dataset_schema,
+ partial.record_batch.value, guarantee));
// tag rows with fragment- and batch-of-origin
batch->values.emplace_back(partial.fragment.index);
diff --git a/cpp/src/arrow/dataset/scanner_test.cc
b/cpp/src/arrow/dataset/scanner_test.cc
index f5db16f694..26a2353332 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -27,6 +27,7 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression_internal.h"
#include "arrow/dataset/plan.h"
#include "arrow/dataset/test_util.h"
#include "arrow/record_batch.h"
@@ -1371,16 +1372,19 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
const std::shared_ptr<Schema>& dataset_schema,
const std::shared_ptr<Schema>& physical_schema,
const std::vector<std::vector<std::string>>& fragment_batch_strs,
- const std::vector<compute::Expression>& guarantees,
- std::function<void(compute::ExecBatch*, const RecordBatch&)>
make_exec_batch = {}) {
+ const std::vector<compute::Expression>& guarantees) {
+ // If guarantees are provided we must have one for each batch
if (!guarantees.empty()) {
EXPECT_EQ(fragment_batch_strs.size(), guarantees.size());
}
+
RecordBatchVector record_batches;
FragmentVector fragments;
fragments.reserve(fragment_batch_strs.size());
- for (size_t i = 0; i < fragment_batch_strs.size(); i++) {
- const auto& batch_strs = fragment_batch_strs[i];
+
+ // construct fragments first
+ for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); frag_ndx++)
{
+ const auto& batch_strs = fragment_batch_strs[frag_ndx];
RecordBatchVector fragment_batches;
fragment_batches.reserve(batch_strs.size());
for (const auto& batch_str : batch_strs) {
@@ -1390,37 +1394,40 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
fragment_batches.end());
fragments.push_back(std::make_shared<InMemoryFragment>(
physical_schema, std::move(fragment_batches),
- guarantees.empty() ? literal(true) : guarantees[i]));
+ guarantees.empty() ? literal(true) : guarantees[frag_ndx]));
}
+ // then construct ExecBatches that can reference fields from constructed
Fragments
std::vector<compute::ExecBatch> batches;
auto batch_it = record_batches.begin();
- for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size();
- ++fragment_index) {
- for (size_t batch_index = 0; batch_index <
fragment_batch_strs[fragment_index].size();
- ++batch_index) {
+ for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); ++frag_ndx)
{
+ size_t frag_batch_count = fragment_batch_strs[frag_ndx].size();
+
+ for (size_t batch_index = 0; batch_index < frag_batch_count;
++batch_index) {
const auto& batch = *batch_it++;
// the scanned ExecBatches will begin with physical columns
batches.emplace_back(*batch);
- // allow customizing the ExecBatch (e.g. to fill in placeholders for
partition
- // fields)
- if (make_exec_batch) {
- make_exec_batch(&batches.back(), *batch);
+ // augment scanned ExecBatch with columns for this fragment's guarantee
+ if (!guarantees.empty()) {
+ EXPECT_OK_AND_ASSIGN(auto known_fields,
+ ExtractKnownFieldValues(guarantees[frag_ndx]));
+ for (const auto& known_field : known_fields.map) {
+ batches.back().values.emplace_back(known_field.second);
+ }
}
// scanned batches will be augmented with fragment and batch indices
- batches.back().values.emplace_back(static_cast<int>(fragment_index));
+ batches.back().values.emplace_back(static_cast<int>(frag_ndx));
batches.back().values.emplace_back(static_cast<int>(batch_index));
// ... and with the last-in-fragment flag
- batches.back().values.emplace_back(batch_index ==
-
fragment_batch_strs[fragment_index].size() - 1);
-
batches.back().values.emplace_back(fragments[fragment_index]->ToString());
+ batches.back().values.emplace_back(batch_index == frag_batch_count - 1);
+ batches.back().values.emplace_back(fragments[frag_ndx]->ToString());
// each batch carries a guarantee inherited from its Fragment's
partition expression
- batches.back().guarantee =
fragments[fragment_index]->partition_expression();
+ batches.back().guarantee = fragments[frag_ndx]->partition_expression();
}
}
@@ -1437,31 +1444,26 @@ DatasetAndBatches MakeBasicDataset() {
const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a",
"b"});
- return DatasetAndBatchesFromJSON(
- dataset_schema, physical_schema,
- {
- {
- R"([{"a": 1, "b": null},
- {"a": 2, "b": true}])",
- R"([{"a": null, "b": true},
- {"a": 3, "b": false}])",
- },
- {
- R"([{"a": null, "b": true},
- {"a": 4, "b": false}])",
- R"([{"a": 5, "b": null},
- {"a": 6, "b": false},
- {"a": 7, "b": false}])",
- },
- },
- {
- equal(field_ref("c"), literal(23)),
- equal(field_ref("c"), literal(47)),
- },
- [](compute::ExecBatch* batch, const RecordBatch&) {
- // a placeholder will be inserted for partition field "c"
- batch->values.emplace_back(std::make_shared<Int32Scalar>());
- });
+ return DatasetAndBatchesFromJSON(dataset_schema, physical_schema,
+ {
+ {
+ R"([{"a": 1, "b": null},
+ {"a": 2, "b": true}])",
+ R"([{"a": null, "b": true},
+ {"a": 3, "b": false}])",
+ },
+ {
+ R"([{"a": null, "b": true},
+ {"a": 4, "b": false}])",
+ R"([{"a": 5, "b": null},
+ {"a": 6, "b": false},
+ {"a": 7, "b": false}])",
+ },
+ },
+ {
+ equal(field_ref("c"), literal(23)),
+ equal(field_ref("c"), literal(47)),
+ });
}
DatasetAndBatches MakeNestedDataset() {