This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b71e503184 ARROW-17060: [C++] Change AsOfJoinNode to use ExecContext's
Memory Pool (#13585)
b71e503184 is described below
commit b71e50318406226088b920b76c267289fb9ffb2d
Author: Ivan Chau <[email protected]>
AuthorDate: Tue Jul 12 20:51:53 2022 -0400
ARROW-17060: [C++] Change AsOfJoinNode to use ExecContext's Memory Pool
(#13585)
Change `asof_join_node.cc` so the `Builder` it uses allocates memory from
the `ExecContext`'s memory pool instead of the default memory pool.
Authored-by: Ivan Chau <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/compute/exec/asof_join_node.cc | 24 +++++++++++++-----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/asof_join_node.cc
b/cpp/src/arrow/compute/exec/asof_join_node.cc
index 93eca8dbfb..e392c33e19 100644
--- a/cpp/src/arrow/compute/exec/asof_join_node.cc
+++ b/cpp/src/arrow/compute/exec/asof_join_node.cc
@@ -383,7 +383,7 @@ class CompositeReferenceTable {
// Materializes the current reference table into a target record batch
Result<std::shared_ptr<RecordBatch>> Materialize(
- const std::shared_ptr<arrow::Schema>& output_schema,
+ MemoryPool* memory_pool, const std::shared_ptr<arrow::Schema>&
output_schema,
const std::vector<std::unique_ptr<InputState>>& state) {
DCHECK_EQ(state.size(), n_tables_);
@@ -410,22 +410,22 @@ class CompositeReferenceTable {
if (field_type->Equals(arrow::int32())) {
ARROW_ASSIGN_OR_RAISE(
arrays.at(i_dst_col),
- (MaterializePrimitiveColumn<arrow::Int32Builder,
int32_t>(i_table,
-
i_src_col)));
+ (MaterializePrimitiveColumn<arrow::Int32Builder, int32_t>(
+ memory_pool, i_table, i_src_col)));
} else if (field_type->Equals(arrow::int64())) {
ARROW_ASSIGN_OR_RAISE(
arrays.at(i_dst_col),
- (MaterializePrimitiveColumn<arrow::Int64Builder,
int64_t>(i_table,
-
i_src_col)));
+ (MaterializePrimitiveColumn<arrow::Int64Builder, int64_t>(
+ memory_pool, i_table, i_src_col)));
} else if (field_type->Equals(arrow::float32())) {
ARROW_ASSIGN_OR_RAISE(arrays.at(i_dst_col),
(MaterializePrimitiveColumn<arrow::FloatBuilder, float>(
- i_table, i_src_col)));
+ memory_pool, i_table, i_src_col)));
} else if (field_type->Equals(arrow::float64())) {
ARROW_ASSIGN_OR_RAISE(
arrays.at(i_dst_col),
- (MaterializePrimitiveColumn<arrow::DoubleBuilder,
double>(i_table,
-
i_src_col)));
+ (MaterializePrimitiveColumn<arrow::DoubleBuilder, double>(
+ memory_pool, i_table, i_src_col)));
} else {
ARROW_RETURN_NOT_OK(
Status::Invalid("Unsupported data type: ", src_field->name()));
@@ -460,9 +460,10 @@ class CompositeReferenceTable {
}
template <class Builder, class PrimitiveType>
- Result<std::shared_ptr<Array>> MaterializePrimitiveColumn(size_t i_table,
+ Result<std::shared_ptr<Array>> MaterializePrimitiveColumn(MemoryPool*
memory_pool,
+ size_t i_table,
col_index_t i_col)
{
- Builder builder;
+ Builder builder(memory_pool);
ARROW_RETURN_NOT_OK(builder.Reserve(rows_.size()));
for (row_index_t i_row = 0; i_row < rows_.size(); ++i_row) {
const auto& ref = rows_[i_row].refs[i_table];
@@ -549,7 +550,8 @@ class AsofJoinNode : public ExecNode {
if (dst.empty()) {
return NULLPTR;
} else {
- return dst.Materialize(output_schema(), state_);
+ return dst.Materialize(plan()->exec_context()->memory_pool(),
output_schema(),
+ state_);
}
}