This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new a133581f24d branch-4.0: [fix](iterator) Use explicit output schema in new_merge_iterator and new_union_iterator #60772 (#60804)
a133581f24d is described below
commit a133581f24d45ff63d61f01d25aaa943b4699b48
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 25 10:04:49 2026 +0800
branch-4.0: [fix](iterator) Use explicit output schema in new_merge_iterator and new_union_iterator #60772 (#60804)
Cherry-picked from #60772
Co-authored-by: ivin <[email protected]>
---
be/src/olap/rowset/beta_rowset_reader.cpp | 13 ++++--
be/src/olap/rowset/beta_rowset_reader.h | 10 ++++
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 15 +++++-
be/src/olap/schema_change.cpp | 39 +++++++++++++++-
be/src/olap/tablet_schema.cpp | 4 +-
be/src/olap/tablet_schema.h | 2 +-
be/src/vec/olap/vgeneric_iterators.cpp | 54 ++++++++++++++--------
be/src/vec/olap/vgeneric_iterators.h | 50 ++++++++++++++++----
be/test/vec/exec/vgeneric_iterators_test.cpp | 17 +++++--
9 files changed, 161 insertions(+), 43 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index eea0bbf865c..0b44597ed95 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -148,6 +148,10 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}
VLOG_NOTICE << "read columns size: " << read_columns.size();
_input_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
+ // output_schema only contains return_columns (excludes extra columns like
delete-predicate columns).
+ // It is used by merge/union iterators to determine how many columns to
copy to the output block.
+ _output_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(),
+
*(_read_context->return_columns));
if (_read_context->predicates != nullptr) {
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
_read_context->predicates->begin(),
@@ -316,15 +320,16 @@ Status BetaRowsetReader::_init_iterator() {
}
}
}
- _iterator = vectorized::new_merge_iterator(
- std::move(iterators), sequence_loc, _read_context->is_unique,
- _read_context->read_orderby_key_reverse,
_read_context->merged_rows);
+ _iterator = vectorized::new_merge_iterator(std::move(iterators),
sequence_loc,
+ _read_context->is_unique,
+
_read_context->read_orderby_key_reverse,
+ _read_context->merged_rows,
_output_schema);
} else {
if (_read_context->read_orderby_key_reverse) {
// reverse iterators to read backward for ORDER BY key DESC
std::reverse(iterators.begin(), iterators.end());
}
- _iterator = vectorized::new_union_iterator(std::move(iterators));
+ _iterator = vectorized::new_union_iterator(std::move(iterators),
_output_schema);
}
auto s = _iterator->init(_read_options);
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index 9f0313f3c02..1d1efde7357 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -146,7 +146,17 @@ private:
std::pair<int64_t, int64_t> _segment_offsets;
std::vector<RowRanges> _segment_row_ranges;
+ // _input_schema: includes return_columns + delete_predicate_columns.
+ // Used by SegmentIterator internally (iter->schema() returns this).
SegmentIterator
+ // handles the extra delete predicate columns through
_current_return_columns and
+ // _evaluate_short_circuit_predicate(), independent of the block structure.
+ // e.g. return_columns={c1, c2}, delete_pred on c3 => input_schema={c1,
c2, c3}
SchemaSPtr _input_schema;
+ // _output_schema: includes only return_columns (a subset of input_schema).
+ // Passed to VMergeIterator/VUnionIterator. block_reset() builds the
internal block
+ // with this schema, and copy_rows() copies exactly these columns to the
destination.
+ // e.g. return_columns={c1, c2} => output_schema={c1, c2}
+ SchemaSPtr _output_schema;
RowsetReaderContext* _read_context = nullptr;
BetaRowsetSharedPtr _rowset;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 1c6d950a00d..7a47d8acec4 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1938,8 +1938,19 @@ Status
SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
for (auto cid : _non_predicate_columns) {
auto loc = _schema_block_id_map[cid];
- // if loc > block->columns() means the column is delete column and
should
- // not output by block, so just skip the column.
+ // Whether a delete predicate column gets output depends on how the
caller builds
+ // the block passed to next_batch(). Both calling paths now build the
block with
+ // only the output schema (return_columns), so delete predicate
columns are skipped:
+ //
+ // 1) VMergeIterator path: block_reset() builds _block using the
output schema
+ // (return_columns only), e.g. block has 2 columns {c1, c2}.
+ // Here loc=2 for delete predicate c3, block->columns()=2, so loc <
block->columns()
+ // is false, and c3 is skipped.
+ //
+ // 2) VUnionIterator path: the caller's block is built with only
return_columns
+ // (output schema), e.g. block has 2 columns {c1, c2}.
+ // Here loc=2 for c3, block->columns()=2, so loc < block->columns()
is false,
+ // and c3 is skipped — same behavior as the VMergeIterator path.
if (loc < block->columns()) {
bool column_in_block_is_nothing =
vectorized::check_and_get_column<const
vectorized::ColumnNothing>(
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 8c07c88401a..db1880e000a 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -560,7 +560,14 @@ Status
VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader
bool eof = false;
do {
auto new_block =
vectorized::Block::create_unique(new_tablet_schema->create_block());
- auto ref_block =
vectorized::Block::create_unique(base_tablet_schema->create_block(false));
+ // create_block() skips dropped columns (from light-weight schema
change).
+ // Dropped columns are only needed for delete predicate evaluation,
which
+ // SegmentIterator handles internally — it creates temporary columns
for
+ // predicate columns not present in the block (via `i >=
block->columns()`
+ // guard in _init_current_block). If dropped columns were included
here,
+ // the block would have more columns than VMergeIterator's
output_schema
+ // expects, causing DCHECK failures in copy_rows.
+ auto ref_block =
vectorized::Block::create_unique(base_tablet_schema->create_block());
Status st = next_batch(rowset_reader, ref_block.get(), _row_same_bit);
if (!st) {
@@ -629,7 +636,14 @@ Status
VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset
bool eof = false;
do {
- auto ref_block =
vectorized::Block::create_unique(base_tablet_schema->create_block(false));
+ // create_block() skips dropped columns (from light-weight schema
change).
+ // Dropped columns are only needed for delete predicate evaluation,
which
+ // SegmentIterator handles internally — it creates temporary columns
for
+ // predicate columns not present in the block (via `i >=
block->columns()`
+ // guard in _init_current_block). If dropped columns were included
here,
+ // the block would have more columns than VMergeIterator's
output_schema
+ // expects, causing DCHECK failures in copy_rows.
+ auto ref_block =
vectorized::Block::create_unique(base_tablet_schema->create_block());
Status st = next_batch(rowset_reader, ref_block.get(), _row_same_bit);
if (!st) {
if (st.is<ErrorCode::END_OF_FILE>()) {
@@ -909,6 +923,27 @@ Status SchemaChangeJob::_do_process_alter_tablet(const
TAlterTabletReqV2& reques
// dropped column during light weight schema change.
// But the tablet schema in base tablet maybe not the latest from FE, so
that if fe pass through
// a tablet schema, then use request schema.
+ //
+ // return_columns does NOT include dropped columns. It is computed here
BEFORE
+ // merge_dropped_columns() appends dropped columns to _base_tablet_schema
below.
+ // This means return_columns only covers the original (non-dropped)
columns.
+ //
+ // This is important because:
+ // - BetaRowsetReader builds _output_schema from return_columns, which
determines the
+ // number of columns in ref_block (via create_block() which also skips
dropped cols).
+ // - VMergeIterator's copy_rows iterates over _output_schema columns, so
ref_block
+ // must match _output_schema exactly.
+ // - Dropped columns are only needed for delete predicate evaluation, and
SegmentIterator
+ // handles them internally (creates temporary columns for predicate
columns not present
+ // in the block via `i >= block->columns()` guard in
_init_current_block).
+ //
+ // Example: table has columns [k1, v1, v2], then DROP COLUMN v1, then
+ // DELETE FROM t WHERE v1 = 'x' was issued before the drop.
+ // - _base_tablet_schema after merge_dropped_columns: [k1, v2,
v1(DROPPED)]
+ // - return_columns (computed before merge): [0, 1] → [k1, v2]
+ // - _output_schema / ref_block columns: [k1, v2] (2 columns)
+ // - SegmentIterator reads v1 internally for delete predicate, but does
not
+ // output it to ref_block. copy_rows only iterates 2 columns — no OOB
access.
size_t num_cols =
request.columns.empty() ? _base_tablet_schema->num_columns() :
request.columns.size();
return_columns.resize(num_cols);
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index fdd4cb5945b..e56c73e34a5 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1752,10 +1752,10 @@ vectorized::Block TabletSchema::create_block(
return block;
}
-vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const {
+vectorized::Block TabletSchema::create_block() const {
vectorized::Block block;
for (const auto& col : _cols) {
- if (ignore_dropped_col && is_dropped_column(*col)) {
+ if (is_dropped_column(*col)) {
continue;
}
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 57da8305590..4ec777a84fa 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -548,7 +548,7 @@ public:
vectorized::Block create_block(
const std::vector<uint32_t>& return_columns,
const std::unordered_set<uint32_t>*
tablet_columns_need_convert_null = nullptr) const;
- vectorized::Block create_block(bool ignore_dropped_col = true) const;
+ vectorized::Block create_block() const;
void set_schema_version(int32_t version) { _schema_version = version; }
void set_auto_increment_column(const std::string& auto_increment_column) {
_auto_increment_column = auto_increment_column;
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index 4966d5e64e4..ad67a48d711 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -98,12 +98,20 @@ Status VStatisticsIterator::next_batch(Block* block) {
return Status::EndOfFile("End of VStatisticsIterator");
}
+// Build the block using the output schema, which contains only the columns
+// the caller requested (return_columns). Delete predicate columns are excluded
+// because SegmentIterator handles them independently:
+// - _init_current_block() skips predicate columns (including delete
predicates)
+// via the _is_pred_column[cid] check, so it never accesses the block by
those positions.
+// - _output_non_pred_columns() checks loc < block->columns() before filling
any column,
+// so delete predicate columns (whose loc exceeds block->columns()) are
simply skipped.
+// - Delete predicate evaluation happens entirely through
_current_return_columns and
+// _evaluate_short_circuit_predicate(), which are independent of the block
structure.
Status VMergeIteratorContext::block_reset(const std::shared_ptr<Block>& block)
{
if (!block->columns()) {
- const Schema& schema = _iter->schema();
- const auto& column_ids = schema.column_ids();
- for (size_t i = 0; i < schema.num_column_ids(); ++i) {
- auto column_desc = schema.column(column_ids[i]);
+ const auto& column_ids = _output_schema->column_ids();
+ for (size_t i = 0; i < _output_schema->num_column_ids(); ++i) {
+ auto column_desc = _output_schema->column(column_ids[i]);
auto data_type = Schema::get_data_type_ptr(*column_desc);
if (data_type == nullptr) {
return Status::RuntimeError("invalid data type");
@@ -143,9 +151,15 @@ bool VMergeIteratorContext::compare(const
VMergeIteratorContext& rhs) const {
return result;
}
+// Copy rows from the internal _block to the destination block.
+// Both blocks are built with the output schema (return_columns only), so they
+// have the same number of columns. We iterate over
_output_schema->num_column_ids()
+// columns to copy from src to dst.
Status VMergeIteratorContext::copy_rows(Block* block, bool advanced) {
Block& src = *_block;
Block& dst = *block;
+ DCHECK_EQ(src.columns(), _output_schema->num_column_ids());
+ DCHECK_EQ(dst.columns(), _output_schema->num_column_ids());
if (_cur_batch_num == 0) {
return Status::OK();
}
@@ -154,7 +168,7 @@ Status VMergeIteratorContext::copy_rows(Block* block, bool
advanced) {
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
RETURN_IF_CATCH_EXCEPTION({
- for (size_t i = 0; i < _num_columns; ++i) {
+ for (size_t i = 0; i < _output_schema->num_column_ids(); ++i) {
auto& s_col = src.get_by_position(i);
auto& d_col = dst.get_by_position(i);
@@ -344,13 +358,12 @@ Status VMergeIterator::init(const StorageReadOptions&
opts) {
if (_origin_iters.empty()) {
return Status::OK();
}
- _schema = &(_origin_iters[0]->schema());
_record_rowids = opts.record_rowids;
for (auto& iter : _origin_iters) {
- auto ctx = std::make_shared<VMergeIteratorContext>(std::move(iter),
_sequence_id_idx,
- _is_unique,
_is_reverse,
-
opts.read_orderby_key_columns);
+ auto ctx = std::make_shared<VMergeIteratorContext>(
+ std::move(iter), _sequence_id_idx, _is_unique, _is_reverse,
+ opts.read_orderby_key_columns, _output_schema);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
@@ -366,12 +379,18 @@ Status VMergeIterator::init(const StorageReadOptions&
opts) {
}
// VUnionIterator will read data from input iterator one by one.
+// Unlike VMergeIterator, VUnionIterator does NOT have its own internal block
or copy_rows().
+// It passes the caller's block directly to the underlying SegmentIterator via
next_batch(),
+// so there is no input-schema vs output-schema mismatch issue here.
+// The output_schema parameter is accepted only so that schema() can return
the output schema
+// consistently with VMergeIterator.
class VUnionIterator : public RowwiseIterator {
public:
// Iterators' ownership it transferred to this class.
// This class will delete all iterators when destructs
// Client should not use iterators anymore.
- VUnionIterator(std::vector<RowwiseIteratorUPtr>&& v) :
_origin_iters(std::move(v)) {}
+ VUnionIterator(std::vector<RowwiseIteratorUPtr>&& v, SchemaSPtr
output_schema)
+ : _output_schema(std::move(output_schema)),
_origin_iters(std::move(v)) {}
~VUnionIterator() override = default;
@@ -379,7 +398,7 @@ public:
Status next_batch(Block* block) override;
- const Schema& schema() const override { return *_schema; }
+ const Schema& schema() const override { return *_output_schema; }
Status current_block_row_locations(std::vector<RowLocation>* locations)
override;
@@ -390,7 +409,7 @@ public:
}
private:
- const Schema* _schema = nullptr;
+ const SchemaSPtr _output_schema;
RowwiseIteratorUPtr _cur_iter = nullptr;
StorageReadOptions _read_options;
std::vector<RowwiseIteratorUPtr> _origin_iters;
@@ -400,7 +419,6 @@ Status VUnionIterator::init(const StorageReadOptions& opts)
{
if (_origin_iters.empty()) {
return Status::OK();
}
-
// we use back() and pop_back() of std::vector to handle each iterator,
// so reverse the vector here to keep result block of next_batch to be
// in the same order as the original segments.
@@ -409,7 +427,6 @@ Status VUnionIterator::init(const StorageReadOptions& opts)
{
_read_options = opts;
_cur_iter = std::move(_origin_iters.back());
RETURN_IF_ERROR(_cur_iter->init(_read_options));
- _schema = &_cur_iter->schema();
return Status::OK();
}
@@ -441,19 +458,20 @@ Status
VUnionIterator::current_block_row_locations(std::vector<RowLocation>* loc
RowwiseIteratorUPtr new_merge_iterator(std::vector<RowwiseIteratorUPtr>&&
inputs,
int sequence_id_idx, bool is_unique,
bool is_reverse,
- uint64_t* merged_rows) {
+ uint64_t* merged_rows, SchemaSPtr
output_schema) {
// when the size of inputs is 1, we also need to use VMergeIterator,
because the
// next_block_view function only be implemented in VMergeIterator. The
reason why
// the size of inputs is 1 is that the segment was filtered out by zone
map or others.
return std::make_unique<VMergeIterator>(std::move(inputs),
sequence_id_idx, is_unique,
- is_reverse, merged_rows);
+ is_reverse, merged_rows,
std::move(output_schema));
}
-RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&&
inputs) {
+RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&&
inputs,
+ SchemaSPtr output_schema) {
if (inputs.size() == 1) {
return std::move(inputs[0]);
}
- return std::make_unique<VUnionIterator>(std::move(inputs));
+ return std::make_unique<VUnionIterator>(std::move(inputs),
std::move(output_schema));
}
RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment,
const Schema& schema) {
diff --git a/be/src/vec/olap/vgeneric_iterators.h
b/be/src/vec/olap/vgeneric_iterators.h
index c48492aa702..fbf3dc6c8f0 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -86,13 +86,14 @@ private:
class VMergeIteratorContext {
public:
VMergeIteratorContext(RowwiseIteratorUPtr&& iter, int sequence_id_idx,
bool is_unique,
- bool is_reverse, std::vector<uint32_t>*
read_orderby_key_columns)
+ bool is_reverse, std::vector<uint32_t>*
read_orderby_key_columns,
+ SchemaSPtr output_schema)
: _iter(std::move(iter)),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
- _num_columns(cast_set<int>(_iter->schema().num_column_ids())),
-
_num_key_columns(cast_set<int>(_iter->schema().num_key_columns())),
+ _output_schema(std::move(output_schema)),
+
_num_key_columns(cast_set<int>(_output_schema->num_key_columns())),
_compare_columns(read_orderby_key_columns) {}
VMergeIteratorContext(const VMergeIteratorContext&) = delete;
@@ -102,6 +103,22 @@ public:
~VMergeIteratorContext() = default;
+ // Reset (or initialize) the internal _block using the output schema.
+ //
+ // The output schema contains only the columns the caller requested
(return_columns),
+ // excluding delete predicate columns. For example, if the query reads
columns {c1, c2}
+ // but there is a delete predicate on column c3 (e.g., "DELETE FROM t
WHERE c3 = 'foo'"):
+ // - input schema (iter->schema) = {c1, c2, c3} (3 columns)
+ // - output schema = {c1, c2} (2 columns)
+ //
+ // It is safe to build the block with only the output schema because
SegmentIterator
+ // handles delete predicate columns independently of the block structure:
+ // - _init_current_block() skips predicate columns (including delete
predicates)
+ // via the _is_pred_column[cid] check, never accessing the block for
them.
+ // - _output_non_pred_columns() checks loc < block->columns() before
filling any
+ // column, so delete predicate columns are simply skipped when the
block is smaller.
+ // - Delete predicate evaluation uses _current_return_columns and
+ // _evaluate_short_circuit_predicate(), independent of the block.
Status block_reset(const std::shared_ptr<Block>& block);
// Initialize this context and will prepare data for current_row()
@@ -109,6 +126,10 @@ public:
bool compare(const VMergeIteratorContext& rhs) const;
+ // Copy rows from internal _block to the destination block.
+ // Both blocks have _output_schema columns (return_columns only).
+ // Only _output_schema->num_column_ids() columns are copied.
+ //
// `advanced = false` when current block finished
// when input argument type is block, we do not process same_bit,
// this case we only need merge and return ordered data
(VCollectIterator::_topn_next), data mode is dup/mow can guarantee all rows are
different
@@ -174,7 +195,14 @@ private:
size_t _index_in_block = -1;
// 4096 minus 16 + 16 bytes padding that in padding pod array
int _block_row_max = 4064;
- int _num_columns;
+ // The output schema defines which columns are in _block and in the
caller's dst block.
+ // It contains only the requested return_columns, excluding delete
predicate columns.
+ // For example:
+ // - _iter->schema() (input) = {c1, c2, c3} — c3 for "DELETE WHERE
c3='foo'"
+ // - _output_schema = {c1, c2} — only the requested
columns
+ // block_reset() uses _output_schema to build _block, and copy_rows()
iterates over
+ // _output_schema->num_column_ids() columns to copy from _block to the
destination.
+ const SchemaSPtr _output_schema;
int _num_key_columns;
std::vector<uint32_t>* _compare_columns;
std::vector<RowLocation> _block_row_locations;
@@ -192,8 +220,9 @@ class VMergeIterator : public RowwiseIterator {
public:
// VMergeIterator takes the ownership of input iterators
VMergeIterator(std::vector<RowwiseIteratorUPtr>&& iters, int
sequence_id_idx, bool is_unique,
- bool is_reverse, uint64_t* merged_rows)
+ bool is_reverse, uint64_t* merged_rows, SchemaSPtr
output_schema)
: _origin_iters(std::move(iters)),
+ _output_schema(std::move(output_schema)),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
@@ -209,7 +238,7 @@ public:
}
Status next_batch(BlockView* block_view) override { return
_next_batch(block_view); }
- const Schema& schema() const override { return *_schema; }
+ const Schema& schema() const override { return *_output_schema; }
Status current_block_row_locations(std::vector<RowLocation>*
block_row_locations) override {
DCHECK(_record_rowids);
@@ -293,7 +322,9 @@ private:
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIteratorUPtr> _origin_iters;
- const Schema* _schema = nullptr;
+ // The output schema (excludes delete predicate columns). Passed down to
each
+ // VMergeIteratorContext to control how many columns copy_rows() copies.
+ const SchemaSPtr _output_schema;
struct VMergeContextComparator {
bool operator()(const std::shared_ptr<VMergeIteratorContext>& lhs,
@@ -325,13 +356,14 @@ private:
// should delete returned iterator after usage.
RowwiseIteratorUPtr new_merge_iterator(std::vector<RowwiseIteratorUPtr>&&
inputs,
int sequence_id_idx, bool is_unique,
bool is_reverse,
- uint64_t* merged_rows);
+ uint64_t* merged_rows, SchemaSPtr
output_schema);
// Create a union iterator for input iterators. Union iterator will read
// input iterators one by one.
//
// Inputs iterators' ownership is taken by created union iterator.
-RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&&
inputs);
+RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&&
inputs,
+ SchemaSPtr output_schema);
// Create an auto increment iterator which returns num_rows data in format of
schema.
// This class aims to be used in unit test.
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp
b/be/test/vec/exec/vgeneric_iterators_test.cpp
index 4aaa91995fd..9097c4a739b 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -20,6 +20,7 @@
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
+#include <memory>
#include <vector>
#include "gtest/gtest_pred_impl.h"
@@ -104,13 +105,14 @@ TEST(VGenericIteratorsTest, AutoIncrement) {
TEST(VGenericIteratorsTest, Union) {
auto schema = create_schema();
+ auto output_schema = std::make_shared<Schema>(schema);
std::vector<RowwiseIteratorUPtr> inputs;
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
- auto iter = vectorized::new_union_iterator(std::move(inputs));
+ auto iter = vectorized::new_union_iterator(std::move(inputs),
output_schema);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@@ -148,13 +150,15 @@ TEST(VGenericIteratorsTest, Union) {
TEST(VGenericIteratorsTest, MergeAgg) {
EXPECT_TRUE(1);
auto schema = create_schema();
+ auto output_schema = std::make_shared<Schema>(schema);
std::vector<RowwiseIteratorUPtr> inputs;
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
- auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, false,
false, nullptr);
+ auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, false,
false, nullptr,
+ output_schema);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@@ -197,13 +201,15 @@ TEST(VGenericIteratorsTest, MergeAgg) {
TEST(VGenericIteratorsTest, MergeUnique) {
EXPECT_TRUE(1);
auto schema = create_schema();
+ auto output_schema = std::make_shared<Schema>(schema);
std::vector<RowwiseIteratorUPtr> inputs;
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
- auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, true,
false, nullptr);
+ auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, true,
false, nullptr,
+ output_schema);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@@ -310,6 +316,7 @@ public:
TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
EXPECT_TRUE(1);
auto schema = create_schema();
+ auto output_schema = std::make_shared<Schema>(schema);
std::vector<RowwiseIteratorUPtr> inputs;
int seq_column_id = 2;
@@ -325,8 +332,8 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
schema, num_rows, rows_begin, seq_column_id,
seq_id_in_every_file));
}
- auto iter =
- vectorized::new_merge_iterator(std::move(inputs), seq_column_id,
true, false, nullptr);
+ auto iter = vectorized::new_merge_iterator(std::move(inputs),
seq_column_id, true, false,
+ nullptr, output_schema);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]