This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a133581f24d branch-4.0: [fix](iterator) Use explicit output schema in 
new_merge_iterator and new_union_iterator #60772 (#60804)
a133581f24d is described below

commit a133581f24d45ff63d61f01d25aaa943b4699b48
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 25 10:04:49 2026 +0800

    branch-4.0: [fix](iterator) Use explicit output schema in 
new_merge_iterator and new_union_iterator #60772 (#60804)
    
    Cherry-picked from #60772
    
    Co-authored-by: ivin <[email protected]>
---
 be/src/olap/rowset/beta_rowset_reader.cpp          | 13 ++++--
 be/src/olap/rowset/beta_rowset_reader.h            | 10 ++++
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 15 +++++-
 be/src/olap/schema_change.cpp                      | 39 +++++++++++++++-
 be/src/olap/tablet_schema.cpp                      |  4 +-
 be/src/olap/tablet_schema.h                        |  2 +-
 be/src/vec/olap/vgeneric_iterators.cpp             | 54 ++++++++++++++--------
 be/src/vec/olap/vgeneric_iterators.h               | 50 ++++++++++++++++----
 be/test/vec/exec/vgeneric_iterators_test.cpp       | 17 +++++--
 9 files changed, 161 insertions(+), 43 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index eea0bbf865c..0b44597ed95 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -148,6 +148,10 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     }
     VLOG_NOTICE << "read columns size: " << read_columns.size();
     _input_schema = 
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
+    // output_schema only contains return_columns (excludes extra columns like 
delete-predicate columns).
+    // It is used by merge/union iterators to determine how many columns to 
copy to the output block.
+    _output_schema = 
std::make_shared<Schema>(_read_context->tablet_schema->columns(),
+                                              
*(_read_context->return_columns));
     if (_read_context->predicates != nullptr) {
         
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
                                                
_read_context->predicates->begin(),
@@ -316,15 +320,16 @@ Status BetaRowsetReader::_init_iterator() {
                 }
             }
         }
-        _iterator = vectorized::new_merge_iterator(
-                std::move(iterators), sequence_loc, _read_context->is_unique,
-                _read_context->read_orderby_key_reverse, 
_read_context->merged_rows);
+        _iterator = vectorized::new_merge_iterator(std::move(iterators), 
sequence_loc,
+                                                   _read_context->is_unique,
+                                                   
_read_context->read_orderby_key_reverse,
+                                                   _read_context->merged_rows, 
_output_schema);
     } else {
         if (_read_context->read_orderby_key_reverse) {
             // reverse iterators to read backward for ORDER BY key DESC
             std::reverse(iterators.begin(), iterators.end());
         }
-        _iterator = vectorized::new_union_iterator(std::move(iterators));
+        _iterator = vectorized::new_union_iterator(std::move(iterators), 
_output_schema);
     }
 
     auto s = _iterator->init(_read_options);
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index 9f0313f3c02..1d1efde7357 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -146,7 +146,17 @@ private:
     std::pair<int64_t, int64_t> _segment_offsets;
     std::vector<RowRanges> _segment_row_ranges;
 
+    // _input_schema: includes return_columns + delete_predicate_columns.
+    // Used by SegmentIterator internally (iter->schema() returns this). 
SegmentIterator
+    // handles the extra delete predicate columns through 
_current_return_columns and
+    // _evaluate_short_circuit_predicate(), independent of the block structure.
+    // e.g. return_columns={c1, c2}, delete_pred on c3 => input_schema={c1, 
c2, c3}
     SchemaSPtr _input_schema;
