This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 96b9388218b [opt](explode) Optimize explode_outer and posexplode
(#62069) (#62145)
96b9388218b is described below
commit 96b9388218be239ad1698ccfdce762fb50e9ac82
Author: TengJianPing <[email protected]>
AuthorDate: Tue Apr 7 18:53:50 2026 +0800
[opt](explode) Optimize explode_outer and posexplode (#62069) (#62145)
Issue Number: Pick #62069
Related PR: #xxx
Problem Summary:
Improvement of #60352, optimize explode_outer, posexplode and
posexplode_outer.
The approach is a single-pass algorithm: walk through child rows,
accumulating contiguous segments into the output, then when hitting a
null/empty row or reaching the end, flush the segment using bulk
operations. For outer-null rows, insert a NULL and copy the
non-table-function columns directly. This naturally handles both outer
and
non-outer modes since non-outer mode just won't produce any null
outputs. For posexplode, generate position indices alongside this.
Performance test result.
Create and populate a large test table. Use 100,000,000 base rows, each
with an array of 10 INT elements, producing 1,000,000,000 total exploded
output rows.
| Function          | slow path | optimized | SpeedUp |
| ----------------- | --------- | --------- | ------- |
| explode           | 9734 ms   | 5105 ms   | 90%     |
| explode_outer     | 9685 ms   | 5064 ms   | 91%     |
| posexplode        | 11021 ms  | 5963 ms   | 84%     |
| posexplode_outer  | 14088 ms  | 5903 ms   | 138%    |
None
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/exec/operator/operator.h | 1 -
be/src/exec/operator/table_function_operator.cpp | 210 +++++--
be/src/exprs/table_function/table_function.h | 1 +
be/src/exprs/table_function/vexplode.cpp | 2 +-
be/src/exprs/table_function/vexplode_v2.cpp | 3 +-
.../exec/operator/analytic_sink_operator_test.cpp | 2 +-
...istinct_streaming_aggregation_operator_test.cpp | 2 +-
.../operator/exchange_source_operator_test.cpp | 2 +-
.../operator/partition_sort_sink_operator_test.cpp | 2 +-
.../exec/operator/query_cache_operator_test.cpp | 2 +-
be/test/exec/operator/repeat_operator_test.cpp | 2 +-
be/test/exec/operator/set_operator_test.cpp | 4 +-
be/test/exec/operator/sort_operator_test.cpp | 2 +-
.../exec/operator/table_function_operator_test.cpp | 639 ++++++++++++++++++++-
be/test/exec/operator/union_operator_test.cpp | 6 +-
.../exec/sink/arrow_result_block_buffer_test.cpp | 8 +-
be/test/exec/sink/result_block_buffer_test.cpp | 6 +-
be/test/testutil/mock/mock_runtime_state.h | 4 +-
18 files changed, 820 insertions(+), 78 deletions(-)
diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index 8c880b24331..2f403d275fd 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -805,7 +805,6 @@ public:
_resource_profile(tnode.resource_profile),
_limit(tnode.limit) {
if (tnode.__isset.output_tuple_id) {
- _output_row_descriptor.reset(new RowDescriptor(descs,
{tnode.output_tuple_id}, {true}));
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector
{true});
}
diff --git a/be/src/exec/operator/table_function_operator.cpp
b/be/src/exec/operator/table_function_operator.cpp
index 18a63571043..7be0c1d35ef 100644
--- a/be/src/exec/operator/table_function_operator.cpp
+++ b/be/src/exec/operator/table_function_operator.cpp
@@ -28,6 +28,8 @@
#include "core/block/block.h"
#include "core/block/column_numbers.h"
#include "core/column/column_nullable.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
#include "core/custom_allocator.h"
#include "exec/operator/operator.h"
#include "exprs/table_function/table_function_factory.h"
@@ -264,30 +266,156 @@ Status
TableFunctionLocalState::_get_expanded_block_block_fast_path(
const auto& offsets = *_block_fast_path_ctx.offsets_ptr;
const auto child_rows = cast_set<int64_t>(offsets.size());
- std::vector<uint32_t> row_ids;
- row_ids.reserve(remaining_capacity);
- uint64_t first_nested_idx = 0;
- uint64_t expected_next_nested_idx = 0;
- bool found_nested_range = false;
-
int64_t child_row = _block_fast_path_row;
uint64_t in_row_offset = _block_fast_path_in_row_offset;
int produced_rows = 0;
- while (produced_rows < remaining_capacity && child_row < child_rows) {
- if (_block_fast_path_ctx.array_nullmap_data &&
- _block_fast_path_ctx.array_nullmap_data[child_row]) {
- // NULL array row: skip it here. Slow path will handle output
semantics if needed.
- child_row++;
- in_row_offset = 0;
- continue;
+ const bool is_outer = _fns[0]->is_outer();
+ const bool is_posexplode = _block_fast_path_ctx.generate_row_index;
+ auto& out_col = columns[p._child_slots.size()];
+
+ // Decompose posexplode struct output column if needed
+ ColumnStruct* struct_col_ptr = nullptr;
+ ColumnUInt8* outer_struct_nullmap_ptr = nullptr;
+ IColumn* value_col_ptr = nullptr;
+ ColumnInt32* pos_col_ptr = nullptr;
+ if (is_posexplode) {
+ if (out_col->is_nullable()) {
+ auto* nullable = assert_cast<ColumnNullable*>(out_col.get());
+ struct_col_ptr =
assert_cast<ColumnStruct*>(nullable->get_nested_column_ptr().get());
+ outer_struct_nullmap_ptr =
+
assert_cast<ColumnUInt8*>(nullable->get_null_map_column_ptr().get());
+ } else {
+ struct_col_ptr = assert_cast<ColumnStruct*>(out_col.get());
+ }
+ pos_col_ptr =
assert_cast<ColumnInt32*>(&struct_col_ptr->get_column(0));
+ value_col_ptr = &struct_col_ptr->get_column(1);
+ }
+ // Segment tracking: accumulate contiguous nested ranges, flush on
boundaries.
+ // Array column offsets are monotonically non-decreasing, so nested data
across child rows
+ // is always contiguous (even with NULL/empty rows that contribute zero
elements).
+ struct ExpandSegmentContext {
+ std::vector<uint32_t>
+ seg_row_ids; // row ids of non table-function columns to
replicate for this segment
+ std::vector<int32_t>
+ seg_positions; // for posexplode, the position values to write
for this segment
+ int64_t seg_nested_start = -1; // start offset in the nested column of
this segment
+ int seg_nested_count =
+ 0; // number of nested rows in this segment (can be > child
row count due to multiple elements per row)
+ };
+ ExpandSegmentContext segment_ctx;
+ segment_ctx.seg_row_ids.reserve(remaining_capacity);
+ if (is_posexplode) {
+ segment_ctx.seg_positions.reserve(remaining_capacity);
+ }
+
+ auto reset_expand_segment_ctx = [&segment_ctx, is_posexplode]() {
+ segment_ctx.seg_nested_start = -1;
+ segment_ctx.seg_nested_count = 0;
+ segment_ctx.seg_row_ids.clear();
+ if (is_posexplode) {
+ segment_ctx.seg_positions.clear();
+ }
+ };
+
+ // Flush accumulated contiguous segment to output columns
+ auto flush_segment = [&]() {
+ if (segment_ctx.seg_nested_count == 0) {
+ return;
+ }
+
+ // Non-TF columns: replicate each child row for every output element
+ for (auto index : p._output_slot_indexs) {
+ auto src_column = _child_block->get_by_position(index).column;
+ columns[index]->insert_indices_from(
+ *src_column, segment_ctx.seg_row_ids.data(),
+ segment_ctx.seg_row_ids.data() +
segment_ctx.seg_row_ids.size());
+ }
+
+ if (is_posexplode) {
+ // Write positions
+ pos_col_ptr->insert_many_raw_data(
+ reinterpret_cast<const
char*>(segment_ctx.seg_positions.data()),
+ segment_ctx.seg_positions.size());
+ // Write nested values to the struct's value sub-column
+ DCHECK(value_col_ptr->is_nullable())
+ << "posexplode fast path requires nullable value column";
+ auto* val_nullable = assert_cast<ColumnNullable*>(value_col_ptr);
+ val_nullable->get_nested_column_ptr()->insert_range_from(
+ *_block_fast_path_ctx.nested_col,
segment_ctx.seg_nested_start,
+ segment_ctx.seg_nested_count);
+ auto* val_nullmap =
+
assert_cast<ColumnUInt8*>(val_nullable->get_null_map_column_ptr().get());
+ auto& val_nullmap_data = val_nullmap->get_data();
+ const size_t old_size = val_nullmap_data.size();
+ val_nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
+ if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
+ memcpy(val_nullmap_data.data() + old_size,
+ _block_fast_path_ctx.nested_nullmap_data +
segment_ctx.seg_nested_start,
+ segment_ctx.seg_nested_count * sizeof(UInt8));
+ } else {
+ memset(val_nullmap_data.data() + old_size, 0,
+ segment_ctx.seg_nested_count * sizeof(UInt8));
+ }
+ // Struct-level null map: these rows are not null
+ if (outer_struct_nullmap_ptr) {
+
outer_struct_nullmap_ptr->insert_many_defaults(segment_ctx.seg_nested_count);
+ }
+ } else if (out_col->is_nullable()) {
+ auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
+ out_nullable->get_nested_column_ptr()->insert_range_from(
+ *_block_fast_path_ctx.nested_col,
segment_ctx.seg_nested_start,
+ segment_ctx.seg_nested_count);
+ auto* nullmap_column =
+
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
+ auto& nullmap_data = nullmap_column->get_data();
+ const size_t old_size = nullmap_data.size();
+ nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
+ if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
+ memcpy(nullmap_data.data() + old_size,
+ _block_fast_path_ctx.nested_nullmap_data +
segment_ctx.seg_nested_start,
+ segment_ctx.seg_nested_count * sizeof(UInt8));
+ } else {
+ memset(nullmap_data.data() + old_size, 0,
+ segment_ctx.seg_nested_count * sizeof(UInt8));
+ }
+ } else {
+ out_col->insert_range_from(*_block_fast_path_ctx.nested_col,
+ segment_ctx.seg_nested_start,
segment_ctx.seg_nested_count);
}
+ reset_expand_segment_ctx();
+ };
+
+ // Emit one NULL output row for an outer-null/empty child row
+ auto emit_outer_null = [&](int64_t cr) {
+ for (auto index : p._output_slot_indexs) {
+ auto src_column = _child_block->get_by_position(index).column;
+ columns[index]->insert_from(*src_column, cr);
+ }
+ out_col->insert_default();
+ };
+ // Walk through child rows, accumulating contiguous segments into the
output,
+ // then when hitting a null/empty row or reaching the end,
+ // flush the segment using bulk operations.
+ // For outer-null rows, insert a NULL and copy the non-table-function
columns directly.
+ // This naturally handles both outer and non-outer modes since non-outer
mode
+ // just won't produce any null outputs.
+ // For posexplode, generate position indices alongside this.
+ while (produced_rows < remaining_capacity && child_row < child_rows) {
+ const bool is_null_row = _block_fast_path_ctx.array_nullmap_data &&
+
_block_fast_path_ctx.array_nullmap_data[child_row];
const uint64_t prev_off = child_row == 0 ? 0 : offsets[child_row - 1];
- const uint64_t cur_off = offsets[child_row];
+ const uint64_t cur_off = is_null_row ? prev_off : offsets[child_row];
const uint64_t nested_len = cur_off - prev_off;
- if (in_row_offset >= nested_len) {
+ if (is_null_row || in_row_offset >= nested_len) {
+ // for outer functions, emit null row for NULL or empty array rows
+ if (is_outer && in_row_offset == 0 && (is_null_row || nested_len
== 0)) {
+ flush_segment();
+ emit_outer_null(child_row);
+ produced_rows++;
+ }
child_row++;
in_row_offset = 0;
continue;
@@ -301,21 +429,28 @@ Status
TableFunctionLocalState::_get_expanded_block_block_fast_path(
DCHECK_LE(nested_start + take_count, cur_off);
DCHECK_LE(nested_start + take_count,
_block_fast_path_ctx.nested_col->size());
- if (!found_nested_range) {
- found_nested_range = true;
- first_nested_idx = nested_start;
- expected_next_nested_idx = nested_start;
+ if (segment_ctx.seg_nested_count == 0) {
+ segment_ctx.seg_nested_start = nested_start;
+ } else {
+ // Nested data from an array column is always contiguous: offsets
are monotonically
+ // non-decreasing, so skipping NULL/empty rows doesn't create gaps.
+ DCHECK_EQ(static_cast<uint64_t>(segment_ctx.seg_nested_start +
+ segment_ctx.seg_nested_count),
+ nested_start)
+ << "nested data must be contiguous across child rows";
}
- DCHECK_EQ(nested_start, expected_next_nested_idx);
// Map each produced output row back to its source child row for
copying non-table-function
// columns via insert_indices_from().
for (int j = 0; j < take_count; ++j) {
- row_ids.push_back(cast_set<uint32_t>(child_row));
+ segment_ctx.seg_row_ids.push_back(cast_set<uint32_t>(child_row));
+ if (is_posexplode) {
+
segment_ctx.seg_positions.push_back(cast_set<int32_t>(in_row_offset + j));
+ }
}
+ segment_ctx.seg_nested_count += take_count;
produced_rows += take_count;
- expected_next_nested_idx += take_count;
in_row_offset += take_count;
if (in_row_offset >= nested_len) {
child_row++;
@@ -323,35 +458,8 @@ Status
TableFunctionLocalState::_get_expanded_block_block_fast_path(
}
}
- if (produced_rows > 0) {
- for (auto index : p._output_slot_indexs) {
- auto src_column = _child_block->get_by_position(index).column;
- columns[index]->insert_indices_from(*src_column, row_ids.data(),
- row_ids.data() +
produced_rows);
- }
-
- auto& out_col = columns[p._child_slots.size()];
- if (out_col->is_nullable()) {
- auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
- out_nullable->get_nested_column_ptr()->insert_range_from(
- *_block_fast_path_ctx.nested_col, first_nested_idx,
produced_rows);
- auto* nullmap_column =
-
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
- auto& nullmap_data = nullmap_column->get_data();
- const size_t old_size = nullmap_data.size();
- nullmap_data.resize(old_size + produced_rows);
- if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
- memcpy(nullmap_data.data() + old_size,
- _block_fast_path_ctx.nested_nullmap_data +
first_nested_idx,
- produced_rows * sizeof(UInt8));
- } else {
- memset(nullmap_data.data() + old_size, 0, produced_rows *
sizeof(UInt8));
- }
- } else {
- out_col->insert_range_from(*_block_fast_path_ctx.nested_col,
first_nested_idx,
- produced_rows);
- }
- }
+ // Flush any remaining segment
+ flush_segment();
_block_fast_path_row = child_row;
_block_fast_path_in_row_offset = in_row_offset;
diff --git a/be/src/exprs/table_function/table_function.h
b/be/src/exprs/table_function/table_function.h
index be800080148..3c4bb7f1c7d 100644
--- a/be/src/exprs/table_function/table_function.h
+++ b/be/src/exprs/table_function/table_function.h
@@ -40,6 +40,7 @@ public:
const IColumn::Offsets64* offsets_ptr = nullptr;
ColumnPtr nested_col = nullptr;
const UInt8* nested_nullmap_data = nullptr;
+ bool generate_row_index = false;
};
virtual Status prepare() { return Status::OK(); }
diff --git a/be/src/exprs/table_function/vexplode.cpp
b/be/src/exprs/table_function/vexplode.cpp
index 07fa4d4355e..5fcad11f94c 100644
--- a/be/src/exprs/table_function/vexplode.cpp
+++ b/be/src/exprs/table_function/vexplode.cpp
@@ -94,7 +94,7 @@ Status VExplodeTableFunction::process_init(Block* block,
RuntimeState* state) {
}
bool VExplodeTableFunction::support_block_fast_path() const {
- return !_is_outer;
+ return true;
}
Status VExplodeTableFunction::prepare_block_fast_path(Block* /*block*/,
RuntimeState* /*state*/,
diff --git a/be/src/exprs/table_function/vexplode_v2.cpp
b/be/src/exprs/table_function/vexplode_v2.cpp
index 9aa42a84444..55be4fdf29a 100644
--- a/be/src/exprs/table_function/vexplode_v2.cpp
+++ b/be/src/exprs/table_function/vexplode_v2.cpp
@@ -109,7 +109,7 @@ Status VExplodeV2TableFunction::process_init(Block* block,
RuntimeState* state)
}
bool VExplodeV2TableFunction::support_block_fast_path() const {
- return !_is_outer && !_generate_row_index && _multi_detail.size() == 1;
+ return _multi_detail.size() == 1;
}
Status VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/,
RuntimeState* /*state*/,
@@ -123,6 +123,7 @@ Status
VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, Runtim
ctx->offsets_ptr = detail.offsets_ptr;
ctx->nested_col = detail.nested_col;
ctx->nested_nullmap_data = detail.nested_nullmap_data;
+ ctx->generate_row_index = _generate_row_index;
return Status::OK();
}
diff --git a/be/test/exec/operator/analytic_sink_operator_test.cpp
b/be/test/exec/operator/analytic_sink_operator_test.cpp
index 6729d6759da..a64d16c676f 100644
--- a/be/test/exec/operator/analytic_sink_operator_test.cpp
+++ b/be/test/exec/operator/analytic_sink_operator_test.cpp
@@ -56,7 +56,7 @@ struct AnalyticSinkOperatorTest : public ::testing::Test {
sink = std::make_unique<AnalyticSinkOperatorX>(&pool);
source = std::make_unique<AnalyticSourceOperatorX>();
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = batch_size;
+ state->_batch_size = batch_size;
std::cout << "AnalyticSinkOperatorTest::SetUp() batch_size: " <<
batch_size << std::endl;
_child_op = std::make_unique<MockAnalyticSinkOperator>();
for (int i = 0; i < batch_size; i++) {
diff --git
a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp
b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp
index b123cb60e34..88434e47fd7 100644
--- a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp
+++ b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp
@@ -34,7 +34,7 @@ struct DistinctStreamingAggOperatorTest : public
::testing::Test {
op = std::make_unique<DistinctStreamingAggOperatorX>();
mock_op = std::make_shared<MockOperatorX>();
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 10;
+ state->_batch_size = 10;
op->_child = mock_op;
}
diff --git a/be/test/exec/operator/exchange_source_operator_test.cpp
b/be/test/exec/operator/exchange_source_operator_test.cpp
index 543f5eafaa2..1f7abc4e00f 100644
--- a/be/test/exec/operator/exchange_source_operator_test.cpp
+++ b/be/test/exec/operator/exchange_source_operator_test.cpp
@@ -68,7 +68,7 @@ struct MockExchangeSourceLocalState : public
ExchangeLocalState {
struct ExchangeSourceOperatorXTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 10;
+ state->_batch_size = 10;
}
void create_op(int num_senders, bool is_merging, int offset, int limit) {
diff --git a/be/test/exec/operator/partition_sort_sink_operator_test.cpp
b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
index 5dc667bbae2..fa42cc7576b 100644
--- a/be/test/exec/operator/partition_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
@@ -51,7 +51,7 @@ private:
struct PartitionSortOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 10;
+ state->_batch_size = 10;
_child_op = std::make_unique<PartitionSortOperatorMockOperator>();
}
diff --git a/be/test/exec/operator/query_cache_operator_test.cpp
b/be/test/exec/operator/query_cache_operator_test.cpp
index 7f941281fe6..a99e9bcb9d9 100644
--- a/be/test/exec/operator/query_cache_operator_test.cpp
+++ b/be/test/exec/operator/query_cache_operator_test.cpp
@@ -50,7 +50,7 @@ private:
struct QueryCacheOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 10;
+ state->_batch_size = 10;
child_op = std::make_unique<QueryCacheMockChildOperator>();
query_cache_uptr.reset(QueryCache::create_global_cache(1024 * 1024 *
1024));
query_cache = query_cache_uptr.get();
diff --git a/be/test/exec/operator/repeat_operator_test.cpp
b/be/test/exec/operator/repeat_operator_test.cpp
index 781f14cc246..80363b8adf0 100644
--- a/be/test/exec/operator/repeat_operator_test.cpp
+++ b/be/test/exec/operator/repeat_operator_test.cpp
@@ -34,7 +34,7 @@ struct RepeatOperatorTest : public ::testing::Test {
op = std::make_unique<RepeatOperatorX>();
mock_op = std::make_shared<MockOperatorX>();
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 10;
+ state->_batch_size = 10;
op->_child = mock_op;
}
diff --git a/be/test/exec/operator/set_operator_test.cpp
b/be/test/exec/operator/set_operator_test.cpp
index d85e7a04c06..689f913e0e0 100644
--- a/be/test/exec/operator/set_operator_test.cpp
+++ b/be/test/exec/operator/set_operator_test.cpp
@@ -42,7 +42,7 @@ template <bool is_intersect>
struct SetOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 5;
+ state->_batch_size = 5;
}
void init_op(int child_size, DataTypes output_type) {
@@ -352,7 +352,7 @@ TEST_F(ExceptOperatorTest, test_build_not_ignore_null) {
TEST_F(ExceptOperatorTest, test_output_null_batsh_size) {
init_op(2,
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});
- state->batsh_size = 3; // set batch size to 3
+ state->_batch_size = 3; // set batch size to 3
sink_op->_child_exprs = MockSlotRef::create_mock_contexts(
DataTypes
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});
probe_sink_ops[0]->_child_exprs = MockSlotRef::create_mock_contexts(
diff --git a/be/test/exec/operator/sort_operator_test.cpp
b/be/test/exec/operator/sort_operator_test.cpp
index 4bb866242ab..a954d2d4898 100644
--- a/be/test/exec/operator/sort_operator_test.cpp
+++ b/be/test/exec/operator/sort_operator_test.cpp
@@ -50,7 +50,7 @@ private:
struct SortOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 10;
+ state->_batch_size = 10;
_child_op = std::make_unique<MockOperator>();
}
diff --git a/be/test/exec/operator/table_function_operator_test.cpp
b/be/test/exec/operator/table_function_operator_test.cpp
index b1ce616aaf7..bd7b108b18a 100644
--- a/be/test/exec/operator/table_function_operator_test.cpp
+++ b/be/test/exec/operator/table_function_operator_test.cpp
@@ -28,6 +28,7 @@
#include "core/block/block.h"
#include "core/column/column_array.h"
#include "core/column/column_nullable.h"
+#include "core/column/column_struct.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_nullable.h"
#include "exec/operator/operator_helper.h"
@@ -412,7 +413,7 @@ TEST_F(TableFunctionOperatorTest, block_fast_path_explode) {
}
TEST_F(TableFunctionOperatorTest, block_fast_path_explode_batch_truncate) {
- state->batsh_size = 2;
+ state->_batch_size = 2;
bool get_value_called = false;
auto int_type = std::make_shared<DataTypeInt32>();
auto arr_type = std::make_shared<DataTypeArray>(int_type);
@@ -655,7 +656,7 @@ TEST_F(TableFunctionOperatorTest,
block_fast_path_explode_nullable_array_misalig
TEST_F(TableFunctionOperatorTest,
block_fast_path_explode_nullable_array_partial_gap_uses_slow_path) {
- state->batsh_size = 2;
+ state->_batch_size = 2;
bool get_value_called = false;
auto int_type = std::make_shared<DataTypeInt32>();
auto arr_type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(int_type));
@@ -1081,7 +1082,7 @@ struct UnnestTest : public ::testing::Test {
obj_pool = std::make_unique<ObjectPool>();
query_ctx = generate_one_query();
runtime_profile = std::make_shared<RuntimeProfile>("test");
- runtime_state->batsh_size = 5;
+ runtime_state->_batch_size = 5;
runtime_state->_query_ctx = query_ctx.get();
runtime_state->_query_id = query_ctx->query_id();
runtime_state->resize_op_id_to_local_state(-100);
@@ -1237,6 +1238,98 @@ struct UnnestTest : public ::testing::Test {
// end of tplan_node_table_function_node
}
+ void setup_explode_plan_node(TPlanNode& tplan_node_table_function_node,
bool outer) {
+ // Set main TPlanNode properties
+ tplan_node_table_function_node.node_id = 0;
+ tplan_node_table_function_node.node_type =
TPlanNodeType::TABLE_FUNCTION_NODE;
+ tplan_node_table_function_node.num_children = 1;
+ tplan_node_table_function_node.limit = -1;
+
+ tplan_node_table_function_node.row_tuples.push_back(tuple0.id);
+ tplan_node_table_function_node.row_tuples.push_back(tuple1.id);
+ tplan_node_table_function_node.nullable_tuples.push_back(false);
+ tplan_node_table_function_node.nullable_tuples.push_back(false);
+ tplan_node_table_function_node.compact_data = false;
+ tplan_node_table_function_node.is_serial_operator = false;
+
+ // setup table function node
+
tplan_node_table_function_node.__set_table_function_node(TTableFunctionNode());
+
+ // setup fnCallExprList of table function node
+ TExpr fn_expr;
+
+ fn_expr.nodes.emplace_back();
+ fn_expr.nodes[0].node_type = TExprNodeType::FUNCTION_CALL;
+ fn_expr.nodes[0].type.types.emplace_back(type_node_int);
+ fn_expr.nodes[0].num_children = 1;
+
+ // setup TFunction of table function node
+ fn_expr.nodes[0].__set_fn(TFunction());
+ fn_expr.nodes[0].fn.__set_name(TFunctionName());
+ fn_expr.nodes[0].fn.name.function_name = outer ? "explode_outer" :
"explode";
+
+ fn_expr.nodes[0].fn.arg_types.emplace_back(type_desc_array_int);
+ fn_expr.nodes[0].fn.ret_type.types.emplace_back(type_node_int);
+
+ fn_expr.nodes[0].fn.has_var_args = false;
+ fn_expr.nodes[0].fn.signature = outer ? "explode_outer(array<int>)" :
"explode(array<int>)";
+ fn_expr.nodes[0].fn.__set_scalar_fn(TScalarFunction());
+ fn_expr.nodes[0].fn.scalar_fn.symbol = "";
+ fn_expr.nodes[0].fn.id = 0;
+ fn_expr.nodes[0].fn.vectorized = true;
+ fn_expr.nodes[0].fn.is_udtf_function = false;
+ fn_expr.nodes[0].fn.is_static_load = false;
+ fn_expr.nodes[0].fn.expiration_time = 360;
+ fn_expr.nodes[0].is_nullable = true;
+
+ // explode input slot ref: array<int>
+ fn_expr.nodes.emplace_back();
+ fn_expr.nodes[1].node_type = TExprNodeType::SLOT_REF;
+ fn_expr.nodes[1].type = type_desc_array_int;
+ fn_expr.nodes[1].num_children = 0;
+ fn_expr.nodes[1].__set_slot_ref(TSlotRef());
+ fn_expr.nodes[1].slot_ref = slot_ref_tags;
+ fn_expr.nodes[1].output_scale = -1;
+ fn_expr.nodes[1].is_nullable = true;
+ fn_expr.nodes[1].label = "tags";
+
+
tplan_node_table_function_node.table_function_node.fnCallExprList.push_back(fn_expr);
+
+ // Set output slot IDs
+
tplan_node_table_function_node.table_function_node.outputSlotIds.push_back(slot_desc_id.id);
+
tplan_node_table_function_node.table_function_node.outputSlotIds.push_back(
+ slot_desc_unnest_tag.id);
+
+ // Set projections
+ TExpr texpr_proj0;
+ texpr_proj0.nodes.emplace_back();
+ texpr_proj0.nodes[0].node_type = TExprNodeType::SLOT_REF;
+ texpr_proj0.nodes[0].type = type_desc_int;
+ texpr_proj0.nodes[0].num_children = 0;
+ texpr_proj0.nodes[0].__set_slot_ref(TSlotRef());
+ texpr_proj0.nodes[0].slot_ref = slot_ref_id;
+ texpr_proj0.nodes[0].output_scale = -1;
+ texpr_proj0.nodes[0].is_nullable = true;
+ texpr_proj0.nodes[0].label = "id";
+
+ TExpr texpr_proj1;
+ texpr_proj1.nodes.emplace_back();
+ texpr_proj1.nodes[0].node_type = TExprNodeType::SLOT_REF;
+ texpr_proj1.nodes[0].type = type_desc_int;
+ texpr_proj1.nodes[0].num_children = 0;
+ texpr_proj1.nodes[0].__set_slot_ref(TSlotRef());
+ texpr_proj1.nodes[0].slot_ref = slot_ref_unnest_tag;
+ texpr_proj1.nodes[0].output_scale = -1;
+ texpr_proj1.nodes[0].is_nullable = true;
+ texpr_proj1.nodes[0].label = "tag";
+
+ tplan_node_table_function_node.projections.push_back(texpr_proj0);
+ tplan_node_table_function_node.projections.push_back(texpr_proj1);
+ tplan_node_table_function_node.output_tuple_id = tuple2.id;
+ tplan_node_table_function_node.nereids_id = 144;
+ // end of tplan_node_table_function_node
+ }
+
std::shared_ptr<TableFunctionOperatorX> create_test_operators(
DescriptorTbl* desc_tbl, const TPlanNode&
tplan_node_table_function_node) {
runtime_state->set_desc_tbl(desc_tbl);
@@ -1529,4 +1622,544 @@ TEST_F(UnnestTest, outer) {
EXPECT_TRUE(ColumnHelper::block_equal(output_block,
expected_output_block));
}
}
+
+// Test inner mode fast path with NULL and empty arrays (they should be
skipped)
+TEST_F(UnnestTest, inner_with_nulls_fast_path) {
+ // Descriptor table reuses the fixture's tuple/slot descriptors; tuple2 is
+ // the projection output tuple (it is set as output_tuple_id in the fixture's
+ // plan-node setup), tuple1 presumably the table-function output — confirm
+ // against the fixture constructor.
+ TDescriptorTable desc_table;
+ desc_table.tupleDescriptors.push_back(tuple0);
+ desc_table.tupleDescriptors.push_back(tuple1);
+ desc_table.tupleDescriptors.push_back(tuple2);
+ desc_table.slotDescriptors.push_back(slot_desc_id);
+ desc_table.slotDescriptors.push_back(slot_desc_tags);
+ desc_table.slotDescriptors.push_back(slot_desc_unnest_tag);
+ desc_table.slotDescriptors.push_back(slot_desc_id_2);
+ desc_table.slotDescriptors.push_back(slot_desc_unnest_tag2);
+
+ setup_exec_env();
+ // Batch size far above the 6 expected output rows so a single pull is
+ // expected to drain the whole result.
+ runtime_state->_batch_size = 4096;
+
+ TPlanNode tplan_node;
+ setup_explode_plan_node(tplan_node, false); // inner, no conjuncts → fast
path
+
+ DescriptorTbl* desc_tbl;
+ auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+ EXPECT_TRUE(st.ok());
+
+ DataTypePtr data_type_int(std::make_shared<DataTypeInt32>());
+ auto data_type_int_nullable = make_nullable(data_type_int);
+ DataTypePtr
data_type_array_type(std::make_shared<DataTypeArray>(data_type_int_nullable));
+ auto data_type_array_type_nullable = make_nullable(data_type_array_type);
+
+ // Build input: ids=[1,2,3,4,5], arrays=[[10,20,30], NULL, [40,50], [],
[60]]
+ auto build_input = [&]() {
+ auto result_block = std::make_unique<Block>();
+
+ auto id_column = ColumnInt32::create();
+ for (int32_t id : {1, 2, 3, 4, 5}) {
+ // NOTE(review): length arg is 0 — insert_data on fixed-width columns
+ // presumably ignores it and reads sizeof(value); confirm in IColumn.
+ id_column->insert_data((const char*)(&id), 0);
+ }
+
result_block->insert(ColumnWithTypeAndName(make_nullable(std::move(id_column)),
+ data_type_int_nullable,
"id"));
+
+ // The array column is built from raw parts: flat element data, running
+ // end offsets (one per row), and a per-row null map.
+ auto arr_data = ColumnInt32::create();
+ auto arr_offsets = ColumnOffset64::create();
+ auto arr_nullmap = ColumnUInt8::create();
+
+ // Row 0: [10, 20, 30]
+ for (int32_t v : {10, 20, 30}) {
+ arr_data->insert_data((const char*)(&v), 0);
+ }
+ ColumnArray::Offset64 off = 3;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 1: NULL
+ arr_offsets->insert_data((const char*)(&off), 0); // same offset
+ arr_nullmap->get_data().push_back(1);
+
+ // Row 2: [40, 50]
+ for (int32_t v : {40, 50}) {
+ arr_data->insert_data((const char*)(&v), 0);
+ }
+ off = 5;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 3: []
+ arr_offsets->insert_data((const char*)(&off), 0); // same offset
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 4: [60]
+ int32_t v60 = 60;
+ arr_data->insert_data((const char*)(&v60), 0);
+ off = 6;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ auto array_column =
+ ColumnArray::create(make_nullable(std::move(arr_data)),
std::move(arr_offsets));
+ auto nullable_array =
+ ColumnNullable::create(std::move(array_column),
std::move(arr_nullmap));
+ result_block->insert(ColumnWithTypeAndName(std::move(nullable_array),
+
data_type_array_type_nullable, "tags"));
+ return result_block;
+ };
+
+ {
+ // Drive the operator directly: one push carrying child eos, then a
+ // single pull that should return the complete exploded result.
+ auto table_func_op = create_test_operators(desc_tbl, tplan_node);
+ auto* local_state =
runtime_state->get_local_state(table_func_op->operator_id());
+ auto* tfl = dynamic_cast<TableFunctionLocalState*>(local_state);
+
+ tfl->_child_block = build_input();
+ tfl->_child_eos = true;
+
+ st = table_func_op->push(runtime_state.get(), tfl->_child_block.get(),
tfl->_child_eos);
+ ASSERT_TRUE(st.ok()) << "push failed: " << st.to_string();
+
+ bool eos = false;
+ Block output_block;
+ st = table_func_op->pull(runtime_state.get(), &output_block, &eos);
+ ASSERT_TRUE(st.ok()) << "pull failed: " << st.to_string();
+
+ // Expected: 6 rows — NULL row (id=2) and empty row (id=4) are skipped
+ EXPECT_EQ(output_block.rows(), 6);
+
+ // Verify ids: [1,1,1,3,3,5]
+ auto id_col = output_block.get_by_position(0).column;
+ auto check_id = [&](size_t row, int32_t expected) {
+ auto field = (*id_col)[row];
+ EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+ };
+ check_id(0, 1);
+ check_id(1, 1);
+ check_id(2, 1);
+ check_id(3, 3);
+ check_id(4, 3);
+ check_id(5, 5);
+
+ // Verify tags: [10,20,30,40,50,60]
+ auto tag_col = output_block.get_by_position(2).column;
+ auto check_tag = [&](size_t row, int32_t expected) {
+ auto field = (*tag_col)[row];
+ EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+ };
+ check_tag(0, 10);
+ check_tag(1, 20);
+ check_tag(2, 30);
+ check_tag(3, 40);
+ check_tag(4, 50);
+ check_tag(5, 60);
+ }
+}
+
+// Test outer mode fast path with NULL and empty arrays (should emit NULL rows)
+TEST_F(UnnestTest, outer_with_nulls_fast_path) {
+ // Same descriptor setup as the inner-mode test; the only difference is the
+ // plan-node flag below. Outer mode must emit one NULL output row for the
+ // NULL array (id=2) and for the empty array (id=4) instead of skipping them.
+ TDescriptorTable desc_table;
+ desc_table.tupleDescriptors.push_back(tuple0);
+ desc_table.tupleDescriptors.push_back(tuple1);
+ desc_table.tupleDescriptors.push_back(tuple2);
+ desc_table.slotDescriptors.push_back(slot_desc_id);
+ desc_table.slotDescriptors.push_back(slot_desc_tags);
+ desc_table.slotDescriptors.push_back(slot_desc_unnest_tag);
+ desc_table.slotDescriptors.push_back(slot_desc_id_2);
+ desc_table.slotDescriptors.push_back(slot_desc_unnest_tag2);
+
+ setup_exec_env();
+ // Large batch size so the whole 7-row result is expected in one pull.
+ runtime_state->_batch_size = 4096;
+
+ TPlanNode tplan_node;
+ setup_explode_plan_node(tplan_node, true); // outer, no conjuncts → fast
path
+
+ DescriptorTbl* desc_tbl;
+ auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+ EXPECT_TRUE(st.ok());
+
+ DataTypePtr data_type_int(std::make_shared<DataTypeInt32>());
+ auto data_type_int_nullable = make_nullable(data_type_int);
+ DataTypePtr
data_type_array_type(std::make_shared<DataTypeArray>(data_type_int_nullable));
+ auto data_type_array_type_nullable = make_nullable(data_type_array_type);
+
+ // Build input: ids=[1,2,3,4,5], arrays=[[10,20], NULL, [30], [], [40,50]]
+ auto build_input = [&]() {
+ auto result_block = std::make_unique<Block>();
+
+ auto id_column = ColumnInt32::create();
+ for (int32_t id : {1, 2, 3, 4, 5}) {
+ id_column->insert_data((const char*)(&id), 0);
+ }
+
result_block->insert(ColumnWithTypeAndName(make_nullable(std::move(id_column)),
+ data_type_int_nullable,
"id"));
+
+ // Array column raw parts: flat data, per-row end offsets, per-row nulls.
+ auto arr_data = ColumnInt32::create();
+ auto arr_offsets = ColumnOffset64::create();
+ auto arr_nullmap = ColumnUInt8::create();
+
+ // Row 0: [10, 20]
+ for (int32_t v : {10, 20}) {
+ arr_data->insert_data((const char*)(&v), 0);
+ }
+ ColumnArray::Offset64 off = 2;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 1: NULL
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(1);
+
+ // Row 2: [30]
+ int32_t v30 = 30;
+ arr_data->insert_data((const char*)(&v30), 0);
+ off = 3;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 3: []
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 4: [40, 50]
+ for (int32_t v : {40, 50}) {
+ arr_data->insert_data((const char*)(&v), 0);
+ }
+ off = 5;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ auto array_column =
+ ColumnArray::create(make_nullable(std::move(arr_data)),
std::move(arr_offsets));
+ auto nullable_array =
+ ColumnNullable::create(std::move(array_column),
std::move(arr_nullmap));
+ result_block->insert(ColumnWithTypeAndName(std::move(nullable_array),
+
data_type_array_type_nullable, "tags"));
+ return result_block;
+ };
+
+ {
+ // Drive the operator directly: one push carrying child eos, then a
+ // single pull that should return the complete exploded result.
+ auto table_func_op = create_test_operators(desc_tbl, tplan_node);
+ auto* local_state =
runtime_state->get_local_state(table_func_op->operator_id());
+ auto* tfl = dynamic_cast<TableFunctionLocalState*>(local_state);
+
+ tfl->_child_block = build_input();
+ tfl->_child_eos = true;
+
+ st = table_func_op->push(runtime_state.get(), tfl->_child_block.get(),
tfl->_child_eos);
+ ASSERT_TRUE(st.ok()) << "push failed: " << st.to_string();
+
+ bool eos = false;
+ Block output_block;
+ st = table_func_op->pull(runtime_state.get(), &output_block, &eos);
+ ASSERT_TRUE(st.ok()) << "pull failed: " << st.to_string();
+
+ // Expected: 7 rows
+ // (1,10), (1,20), (2,NULL), (3,30), (4,NULL), (5,40), (5,50)
+ EXPECT_EQ(output_block.rows(), 7);
+
+ // Verify ids: [1,1,2,3,4,5,5]
+ auto id_col = output_block.get_by_position(0).column;
+ auto check_id = [&](size_t row, int32_t expected) {
+ auto field = (*id_col)[row];
+ EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+ };
+ check_id(0, 1);
+ check_id(1, 1);
+ check_id(2, 2);
+ check_id(3, 3);
+ check_id(4, 4);
+ check_id(5, 5);
+ check_id(6, 5);
+
+ // Verify tags: [10, 20, NULL, 30, NULL, 40, 50]
+ auto tag_col = output_block.get_by_position(2).column;
+ auto check_tag = [&](size_t row, int32_t expected) {
+ auto field = (*tag_col)[row];
+ EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+ };
+ auto check_null = [&](size_t row) {
+ EXPECT_TRUE(tag_col->is_null_at(row)) << "row " << row << " should
be null";
+ };
+ check_tag(0, 10);
+ check_tag(1, 20);
+ check_null(2);
+ check_tag(3, 30);
+ check_null(4);
+ check_tag(5, 40);
+ check_tag(6, 50);
+ }
+}
+// Test posexplode inner mode fast path with NULL and empty arrays
+TEST_F(UnnestTest, posexplode_with_nulls_fast_path) {
+ // Unlike the explode tests, posexplode returns STRUCT(pos, val), so this
+ // test builds its own struct type descriptor, tuple/slot descriptors, and
+ // plan node instead of using the fixture's explode setup.
+ // Build struct type descriptor for posexplode output:
+ // STRUCT(pos: INT NOT NULL, val: NULLABLE<INT>)
+ TTypeNode type_node_struct;
+ type_node_struct.type = TTypeNodeType::STRUCT;
+ {
+ TStructField sf_pos;
+ sf_pos.__set_name("pos");
+ sf_pos.__set_contains_null(false);
+ TStructField sf_val;
+ sf_val.__set_name("val");
+ sf_val.__set_contains_null(true);
+ type_node_struct.__set_struct_fields({sf_pos, sf_val});
+ }
+
+ // Thrift type descriptors are flattened pre-order: struct node first,
+ // then one node per field sub-type.
+ TTypeDesc type_desc_struct;
+ type_desc_struct.types.emplace_back(type_node_struct);
+ type_desc_struct.types.emplace_back(type_node_int); // pos sub-type
+ type_desc_struct.types.emplace_back(type_node_int); // val sub-type
+ type_desc_struct.byte_size = -1;
+
+ // Create new tuple/slot descriptors for posexplode using continuing IDs
+ // After constructor: slot_id=5, tuple_id=3, col_unique_id=3
+ TTupleDescriptor pe_tuple_tf;
+ pe_tuple_tf.id = tuple_id++;
+ pe_tuple_tf.byteSize = 0;
+ pe_tuple_tf.numNullBytes = 0;
+
+ TTupleDescriptor pe_tuple_proj;
+ pe_tuple_proj.id = tuple_id++;
+ pe_tuple_proj.byteSize = 0;
+ pe_tuple_proj.numNullBytes = 0;
+
+ // TF output slot: struct type
+ TSlotRef pe_slot_ref_out;
+ pe_slot_ref_out.slot_id = slot_id++;
+ pe_slot_ref_out.tuple_id = pe_tuple_tf.id;
+ pe_slot_ref_out.col_unique_id = -1;
+ pe_slot_ref_out.is_virtual_slot = false;
+
+ TSlotDescriptor pe_slot_desc_out;
+ pe_slot_desc_out.id = pe_slot_ref_out.slot_id;
+ pe_slot_desc_out.parent = pe_tuple_tf.id;
+ pe_slot_desc_out.slotType = type_desc_struct;
+ pe_slot_desc_out.columnPos = -1;
+ pe_slot_desc_out.byteOffset = 0;
+ pe_slot_desc_out.nullIndicatorByte = 0;
+ pe_slot_desc_out.nullIndicatorBit = 0;
+ pe_slot_desc_out.colName = "posexplode#3";
+ pe_slot_desc_out.slotIdx = -1;
+ pe_slot_desc_out.isMaterialized = true;
+ pe_slot_desc_out.col_unique_id = -1;
+ pe_slot_desc_out.is_key = false;
+ pe_slot_desc_out.need_materialize = true;
+ pe_slot_desc_out.is_auto_increment = false;
+ pe_slot_desc_out.primitive_type = TPrimitiveType::STRUCT;
+
+ // Projection slots
+ TSlotDescriptor pe_slot_desc_id_proj = slot_desc_id;
+ pe_slot_desc_id_proj.id = slot_id++;
+ pe_slot_desc_id_proj.parent = pe_tuple_proj.id;
+
+ TSlotDescriptor pe_slot_desc_out_proj = pe_slot_desc_out;
+ pe_slot_desc_out_proj.id = slot_id++;
+ pe_slot_desc_out_proj.parent = pe_tuple_proj.id;
+
+ TDescriptorTable desc_table;
+ desc_table.tupleDescriptors.push_back(tuple0);
+ desc_table.tupleDescriptors.push_back(pe_tuple_tf);
+ desc_table.tupleDescriptors.push_back(pe_tuple_proj);
+ desc_table.slotDescriptors.push_back(slot_desc_id);
+ desc_table.slotDescriptors.push_back(slot_desc_tags);
+ desc_table.slotDescriptors.push_back(pe_slot_desc_out);
+ desc_table.slotDescriptors.push_back(pe_slot_desc_id_proj);
+ desc_table.slotDescriptors.push_back(pe_slot_desc_out_proj);
+
+ setup_exec_env();
+ // Large batch size so the 3-row result is expected in a single pull.
+ runtime_state->_batch_size = 4096;
+
+ // Build plan node for posexplode (inner mode, no conjuncts → fast path)
+ TPlanNode tplan_node;
+ {
+ tplan_node.node_id = 0;
+ tplan_node.node_type = TPlanNodeType::TABLE_FUNCTION_NODE;
+ tplan_node.num_children = 1;
+ tplan_node.limit = -1;
+
+ tplan_node.row_tuples.push_back(tuple0.id);
+ tplan_node.row_tuples.push_back(pe_tuple_tf.id);
+ tplan_node.nullable_tuples.push_back(false);
+ tplan_node.nullable_tuples.push_back(false);
+ tplan_node.compact_data = false;
+ tplan_node.is_serial_operator = false;
+
+ tplan_node.__set_table_function_node(TTableFunctionNode());
+
+ // fn_expr encodes posexplode(tags): node[0] is the call, node[1] its
+ // single SLOT_REF argument.
+ TExpr fn_expr;
+ fn_expr.nodes.emplace_back();
+ fn_expr.nodes[0].node_type = TExprNodeType::FUNCTION_CALL;
+ fn_expr.nodes[0].type.types.emplace_back(type_node_struct);
+ fn_expr.nodes[0].type.types.emplace_back(type_node_int);
+ fn_expr.nodes[0].type.types.emplace_back(type_node_int);
+ fn_expr.nodes[0].num_children = 1;
+
+ fn_expr.nodes[0].__set_fn(TFunction());
+ fn_expr.nodes[0].fn.__set_name(TFunctionName());
+ fn_expr.nodes[0].fn.name.function_name = "posexplode";
+ fn_expr.nodes[0].fn.arg_types.emplace_back(type_desc_array_int);
+ fn_expr.nodes[0].fn.ret_type = type_desc_struct;
+ fn_expr.nodes[0].fn.has_var_args = false;
+ fn_expr.nodes[0].fn.signature = "posexplode(array<int>)";
+ fn_expr.nodes[0].fn.__set_scalar_fn(TScalarFunction());
+ fn_expr.nodes[0].fn.scalar_fn.symbol = "";
+ fn_expr.nodes[0].fn.id = 0;
+ fn_expr.nodes[0].fn.vectorized = true;
+ fn_expr.nodes[0].fn.is_udtf_function = false;
+ fn_expr.nodes[0].fn.is_static_load = false;
+ fn_expr.nodes[0].fn.expiration_time = 360;
+ fn_expr.nodes[0].is_nullable = true;
+
+ fn_expr.nodes.emplace_back();
+ fn_expr.nodes[1].node_type = TExprNodeType::SLOT_REF;
+ fn_expr.nodes[1].type = type_desc_array_int;
+ fn_expr.nodes[1].num_children = 0;
+ fn_expr.nodes[1].__set_slot_ref(TSlotRef());
+ fn_expr.nodes[1].slot_ref = slot_ref_tags;
+ fn_expr.nodes[1].output_scale = -1;
+ fn_expr.nodes[1].is_nullable = true;
+ fn_expr.nodes[1].label = "tags";
+
+ tplan_node.table_function_node.fnCallExprList.push_back(fn_expr);
+
+
tplan_node.table_function_node.outputSlotIds.push_back(slot_desc_id.id);
+
tplan_node.table_function_node.outputSlotIds.push_back(pe_slot_desc_out.id);
+
+ // Projections
+ TExpr texpr_proj0;
+ texpr_proj0.nodes.emplace_back();
+ texpr_proj0.nodes[0].node_type = TExprNodeType::SLOT_REF;
+ texpr_proj0.nodes[0].type = type_desc_int;
+ texpr_proj0.nodes[0].num_children = 0;
+ texpr_proj0.nodes[0].__set_slot_ref(TSlotRef());
+ texpr_proj0.nodes[0].slot_ref = slot_ref_id;
+ texpr_proj0.nodes[0].output_scale = -1;
+ texpr_proj0.nodes[0].is_nullable = true;
+ texpr_proj0.nodes[0].label = "id";
+
+ TExpr texpr_proj1;
+ texpr_proj1.nodes.emplace_back();
+ texpr_proj1.nodes[0].node_type = TExprNodeType::SLOT_REF;
+ texpr_proj1.nodes[0].type = type_desc_struct;
+ texpr_proj1.nodes[0].num_children = 0;
+ texpr_proj1.nodes[0].__set_slot_ref(TSlotRef());
+ texpr_proj1.nodes[0].slot_ref = pe_slot_ref_out;
+ texpr_proj1.nodes[0].output_scale = -1;
+ texpr_proj1.nodes[0].is_nullable = true;
+ texpr_proj1.nodes[0].label = "posexplode";
+
+ tplan_node.projections.push_back(texpr_proj0);
+ tplan_node.projections.push_back(texpr_proj1);
+ tplan_node.output_tuple_id = pe_tuple_proj.id;
+ tplan_node.nereids_id = 144;
+ }
+
+ DescriptorTbl* desc_tbl;
+ auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+ EXPECT_TRUE(st.ok());
+
+ DataTypePtr data_type_int(std::make_shared<DataTypeInt32>());
+ auto data_type_int_nullable = make_nullable(data_type_int);
+ DataTypePtr
data_type_array_type(std::make_shared<DataTypeArray>(data_type_int_nullable));
+ auto data_type_array_type_nullable = make_nullable(data_type_array_type);
+
+ // Build input: ids=[1,2,3,4], arrays=[[10,20], NULL, [30], []]
+ auto build_input = [&]() {
+ auto result_block = std::make_unique<Block>();
+
+ auto id_column = ColumnInt32::create();
+ for (int32_t id : {1, 2, 3, 4}) {
+ id_column->insert_data((const char*)(&id), 0);
+ }
+
result_block->insert(ColumnWithTypeAndName(make_nullable(std::move(id_column)),
+ data_type_int_nullable,
"id"));
+
+ // Array column raw parts: flat data, per-row end offsets, per-row nulls.
+ auto arr_data = ColumnInt32::create();
+ auto arr_offsets = ColumnOffset64::create();
+ auto arr_nullmap = ColumnUInt8::create();
+
+ // Row 0: [10, 20]
+ for (int32_t v : {10, 20}) {
+ arr_data->insert_data((const char*)(&v), 0);
+ }
+ ColumnArray::Offset64 off = 2;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 1: NULL
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(1);
+
+ // Row 2: [30]
+ int32_t v30 = 30;
+ arr_data->insert_data((const char*)(&v30), 0);
+ off = 3;
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ // Row 3: []
+ arr_offsets->insert_data((const char*)(&off), 0);
+ arr_nullmap->get_data().push_back(0);
+
+ auto array_column =
+ ColumnArray::create(make_nullable(std::move(arr_data)),
std::move(arr_offsets));
+ auto nullable_array =
+ ColumnNullable::create(std::move(array_column),
std::move(arr_nullmap));
+ result_block->insert(ColumnWithTypeAndName(std::move(nullable_array),
+
data_type_array_type_nullable, "tags"));
+ return result_block;
+ };
+
+ {
+ // Drive the operator directly: one push carrying child eos, then a
+ // single pull that should return the complete exploded result.
+ auto table_func_op = create_test_operators(desc_tbl, tplan_node);
+ auto* local_state =
runtime_state->get_local_state(table_func_op->operator_id());
+ auto* tfl = dynamic_cast<TableFunctionLocalState*>(local_state);
+
+ tfl->_child_block = build_input();
+ tfl->_child_eos = true;
+
+ st = table_func_op->push(runtime_state.get(), tfl->_child_block.get(),
tfl->_child_eos);
+ ASSERT_TRUE(st.ok()) << "push failed: " << st.to_string();
+
+ bool eos = false;
+ Block output_block;
+ st = table_func_op->pull(runtime_state.get(), &output_block, &eos);
+ ASSERT_TRUE(st.ok()) << "pull failed: " << st.to_string();
+
+ // Expected: 3 rows (NULL and empty arrays are skipped in inner mode)
+ // Row 0: id=1, struct=(pos=0, val=10)
+ // Row 1: id=1, struct=(pos=1, val=20)
+ // Row 2: id=3, struct=(pos=0, val=30)
+ EXPECT_EQ(output_block.rows(), 3);
+
+ // Verify ids: [1, 1, 3]
+ auto id_col = output_block.get_by_position(0).column;
+ auto check_id = [&](size_t row, int32_t expected) {
+ auto field = (*id_col)[row];
+ EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+ };
+ check_id(0, 1);
+ check_id(1, 1);
+ check_id(2, 3);
+
+ // Verify struct output column at position 2
+ auto out_col = output_block.get_by_position(2).column;
+ auto* nullable_out = assert_cast<const ColumnNullable*>(out_col.get());
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_FALSE(nullable_out->is_null_at(i))
+ << "struct row " << i << " should not be null";
+ }
+ // Unwrap Nullable(Struct(pos INT, val Nullable(INT))) to reach the two
+ // field columns produced by posexplode.
+ auto* struct_col = assert_cast<const
ColumnStruct*>(&nullable_out->get_nested_column());
+ auto* pos_col = assert_cast<const
ColumnInt32*>(&struct_col->get_column(0));
+ auto* val_nullable = assert_cast<const
ColumnNullable*>(&struct_col->get_column(1));
+ auto* val_col = assert_cast<const
ColumnInt32*>(&val_nullable->get_nested_column());
+
+ // Verify positions: [0, 1, 0]
+ EXPECT_EQ(pos_col->get_element(0), 0);
+ EXPECT_EQ(pos_col->get_element(1), 1);
+ EXPECT_EQ(pos_col->get_element(2), 0);
+
+ // Verify values: [10, 20, 30]
+ EXPECT_EQ(val_col->get_element(0), 10);
+ EXPECT_EQ(val_col->get_element(1), 20);
+ EXPECT_EQ(val_col->get_element(2), 30);
+ }
+}
} // namespace doris
diff --git a/be/test/exec/operator/union_operator_test.cpp
b/be/test/exec/operator/union_operator_test.cpp
index f57c34d1be3..3009e977a47 100644
--- a/be/test/exec/operator/union_operator_test.cpp
+++ b/be/test/exec/operator/union_operator_test.cpp
@@ -51,7 +51,7 @@ struct MockUnionSinkOperator : public UnionSinkOperatorX {
struct UnionOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
- state->batsh_size = 10;
+ state->_batch_size = 10;
for (int i = 0; i < child_size; i++) {
sink_state.push_back(std::make_shared<MockRuntimeState>());
sink_ops.push_back(nullptr);
@@ -75,7 +75,7 @@ struct UnionOperatorTest : public ::testing::Test {
};
TEST_F(UnionOperatorTest, test_all_const_expr) {
- state->batsh_size = 2;
+ state->_batch_size = 2;
source_op.reset(new MockUnionSourceOperator {
0,
{std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeInt64>(),
@@ -254,7 +254,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
{
for (int i = 0; i < child_size; i++) {
- sink_state[i]->batsh_size = 2;
+ sink_state[i]->_batch_size = 2;
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2},
{3, 4});
EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
}
diff --git a/be/test/exec/sink/arrow_result_block_buffer_test.cpp
b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
index a87a03d1542..46d7daf6c37 100644
--- a/be/test/exec/sink/arrow_result_block_buffer_test.cpp
+++ b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
@@ -76,7 +76,7 @@ private:
TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) {
MockRuntimeState state;
- state.batsh_size = 1;
+ state._batch_size = 1;
int buffer_size = 16;
auto dep = Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
@@ -203,7 +203,7 @@ TEST_F(ArrowResultBlockBufferTest,
TestArrowResultBlockBuffer) {
TEST_F(ArrowResultBlockBufferTest, TestCancelArrowResultBlockBuffer) {
MockRuntimeState state;
- state.batsh_size = 1;
+ state._batch_size = 1;
int buffer_size = 16;
auto dep = Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
@@ -277,7 +277,7 @@ TEST_F(ArrowResultBlockBufferTest,
TestCancelArrowResultBlockBuffer) {
TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
MockRuntimeState state;
- state.batsh_size = 1;
+ state._batch_size = 1;
int buffer_size = 16;
auto dep = Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
@@ -340,7 +340,7 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
TEST_F(ArrowResultBlockBufferTest, TestArrowResultSerializeFailure) {
MockRuntimeState state;
- state.batsh_size = 1;
+ state._batch_size = 1;
int buffer_size = 16;
auto dep = Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
diff --git a/be/test/exec/sink/result_block_buffer_test.cpp
b/be/test/exec/sink/result_block_buffer_test.cpp
index 46ea26c3d79..065472cd63e 100644
--- a/be/test/exec/sink/result_block_buffer_test.cpp
+++ b/be/test/exec/sink/result_block_buffer_test.cpp
@@ -63,7 +63,7 @@ private:
TEST_F(MysqlResultBlockBufferTest, TestMySQLResultBlockBuffer) {
MockRuntimeState state;
- state.batsh_size = 1;
+ state._batch_size = 1;
int buffer_size = 16;
auto dep = Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
@@ -190,7 +190,7 @@ TEST_F(MysqlResultBlockBufferTest,
TestMySQLResultBlockBuffer) {
TEST_F(MysqlResultBlockBufferTest, TestCancelMySQLResultBlockBuffer) {
MockRuntimeState state;
- state.batsh_size = 1;
+ state._batch_size = 1;
int buffer_size = 16;
auto dep = Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
@@ -265,7 +265,7 @@ TEST_F(MysqlResultBlockBufferTest,
TestCancelMySQLResultBlockBuffer) {
TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
MockRuntimeState state;
- state.batsh_size = 1;
+ state._batch_size = 1;
int buffer_size = 16;
auto dep = Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
diff --git a/be/test/testutil/mock/mock_runtime_state.h
b/be/test/testutil/mock/mock_runtime_state.h
index a9ea7eaf89f..f1dbc16a74b 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_runtime_state.h
@@ -54,7 +54,7 @@ public:
ExecEnv* exec_env, QueryContext* ctx)
: RuntimeState(query_id, fragment_id, query_options,
query_globals, exec_env, ctx) {}
- int batch_size() const override { return batsh_size; }
+ int batch_size() const override { return _batch_size; }
bool enable_shared_exchange_sink_buffer() const override {
return _enable_shared_exchange_sink_buffer;
@@ -72,7 +72,7 @@ public:
WorkloadGroupPtr workload_group() override { return _workload_group; }
// default batch size
- int batsh_size = 4096;
+ int _batch_size = 4096;
bool _enable_shared_exchange_sink_buffer = true;
bool _enable_share_hash_table_for_broadcast_join = true;
std::shared_ptr<MockContext> _mock_context =
std::make_shared<MockContext>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]