zanmato1984 commented on code in PR #41874:
URL: https://github.com/apache/arrow/pull/41874#discussion_r1696924129
##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -204,15 +232,79 @@ class UnmaterializedCompositeTable {
return builder.Append(data + offset0, offset1 - offset0);
}
+ arrow::Result<std::vector<CompositeEntry>> FlattenSlices(int table_index) {
+ std::vector<CompositeEntry> flattened_blocks;
+
+ arrow::RecordBatch* active_rb = NULL;
+ size_t start = -1;
+ size_t end = -1;
+
+ for (const auto& slice : slices) {
+ const auto& [batch, block_start, block_end] =
slice.components[table_index];
+ if (active_rb == NULL) {
+ active_rb = batch;
+ start = block_start;
+ end = block_end;
+ } else if (active_rb == batch && block_start == end) {
+ end = block_end;
+ } else {
+ flattened_blocks.push_back({active_rb, start, end});
+ active_rb = batch;
+ start = block_start;
+ end = block_end;
+ }
+ DCHECK_NE(active_rb, NULL);
+ }
+ // flush the last batch
+ flattened_blocks.push_back({active_rb, start, end});
+ return flattened_blocks;
+ }
+
template <class Type, class Builder = typename
arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
- const std::shared_ptr<arrow::DataType>& type, int i_col) {
+ const std::shared_ptr<arrow::DataType>& type, int i_col,
+ const std::optional<std::unordered_map<int,
std::vector<CompositeEntry>>>&
+ contiguous_blocks) {
+ const auto& [table_index, column_index] = output_col_to_src[i_col];
+
+ if (contiguous_blocks.has_value() &&
+ contiguous_blocks.value().find(table_index) !=
contiguous_blocks.value().end()) {
+ return
materializeContiguous<Type>(contiguous_blocks.value().at(table_index), type,
+ column_index);
+ }
+ return materializeRowByRow<Type>(type, table_index, column_index);
+ }
+
+ /// If the source table is contiguous, we can take slices of the record batch
+ /// directly. This is much cheaper than copying the data row-by-row into an
output
+ /// array.
+ template <class Type, class Builder = typename
arrow::TypeTraits<Type>::BuilderType>
+ arrow::Result<std::shared_ptr<arrow::Array>> materializeContiguous(
+ const std::vector<CompositeEntry>& flattened_blocks,
+ const std::shared_ptr<arrow::DataType>& type, int column_index) {
+ if (Size() == 0) {
+ ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+ return builderPtr->Finish();
+ }
+
+ std::vector<std::shared_ptr<arrow::Array>> col;
+ col.reserve(flattened_blocks.size());
+ for (const auto& [rb, block_start, block_end] : flattened_blocks) {
+ auto chunk = rb->column(column_index)
+ ->Slice(block_start, /*length=*/(block_end -
block_start + 1));
+ DCHECK_EQ(chunk->type()->id(), type->id());
+ col.push_back(std::move(chunk));
+ }
+ return arrow::Concatenate(col);
Review Comment:
We need to pass the specified `this->pool` down to the `arrow::Concatenate` call; otherwise it silently falls back to `arrow::default_memory_pool()`, bypassing the memory pool configured for this table.
##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -204,15 +232,79 @@ class UnmaterializedCompositeTable {
return builder.Append(data + offset0, offset1 - offset0);
}
+ arrow::Result<std::vector<CompositeEntry>> FlattenSlices(int table_index) {
+ std::vector<CompositeEntry> flattened_blocks;
+
+ arrow::RecordBatch* active_rb = NULL;
+ size_t start = -1;
+ size_t end = -1;
+
+ for (const auto& slice : slices) {
+ const auto& [batch, block_start, block_end] =
slice.components[table_index];
+ if (active_rb == NULL) {
+ active_rb = batch;
+ start = block_start;
+ end = block_end;
+ } else if (active_rb == batch && block_start == end) {
+ end = block_end;
+ } else {
+ flattened_blocks.push_back({active_rb, start, end});
+ active_rb = batch;
+ start = block_start;
+ end = block_end;
+ }
+ DCHECK_NE(active_rb, NULL);
+ }
+ // flush the last batch
+ flattened_blocks.push_back({active_rb, start, end});
+ return flattened_blocks;
+ }
+
template <class Type, class Builder = typename
arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
- const std::shared_ptr<arrow::DataType>& type, int i_col) {
+ const std::shared_ptr<arrow::DataType>& type, int i_col,
+ const std::optional<std::unordered_map<int,
std::vector<CompositeEntry>>>&
+ contiguous_blocks) {
+ const auto& [table_index, column_index] = output_col_to_src[i_col];
+
+ if (contiguous_blocks.has_value() &&
+ contiguous_blocks.value().find(table_index) !=
contiguous_blocks.value().end()) {
+ return
materializeContiguous<Type>(contiguous_blocks.value().at(table_index), type,
+ column_index);
+ }
+ return materializeRowByRow<Type>(type, table_index, column_index);
+ }
+
+ /// If the source table is contiguous, we can take slices of the record batch
+ /// directly. This is much cheaper than copying the data row-by-row into an
output
+ /// array.
+ template <class Type, class Builder = typename
arrow::TypeTraits<Type>::BuilderType>
+ arrow::Result<std::shared_ptr<arrow::Array>> materializeContiguous(
+ const std::vector<CompositeEntry>& flattened_blocks,
+ const std::shared_ptr<arrow::DataType>& type, int column_index) {
+ if (Size() == 0) {
+ ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+ return builderPtr->Finish();
+ }
+
+ std::vector<std::shared_ptr<arrow::Array>> col;
+ col.reserve(flattened_blocks.size());
+ for (const auto& [rb, block_start, block_end] : flattened_blocks) {
+ auto chunk = rb->column(column_index)
+ ->Slice(block_start, /*length=*/(block_end -
block_start + 1));
+ DCHECK_EQ(chunk->type()->id(), type->id());
+ col.push_back(std::move(chunk));
+ }
+ return arrow::Concatenate(col);
Review Comment:
```suggestion
return arrow::Concatenate(col, pool);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]