+    // _output_schema: includes only return_columns (a subset of input_schema).
+    // Passed to VMergeIterator/VUnionIterator. block_reset() builds the 
internal block
+    // with this schema, and copy_rows() copies exactly these columns to the 
destination.
+    // e.g. return_columns={c1, c2} => output_schema={c1, c2}
+    SchemaSPtr _output_schema;
     RowsetReaderContext* _read_context = nullptr;
     BetaRowsetSharedPtr _rowset;
 
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 1c6d950a00d..7a47d8acec4 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1938,8 +1938,19 @@ Status 
SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
     RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
     for (auto cid : _non_predicate_columns) {
         auto loc = _schema_block_id_map[cid];
-        // if loc > block->columns() means the column is delete column and 
should
-        // not output by block, so just skip the column.
+        // Whether a delete predicate column gets output depends on how the 
caller builds
+        // the block passed to next_batch(). Both calling paths now build the 
block with
+        // only the output schema (return_columns), so delete predicate 
columns are skipped:
+        //
+        // 1) VMergeIterator path: block_reset() builds _block using the 
output schema
+        //    (return_columns only), e.g. block has 2 columns {c1, c2}.
+        //    Here loc=2 for delete predicate c3, block->columns()=2, so loc < 
block->columns()
+        //    is false, and c3 is skipped.
+        //
+        // 2) VUnionIterator path: the caller's block is built with only 
return_columns
+        //    (output schema), e.g. block has 2 columns {c1, c2}.
+        //    Here loc=2 for c3, block->columns()=2, so loc < block->columns() 
is false,
+        //    and c3 is skipped — same behavior as the VMergeIterator path.
         if (loc < block->columns()) {
             bool column_in_block_is_nothing =
                     vectorized::check_and_get_column<const 
vectorized::ColumnNothing>(
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 8c07c88401a..db1880e000a 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -560,7 +560,14 @@ Status 
VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader
     bool eof = false;
     do {
         auto new_block = 
vectorized::Block::create_unique(new_tablet_schema->create_block());
-        auto ref_block = 
vectorized::Block::create_unique(base_tablet_schema->create_block(false));
+        // create_block() skips dropped columns (from light-weight schema 
change).
+        // Dropped columns are only needed for delete predicate evaluation, 
which
+        // SegmentIterator handles internally — it creates temporary columns 
for
+        // predicate columns not present in the block (via `i >= 
block->columns()`
+        // guard in _init_current_block). If dropped columns were included 
here,
+        // the block would have more columns than VMergeIterator's 
output_schema
+        // expects, causing DCHECK failures in copy_rows.
+        auto ref_block = 
vectorized::Block::create_unique(base_tablet_schema->create_block());
 
         Status st = next_batch(rowset_reader, ref_block.get(), _row_same_bit);
         if (!st) {
@@ -629,7 +636,14 @@ Status 
VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset
 
     bool eof = false;
     do {
-        auto ref_block = 
vectorized::Block::create_unique(base_tablet_schema->create_block(false));
+        // create_block() skips dropped columns (from light-weight schema 
change).
+        // Dropped columns are only needed for delete predicate evaluation, 
which
+        // SegmentIterator handles internally — it creates temporary columns 
for
+        // predicate columns not present in the block (via `i >= 
block->columns()`
+        // guard in _init_current_block). If dropped columns were included 
here,
+        // the block would have more columns than VMergeIterator's 
output_schema
+        // expects, causing DCHECK failures in copy_rows.
+        auto ref_block = 
vectorized::Block::create_unique(base_tablet_schema->create_block());
         Status st = next_batch(rowset_reader, ref_block.get(), _row_same_bit);
         if (!st) {
             if (st.is<ErrorCode::END_OF_FILE>()) {
@@ -909,6 +923,27 @@ Status SchemaChangeJob::_do_process_alter_tablet(const 
TAlterTabletReqV2& reques
     // dropped column during light weight schema change.
     // But the tablet schema in base tablet maybe not the latest from FE, so 
that if fe pass through
     // a tablet schema, then use request schema.
+    //
+    // return_columns does NOT include dropped columns. It is computed here 
BEFORE
+    // merge_dropped_columns() appends dropped columns to _base_tablet_schema 
below.
+    // This means return_columns only covers the original (non-dropped) 
columns.
+    //
+    // This is important because:
+    // - BetaRowsetReader builds _output_schema from return_columns, which 
determines the
+    //   number of columns in ref_block (via create_block() which also skips 
dropped cols).
+    // - VMergeIterator's copy_rows iterates over _output_schema columns, so 
ref_block
+    //   must match _output_schema exactly.
+    // - Dropped columns are only needed for delete predicate evaluation, and 
SegmentIterator
+    //   handles them internally (creates temporary columns for predicate 
columns not present
+    //   in the block via `i >= block->columns()` guard in 
_init_current_block).
+    //
+    // Example: table has columns [k1, v1, v2], then DROP COLUMN v1, then
+    //   DELETE FROM t WHERE v1 = 'x' was issued before the drop.
+    //   - _base_tablet_schema after merge_dropped_columns: [k1, v2, 
v1(DROPPED)]
+    //   - return_columns (computed before merge): [0, 1] → [k1, v2]
+    //   - _output_schema / ref_block columns: [k1, v2] (2 columns)
+    //   - SegmentIterator reads v1 internally for delete predicate, but does 
not
+    //     output it to ref_block. copy_rows only iterates 2 columns — no OOB 
access.
     size_t num_cols =
             request.columns.empty() ? _base_tablet_schema->num_columns() : 
request.columns.size();
     return_columns.resize(num_cols);
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index fdd4cb5945b..e56c73e34a5 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1752,10 +1752,10 @@ vectorized::Block TabletSchema::create_block(
     return block;
 }
 
-vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const {
+vectorized::Block TabletSchema::create_block() const {
     vectorized::Block block;
     for (const auto& col : _cols) {
-        if (ignore_dropped_col && is_dropped_column(*col)) {
+        if (is_dropped_column(*col)) {
             continue;
         }
 
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 57da8305590..4ec777a84fa 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -548,7 +548,7 @@ public:
     vectorized::Block create_block(
             const std::vector<uint32_t>& return_columns,
             const std::unordered_set<uint32_t>* 
tablet_columns_need_convert_null = nullptr) const;
-    vectorized::Block create_block(bool ignore_dropped_col = true) const;
+    vectorized::Block create_block() const;
     void set_schema_version(int32_t version) { _schema_version = version; }
     void set_auto_increment_column(const std::string& auto_increment_column) {
         _auto_increment_column = auto_increment_column;
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index 4966d5e64e4..ad67a48d711 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -98,12 +98,20 @@ Status VStatisticsIterator::next_batch(Block* block) {
     return Status::EndOfFile("End of VStatisticsIterator");
 }
 
+// Build the block using the output schema, which contains only the columns
+// the caller requested (return_columns). Delete predicate columns are excluded
+// because SegmentIterator handles them independently:
+//   - _init_current_block() skips predicate columns (including delete 
predicates)
+//     via the _is_pred_column[cid] check, so it never accesses the block by 
those positions.
+//   - _output_non_pred_columns() checks loc < block->columns() before filling 
any column,
+//     so delete predicate columns (whose loc exceeds block->columns()) are 
simply skipped.
+//   - Delete predicate evaluation happens entirely through 
_current_return_columns and
+//     _evaluate_short_circuit_predicate(), which are independent of the block 
structure.
 Status VMergeIteratorContext::block_reset(const std::shared_ptr<Block>& block) 
{
     if (!block->columns()) {
-        const Schema& schema = _iter->schema();
-        const auto& column_ids = schema.column_ids();
-        for (size_t i = 0; i < schema.num_column_ids(); ++i) {
-            auto column_desc = schema.column(column_ids[i]);
+        const auto& column_ids = _output_schema->column_ids();
+        for (size_t i = 0; i < _output_schema->num_column_ids(); ++i) {
+            auto column_desc = _output_schema->column(column_ids[i]);
             auto data_type = Schema::get_data_type_ptr(*column_desc);
             if (data_type == nullptr) {
                 return Status::RuntimeError("invalid data type");
@@ -143,9 +151,15 @@ bool VMergeIteratorContext::compare(const 
VMergeIteratorContext& rhs) const {
     return result;
 }
 
+// Copy rows from the internal _block to the destination block.
+// Both blocks are built with the output schema (return_columns only), so they
+// have the same number of columns. We iterate over 
_output_schema->num_column_ids()
+// columns to copy from src to dst.
 Status VMergeIteratorContext::copy_rows(Block* block, bool advanced) {
     Block& src = *_block;
     Block& dst = *block;
+    DCHECK_EQ(src.columns(), _output_schema->num_column_ids());
+    DCHECK_EQ(dst.columns(), _output_schema->num_column_ids());
     if (_cur_batch_num == 0) {
         return Status::OK();
     }
@@ -154,7 +168,7 @@ Status VMergeIteratorContext::copy_rows(Block* block, bool 
advanced) {
     size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
 
     RETURN_IF_CATCH_EXCEPTION({
-        for (size_t i = 0; i < _num_columns; ++i) {
+        for (size_t i = 0; i < _output_schema->num_column_ids(); ++i) {
             auto& s_col = src.get_by_position(i);
             auto& d_col = dst.get_by_position(i);
 
@@ -344,13 +358,12 @@ Status VMergeIterator::init(const StorageReadOptions& 
opts) {
     if (_origin_iters.empty()) {
         return Status::OK();
     }
-    _schema = &(_origin_iters[0]->schema());
     _record_rowids = opts.record_rowids;
 
     for (auto& iter : _origin_iters) {
-        auto ctx = std::make_shared<VMergeIteratorContext>(std::move(iter), 
_sequence_id_idx,
-                                                           _is_unique, 
_is_reverse,
-                                                           
opts.read_orderby_key_columns);
+        auto ctx = std::make_shared<VMergeIteratorContext>(
+                std::move(iter), _sequence_id_idx, _is_unique, _is_reverse,
+                opts.read_orderby_key_columns, _output_schema);
         RETURN_IF_ERROR(ctx->init(opts));
         if (!ctx->valid()) {
             continue;
@@ -366,12 +379,18 @@ Status VMergeIterator::init(const StorageReadOptions& 
opts) {
 }
 
 // VUnionIterator will read data from input iterator one by one.
+// Unlike VMergeIterator, VUnionIterator does NOT have its own internal block 
or copy_rows().
+// It passes the caller's block directly to the underlying SegmentIterator via 
next_batch(),
+// so there is no input-schema vs output-schema mismatch issue here.
+// The output_schema parameter is accepted only so that schema() can return 
the output schema
+// consistently with VMergeIterator.
 class VUnionIterator : public RowwiseIterator {
 public:
     // Iterators' ownership it transferred to this class.
     // This class will delete all iterators when destructs
     // Client should not use iterators anymore.
-    VUnionIterator(std::vector<RowwiseIteratorUPtr>&& v) : 
_origin_iters(std::move(v)) {}
+    VUnionIterator(std::vector<RowwiseIteratorUPtr>&& v, SchemaSPtr 
output_schema)
+            : _output_schema(std::move(output_schema)), 
_origin_iters(std::move(v)) {}
 
     ~VUnionIterator() override = default;
 
@@ -379,7 +398,7 @@ public:
 
     Status next_batch(Block* block) override;
 
-    const Schema& schema() const override { return *_schema; }
+    const Schema& schema() const override { return *_output_schema; }
 
     Status current_block_row_locations(std::vector<RowLocation>* locations) 
override;
 
@@ -390,7 +409,7 @@ public:
     }
 
 private:
-    const Schema* _schema = nullptr;
+    const SchemaSPtr _output_schema;
     RowwiseIteratorUPtr _cur_iter = nullptr;
     StorageReadOptions _read_options;
     std::vector<RowwiseIteratorUPtr> _origin_iters;
@@ -400,7 +419,6 @@ Status VUnionIterator::init(const StorageReadOptions& opts) 
{
     if (_origin_iters.empty()) {
         return Status::OK();
     }
-
     // we use back() and pop_back() of std::vector to handle each iterator,
     // so reverse the vector here to keep result block of next_batch to be
     // in the same order as the original segments.
@@ -409,7 +427,6 @@ Status VUnionIterator::init(const StorageReadOptions& opts) 
{
     _read_options = opts;
     _cur_iter = std::move(_origin_iters.back());
     RETURN_IF_ERROR(_cur_iter->init(_read_options));
-    _schema = &_cur_iter->schema();
     return Status::OK();
 }
 
@@ -441,19 +458,20 @@ Status 
VUnionIterator::current_block_row_locations(std::vector<RowLocation>* loc
 
 RowwiseIteratorUPtr new_merge_iterator(std::vector<RowwiseIteratorUPtr>&& 
inputs,
                                        int sequence_id_idx, bool is_unique, 
bool is_reverse,
-                                       uint64_t* merged_rows) {
+                                       uint64_t* merged_rows, SchemaSPtr 
output_schema) {
     // when the size of inputs is 1, we also need to use VMergeIterator, 
because the
     // next_block_view function only be implemented in VMergeIterator. The 
reason why
     // the size of inputs is 1 is that the segment was filtered out by zone 
map or others.
     return std::make_unique<VMergeIterator>(std::move(inputs), 
sequence_id_idx, is_unique,
-                                            is_reverse, merged_rows);
+                                            is_reverse, merged_rows, 
std::move(output_schema));
 }
 
-RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& 
inputs) {
+RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& 
inputs,
+                                       SchemaSPtr output_schema) {
     if (inputs.size() == 1) {
         return std::move(inputs[0]);
     }
-    return std::make_unique<VUnionIterator>(std::move(inputs));
+    return std::make_unique<VUnionIterator>(std::move(inputs), 
std::move(output_schema));
 }
 
 RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, 
const Schema& schema) {
diff --git a/be/src/vec/olap/vgeneric_iterators.h 
b/be/src/vec/olap/vgeneric_iterators.h
index c48492aa702..fbf3dc6c8f0 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -86,13 +86,14 @@ private:
 class VMergeIteratorContext {
 public:
     VMergeIteratorContext(RowwiseIteratorUPtr&& iter, int sequence_id_idx, 
bool is_unique,
-                          bool is_reverse, std::vector<uint32_t>* 
read_orderby_key_columns)
+                          bool is_reverse, std::vector<uint32_t>* 
read_orderby_key_columns,
+                          SchemaSPtr output_schema)
             : _iter(std::move(iter)),
               _sequence_id_idx(sequence_id_idx),
               _is_unique(is_unique),
               _is_reverse(is_reverse),
-              _num_columns(cast_set<int>(_iter->schema().num_column_ids())),
-              
_num_key_columns(cast_set<int>(_iter->schema().num_key_columns())),
+              _output_schema(std::move(output_schema)),
+              
_num_key_columns(cast_set<int>(_output_schema->num_key_columns())),
               _compare_columns(read_orderby_key_columns) {}
 
     VMergeIteratorContext(const VMergeIteratorContext&) = delete;
@@ -102,6 +103,22 @@ public:
 
     ~VMergeIteratorContext() = default;
 
+    // Reset (or initialize) the internal _block using the output schema.
+    //
+    // The output schema contains only the columns the caller requested 
(return_columns),
+    // excluding delete predicate columns. For example, if the query reads 
columns {c1, c2}
+    // but there is a delete predicate on column c3 (e.g., "DELETE FROM t 
WHERE c3 = 'foo'"):
+    //   - input schema  (iter->schema) = {c1, c2, c3}   (3 columns)
+    //   - output schema                = {c1, c2}       (2 columns)
+    //
+    // It is safe to build the block with only the output schema because 
SegmentIterator
+    // handles delete predicate columns independently of the block structure:
+    //   - _init_current_block() skips predicate columns (including delete 
predicates)
+    //     via the _is_pred_column[cid] check, never accessing the block for 
them.
+    //   - _output_non_pred_columns() checks loc < block->columns() before 
filling any
+    //     column, so delete predicate columns are simply skipped when the 
block is smaller.
+    //   - Delete predicate evaluation uses _current_return_columns and
+    //     _evaluate_short_circuit_predicate(), independent of the block.
     Status block_reset(const std::shared_ptr<Block>& block);
 
     // Initialize this context and will prepare data for current_row()
@@ -109,6 +126,10 @@ public:
 
     bool compare(const VMergeIteratorContext& rhs) const;
 
+    // Copy rows from internal _block to the destination block.
+    // Both blocks have _output_schema columns (return_columns only).
+    // Only _output_schema->num_column_ids() columns are copied.
+    //
     // `advanced = false` when current block finished
     // when input argument type is block, we do not process same_bit,
     // this case we only need merge and return ordered data 
(VCollectIterator::_topn_next), data mode is dup/mow can guarantee all rows are 
different
@@ -174,7 +195,14 @@ private:
     size_t _index_in_block = -1;
     // 4096 minus 16 + 16 bytes padding that in padding pod array
     int _block_row_max = 4064;
-    int _num_columns;
+    // The output schema defines which columns are in _block and in the 
caller's dst block.
+    // It contains only the requested return_columns, excluding delete 
predicate columns.
+    // For example:
+    //   - _iter->schema() (input)  = {c1, c2, c3}  — c3 for "DELETE WHERE 
c3='foo'"
+    //   - _output_schema           = {c1, c2}      — only the requested 
columns
+    // block_reset() uses _output_schema to build _block, and copy_rows() 
iterates over
+    // _output_schema->num_column_ids() columns to copy from _block to the 
destination.
+    const SchemaSPtr _output_schema;
     int _num_key_columns;
     std::vector<uint32_t>* _compare_columns;
     std::vector<RowLocation> _block_row_locations;
@@ -192,8 +220,9 @@ class VMergeIterator : public RowwiseIterator {
 public:
     // VMergeIterator takes the ownership of input iterators
     VMergeIterator(std::vector<RowwiseIteratorUPtr>&& iters, int 
sequence_id_idx, bool is_unique,
-                   bool is_reverse, uint64_t* merged_rows)
+                   bool is_reverse, uint64_t* merged_rows, SchemaSPtr 
output_schema)
             : _origin_iters(std::move(iters)),
+              _output_schema(std::move(output_schema)),
               _sequence_id_idx(sequence_id_idx),
               _is_unique(is_unique),
               _is_reverse(is_reverse),
@@ -209,7 +238,7 @@ public:
     }
     Status next_batch(BlockView* block_view) override { return 
_next_batch(block_view); }
 
-    const Schema& schema() const override { return *_schema; }
+    const Schema& schema() const override { return *_output_schema; }
 
     Status current_block_row_locations(std::vector<RowLocation>* 
block_row_locations) override {
         DCHECK(_record_rowids);
@@ -293,7 +322,9 @@ private:
     // It will be released after '_merge_heap' has been built.
     std::vector<RowwiseIteratorUPtr> _origin_iters;
 
-    const Schema* _schema = nullptr;
+    // The output schema (excludes delete predicate columns). Passed down to 
each
+    // VMergeIteratorContext to control how many columns copy_rows() copies.
+    const SchemaSPtr _output_schema;
 
     struct VMergeContextComparator {
         bool operator()(const std::shared_ptr<VMergeIteratorContext>& lhs,
@@ -325,13 +356,14 @@ private:
 // should delete returned iterator after usage.
 RowwiseIteratorUPtr new_merge_iterator(std::vector<RowwiseIteratorUPtr>&& 
inputs,
                                        int sequence_id_idx, bool is_unique, 
bool is_reverse,
-                                       uint64_t* merged_rows);
+                                       uint64_t* merged_rows, SchemaSPtr 
output_schema);
 
 // Create a union iterator for input iterators. Union iterator will read
 // input iterators one by one.
 //
 // Inputs iterators' ownership is taken by created union iterator.
-RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& 
inputs);
+RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& 
inputs,
+                                       SchemaSPtr output_schema);
 
 // Create an auto increment iterator which returns num_rows data in format of 
schema.
 // This class aims to be used in unit test.
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp 
b/be/test/vec/exec/vgeneric_iterators_test.cpp
index 4aaa91995fd..9097c4a739b 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -20,6 +20,7 @@
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
 
+#include <memory>
 #include <vector>
 
 #include "gtest/gtest_pred_impl.h"
@@ -104,13 +105,14 @@ TEST(VGenericIteratorsTest, AutoIncrement) {
 
 TEST(VGenericIteratorsTest, Union) {
     auto schema = create_schema();
+    auto output_schema = std::make_shared<Schema>(schema);
     std::vector<RowwiseIteratorUPtr> inputs;
 
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
 
-    auto iter = vectorized::new_union_iterator(std::move(inputs));
+    auto iter = vectorized::new_union_iterator(std::move(inputs), 
output_schema);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     EXPECT_TRUE(st.ok());
@@ -148,13 +150,15 @@ TEST(VGenericIteratorsTest, Union) {
 TEST(VGenericIteratorsTest, MergeAgg) {
     EXPECT_TRUE(1);
     auto schema = create_schema();
+    auto output_schema = std::make_shared<Schema>(schema);
     std::vector<RowwiseIteratorUPtr> inputs;
 
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
 
-    auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, false, 
false, nullptr);
+    auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, false, 
false, nullptr,
+                                               output_schema);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     EXPECT_TRUE(st.ok());
@@ -197,13 +201,15 @@ TEST(VGenericIteratorsTest, MergeAgg) {
 TEST(VGenericIteratorsTest, MergeUnique) {
     EXPECT_TRUE(1);
     auto schema = create_schema();
+    auto output_schema = std::make_shared<Schema>(schema);
     std::vector<RowwiseIteratorUPtr> inputs;
 
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
 
-    auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, true, 
false, nullptr);
+    auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, true, 
false, nullptr,
+                                               output_schema);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     EXPECT_TRUE(st.ok());
@@ -310,6 +316,7 @@ public:
 TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
     EXPECT_TRUE(1);
     auto schema = create_schema();
+    auto output_schema = std::make_shared<Schema>(schema);
     std::vector<RowwiseIteratorUPtr> inputs;
 
     int seq_column_id = 2;
@@ -325,8 +332,8 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
                 schema, num_rows, rows_begin, seq_column_id, 
seq_id_in_every_file));
     }
 
-    auto iter =
-            vectorized::new_merge_iterator(std::move(inputs), seq_column_id, 
true, false, nullptr);
+    auto iter = vectorized::new_merge_iterator(std::move(inputs), 
seq_column_id, true, false,
+                                               nullptr, output_schema);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     EXPECT_TRUE(st.ok());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to