This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c7e99278cf2e48f29afb8afe9a9d4f2c857a7af4
Author: Mryange <[email protected]>
AuthorDate: Mon Apr 15 15:00:09 2024 +0800

    [refine](Operator) When _stop_emplace_flag is not set to true, perform batch processing on the block. (#33173)
---
 .../distinct_streaming_aggregation_operator.cpp    | 112 +++++++++++++++------
 .../exec/distinct_streaming_aggregation_operator.h |  15 ++-
 be/src/vec/columns/column.h                        |   6 ++
 be/src/vec/columns/column_array.h                  |   4 +
 be/src/vec/columns/column_complex.h                |   5 +
 be/src/vec/columns/column_const.h                  |   4 +
 be/src/vec/columns/column_decimal.h                |   4 +
 be/src/vec/columns/column_dictionary.h             |   5 +
 be/src/vec/columns/column_dummy.h                  |  14 +++
 be/src/vec/columns/column_fixed_length_object.h    |   5 +
 be/src/vec/columns/column_impl.h                   |  10 +-
 be/src/vec/columns/column_map.h                    |   4 +
 be/src/vec/columns/column_nullable.h               |   5 +
 be/src/vec/columns/column_object.cpp               |   5 -
 be/src/vec/columns/column_object.h                 |   9 +-
 be/src/vec/columns/column_string.h                 |   4 +
 be/src/vec/columns/column_struct.h                 |   4 +
 be/src/vec/columns/column_vector.h                 |   4 +
 be/src/vec/columns/predicate_column.h              |   4 +
 19 files changed, 178 insertions(+), 45 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 7983b269488..9151f4a29d5 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -46,7 +46,7 @@ static constexpr StreamingHtMinReductionEntry 
STREAMING_HT_MIN_REDUCTION[] = {
         {0, 0.0},
         // Expand into L3 cache if we look like we're getting some reduction.
         // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 1.1},
+        {1024 * 1024, 0.0},
         // Expand into main memory if we're getting a significant reduction.
         // The L3 cache is generally 16MB or more
         {16 * 1024 * 1024, 2.0},
@@ -59,6 +59,7 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
                                                                OperatorXBase* 
parent)
         : PipelineXLocalState<FakeSharedState>(state, parent),
           dummy_mapped_data(std::make_shared<char>('A')),
+          batch_size(state->batch_size()),
           _agg_arena_pool(std::make_unique<vectorized::Arena>()),
           _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
           _agg_profile_arena(std::make_unique<vectorized::Arena>()),
@@ -83,6 +84,8 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* 
state, LocalStateInfo&
     _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
     _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", 
TUnit::UNIT);
+    _insert_keys_to_column_timer = ADD_TIMER(profile(), 
"InsertKeysToColumnTime");
 
     if (_probe_expr_ctxs.empty()) {
         _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
@@ -120,7 +123,7 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                 // were aggregated into it. Exclude passed through rows from 
this calculation since
                 // they were not in hash tables.
                 const int64_t input_rows = _input_num_rows;
-                const int64_t aggregated_input_rows = input_rows - 
_cur_num_rows_returned;
+                const int64_t aggregated_input_rows = input_rows - 
_num_rows_returned;
                 // TODO chenhao
                 //  const int64_t expected_input_rows = 
estimated_input_cardinality_ - num_rows_returned_;
                 double current_reduction = 
static_cast<double>(aggregated_input_rows) / ht_rows;
@@ -245,22 +248,48 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
     _distinct_row.clear();
     _distinct_row.reserve(rows);
 
-    RETURN_IF_CATCH_EXCEPTION(
-            _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, 
rows));
-    // need use _cur_num_rows_returned to decide whether to do continue 
emplace into hash table
-    _cur_num_rows_returned += _distinct_row.size();
+    if (!_stop_emplace_flag) {
+        RETURN_IF_CATCH_EXCEPTION(
+                _emplace_into_hash_table_to_distinct(_distinct_row, 
key_columns, rows));
+    }
 
     bool mem_reuse = 
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
                      out_block->mem_reuse();
+    SCOPED_TIMER(_insert_keys_to_column_timer);
     if (mem_reuse) {
-        for (int i = 0; i < key_size; ++i) {
-            auto output_column = out_block->get_by_position(i).column;
-            if (_stop_emplace_flag) { // swap the column directly, to solve 
Check failed: d.column->use_count() == 1 (2 vs. 1)
+        if (_stop_emplace_flag && !out_block->empty()) {
+            // when out_block row >= batch_size, push it to data_queue, so 
when _stop_emplace_flag = true, maybe have some data in block
+            // need output those data firstly
+            DCHECK(_distinct_row.empty());
+            _distinct_row.resize(rows);
+            std::iota(_distinct_row.begin(), _distinct_row.end(), 0);
+        }
+        DCHECK_EQ(out_block->columns(), key_size);
+        if (_stop_emplace_flag && _distinct_row.empty()) {
+            // swap the column directly, to solve Check failed: 
d.column->use_count() == 1 (2 vs. 1)
+            for (int i = 0; i < key_size; ++i) {
+                auto output_column = out_block->get_by_position(i).column;
                 out_block->replace_by_position(i, 
key_columns[i]->assume_mutable());
                 in_block->replace_by_position(result_idxs[i], output_column);
+            }
+        } else {
+            DCHECK_EQ(_cache_block.rows(), 0);
+            if (out_block->rows() + _distinct_row.size() > batch_size) {
+                size_t split_size = batch_size - out_block->rows();
+                for (int i = 0; i < key_size; ++i) {
+                    auto output_dst = 
out_block->get_by_position(i).column->assume_mutable();
+                    key_columns[i]->append_data_by_selector(output_dst, 
_distinct_row, 0,
+                                                            split_size);
+                    auto cache_dst = 
_cache_block.get_by_position(i).column->assume_mutable();
+                    key_columns[i]->append_data_by_selector(cache_dst, 
_distinct_row, split_size,
+                                                            
_distinct_row.size());
+                }
             } else {
-                auto dst = output_column->assume_mutable();
-                key_columns[i]->append_data_by_selector(dst, _distinct_row);
+                for (int i = 0; i < key_size; ++i) {
+                    auto output_column = out_block->get_by_position(i).column;
+                    auto dst = output_column->assume_mutable();
+                    key_columns[i]->append_data_by_selector(dst, 
_distinct_row);
+                }
             }
         }
     } else {
@@ -279,6 +308,7 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
             }
         }
         out_block->swap(vectorized::Block(columns_with_schema));
+        _cache_block = out_block->clone_empty();
         if (_stop_emplace_flag) {
             in_block->clear(); // clear the column ref with stop_emplace_flag 
= true
         }
@@ -443,26 +473,18 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* 
state, vectorized::Bloc
                                            bool eos) const {
     auto& local_state = get_local_state(state);
     local_state._input_num_rows += in_block->rows();
-    Status ret = Status::OK();
-    if (in_block->rows() > 0) {
-        RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key(
-                in_block, local_state._aggregated_block.get()));
-
-        // get enough data or reached limit rows, need push block to queue
-        if (!local_state._stop_emplace_flag && _limit != -1 &&
-            (local_state._aggregated_block->rows() + 
local_state._output_distinct_rows) >= _limit) {
-            auto limit_rows = _limit - local_state._output_distinct_rows;
-            local_state._aggregated_block->set_num_rows(limit_rows);
-            local_state._output_distinct_rows += limit_rows;
-        } else if (!local_state._stop_emplace_flag) {
-            local_state._output_distinct_rows += 
local_state._aggregated_block->rows();
-        }
+    if (in_block->rows() == 0) {
+        return Status::OK();
     }
 
-    // reach limit or source finish
-    if ((UNLIKELY(eos)) || (_limit != -1 && local_state._output_distinct_rows 
>= _limit)) {
-        local_state._output_distinct_rows += 
local_state._aggregated_block->rows();
-        return Status::OK(); // need given finish signal
+    RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key(
+            in_block, local_state._aggregated_block.get()));
+    // set limit and reach limit
+    if (_limit != -1 &&
+        (local_state._num_rows_returned + 
local_state._aggregated_block->rows()) > _limit) {
+        auto limit_rows = _limit - local_state._num_rows_returned;
+        local_state._aggregated_block->set_num_rows(limit_rows);
+        local_state._reach_limit = true;
     }
     return Status::OK();
 }
@@ -473,23 +495,34 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* 
state, vectorized::Bloc
     if (!local_state._aggregated_block->empty()) {
         block->swap(*local_state._aggregated_block);
         local_state._aggregated_block->clear_column_data(block->columns());
+        // The cache block may have additional data due to exceeding the batch 
size.
+        if (!local_state._cache_block.empty()) {
+            local_state._swap_cache_block(local_state._aggregated_block.get());
+        }
     }
 
     local_state._make_nullable_output_key(block);
-    if (_is_streaming_preagg == false) {
+    if (!_is_streaming_preagg) {
         // dispose the having clause, should not be execute in prestreaming agg
         RETURN_IF_ERROR(
                 vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
     }
     local_state.add_num_rows_returned(block->rows());
-    *eos = local_state._child_eos || (_limit != -1 && 
local_state._output_distinct_rows >= _limit);
+    COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
+    // If the limit is not reached, it is important to ensure that 
_aggregated_block is empty
+    // because it may still contain data.
+    // However, if the limit is reached, there is no need to output data even 
if some exists.
+    *eos = (local_state._child_eos && local_state._aggregated_block->empty()) 
||
+           (local_state._reach_limit);
     return Status::OK();
 }
 
 bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) 
const {
     auto& local_state = get_local_state(state);
-    return local_state._aggregated_block->empty() && !local_state._child_eos &&
-           (_limit == -1 || local_state._output_distinct_rows < _limit);
+    const bool need_batch = local_state._stop_emplace_flag
+                                    ? local_state._aggregated_block->empty()
+                                    : local_state._aggregated_block->rows() < 
state->batch_size();
+    return need_batch && !(local_state._child_eos || local_state._reach_limit);
 }
 
 Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
@@ -498,10 +531,23 @@ Status 
DistinctStreamingAggLocalState::close(RuntimeState* state) {
     }
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_close_timer);
+    /// _hash_table_size_counter may be null if prepare failed.
+    if (_hash_table_size_counter) {
+        std::visit(
+                [&](auto&& agg_method) {
+                    COUNTER_SET(_hash_table_size_counter, 
int64_t(agg_method.hash_table->size()));
+                },
+                _agg_data->method_variant);
+    }
     if (Base::_closed) {
         return Status::OK();
     }
     _aggregated_block->clear();
+    // If the limit is reached, there may still be remaining data in the cache 
block.
+    // If the limit is not reached, the cache block must be empty.
+    DCHECK(_reach_limit || _aggregated_block->empty());
+    DCHECK(_reach_limit || _cache_block.empty());
+    _cache_block.clear();
     return Base::close(state);
 }
 
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 4c15194362e..125f176375b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -58,15 +58,19 @@ private:
     void _make_nullable_output_key(vectorized::Block* block);
     bool _should_expand_preagg_hash_tables();
 
+    void _swap_cache_block(vectorized::Block* block) {
+        DCHECK(!_cache_block.is_empty_column());
+        block->swap(_cache_block);
+        _cache_block = block->clone_empty();
+    }
+
     std::shared_ptr<char> dummy_mapped_data;
     vectorized::IColumn::Selector _distinct_row;
     vectorized::Arena _arena;
-    int64_t _output_distinct_rows = 0;
     size_t _input_num_rows = 0;
     bool _should_expand_hash_table = true;
-    int64_t _cur_num_rows_returned = 0;
     bool _stop_emplace_flag = false;
-
+    const int batch_size;
     std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
     vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
@@ -75,13 +79,16 @@ private:
     std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
     std::unique_ptr<vectorized::Block> _child_block = nullptr;
     bool _child_eos = false;
+    bool _reach_limit = false;
     std::unique_ptr<vectorized::Block> _aggregated_block = nullptr;
-
+    vectorized::Block _cache_block;
     RuntimeProfile::Counter* _build_timer = nullptr;
     RuntimeProfile::Counter* _expr_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
+    RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+    RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
 };
 
 class DistinctStreamingAggOperatorX final
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index f1bcee1ad7d..a6d48a41fca 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -499,6 +499,9 @@ public:
 
     virtual void append_data_by_selector(MutablePtr& res, const Selector& 
selector) const = 0;
 
+    virtual void append_data_by_selector(MutablePtr& res, const Selector& 
selector, size_t begin,
+                                         size_t end) const = 0;
+
     /// Insert data from several other columns according to source mask (used 
in vertical merge).
     /// For now it is a helper to de-virtualize calls to insert*() functions 
inside gather loop
     /// (descendants should call gatherer_stream.gather(*this) to implement 
this function.)
@@ -695,6 +698,9 @@ public:
 protected:
     template <typename Derived>
     void append_data_by_selector_impl(MutablePtr& res, const Selector& 
selector) const;
+    template <typename Derived>
+    void append_data_by_selector_impl(MutablePtr& res, const Selector& 
selector, size_t begin,
+                                      size_t end) const;
 };
 
 using ColumnPtr = IColumn::Ptr;
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index 118e7ab05c6..7d619c14eff 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -207,6 +207,10 @@ public:
                                  const IColumn::Selector& selector) const 
override {
         return append_data_by_selector_impl<ColumnArray>(res, selector);
     }
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        return append_data_by_selector_impl<ColumnArray>(res, selector, begin, 
end);
+    }
 
     void for_each_subcolumn(ColumnCallback callback) override {
         callback(offsets);
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index d983fc9175e..54b0650e800 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -262,6 +262,11 @@ public:
                                  const IColumn::Selector& selector) const 
override {
         this->template append_data_by_selector_impl<ColumnComplexType<T>>(res, 
selector);
     }
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        this->template append_data_by_selector_impl<ColumnComplexType<T>>(res, 
selector, begin,
+                                                                          end);
+    }
 
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 
0) override {
         DCHECK(size() > self_row);
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 66db2ed54f0..e1c3c52949b 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -253,6 +253,10 @@ public:
                                  const IColumn::Selector& selector) const 
override {
         assert_cast<Self&>(*res).resize(selector.size());
     }
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        assert_cast<Self&>(*res).resize(end - begin);
+    }
 
     void for_each_subcolumn(ColumnCallback callback) override { 
callback(data); }
 
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index 152d4165416..7f286699ab8 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -231,6 +231,10 @@ public:
                                  const IColumn::Selector& selector) const 
override {
         this->template append_data_by_selector_impl<Self>(res, selector);
     }
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        this->template append_data_by_selector_impl<Self>(res, selector, 
begin, end);
+    }
 
     //    void gather(ColumnGathererStream & gatherer_stream) override;
 
diff --git a/be/src/vec/columns/column_dictionary.h 
b/be/src/vec/columns/column_dictionary.h
index 175912f9668..3b1537b83d3 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -195,6 +195,11 @@ public:
         LOG(FATAL) << "append_data_by_selector is not supported in 
ColumnDictionary!";
     }
 
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        LOG(FATAL) << "append_data_by_selector is not supported in 
ColumnDictionary!";
+    }
+
     [[noreturn]] ColumnPtr index(const IColumn& indexes, size_t limit) const 
override {
         LOG(FATAL) << "index not implemented";
         __builtin_unreachable();
diff --git a/be/src/vec/columns/column_dummy.h 
b/be/src/vec/columns/column_dummy.h
index f330a60642a..a28133c69a3 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -130,6 +130,20 @@ public:
         for (size_t i = 0; i < selector.size(); ++i) res->insert_from(*this, 
selector[i]);
     }
 
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        size_t num_rows = size();
+
+        if (num_rows < selector.size()) {
+            LOG(FATAL) << fmt::format("Size of selector: {}, is larger than 
size of column:{}",
+                                      selector.size(), num_rows);
+        }
+
+        res->reserve(num_rows);
+
+        for (size_t i = begin; i < end; ++i) res->insert_from(*this, 
selector[i]);
+    }
+
     void addSize(size_t delta) { s += delta; }
 
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 
0) override {
diff --git a/be/src/vec/columns/column_fixed_length_object.h 
b/be/src/vec/columns/column_fixed_length_object.h
index 5875b8f6c30..a10204ed48b 100644
--- a/be/src/vec/columns/column_fixed_length_object.h
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -238,6 +238,11 @@ public:
         this->template append_data_by_selector_impl<Self>(res, selector);
     }
 
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        this->template append_data_by_selector_impl<Self>(res, selector, 
begin, end);
+    }
+
     size_t byte_size() const override { return _data.size(); }
 
     size_t item_size() const { return _item_size; }
diff --git a/be/src/vec/columns/column_impl.h b/be/src/vec/columns/column_impl.h
index 20292ad2351..4a66720c116 100644
--- a/be/src/vec/columns/column_impl.h
+++ b/be/src/vec/columns/column_impl.h
@@ -33,7 +33,8 @@
 namespace doris::vectorized {
 
 template <typename Derived>
-void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& 
selector) const {
+void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& 
selector, size_t begin,
+                                           size_t end) const {
     size_t num_rows = size();
 
     if (num_rows < selector.size()) {
@@ -43,8 +44,13 @@ void IColumn::append_data_by_selector_impl(MutablePtr& res, 
const Selector& sele
 
     res->reserve(num_rows);
 
-    for (size_t i = 0; i < selector.size(); ++i)
+    for (size_t i = begin; i < end; ++i) {
         static_cast<Derived&>(*res).insert_from(*this, selector[i]);
+    }
+}
+template <typename Derived>
+void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& 
selector) const {
+    append_data_by_selector_impl<Derived>(res, selector, 0, selector.size());
 }
 
 template <typename Derived>
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 2cdfcae8c73..e0bc7e72d78 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -133,6 +133,10 @@ public:
                                  const IColumn::Selector& selector) const 
override {
         return append_data_by_selector_impl<ColumnMap>(res, selector);
     }
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        return append_data_by_selector_impl<ColumnMap>(res, selector, begin, 
end);
+    }
 
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 
0) override {
         LOG(FATAL) << "Method replace_column_data is not supported for " << 
get_name();
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 56dbc619a39..0b783309499 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -232,6 +232,11 @@ public:
         append_data_by_selector_impl<ColumnNullable>(res, selector);
     }
 
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        append_data_by_selector_impl<ColumnNullable>(res, selector, begin, 
end);
+    }
+
     //    void gather(ColumnGathererStream & gatherer_stream) override;
 
     void for_each_subcolumn(ColumnCallback callback) override {
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index c596717194f..6d7104647c7 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -1435,11 +1435,6 @@ Status ColumnObject::extract_root(const PathInData& 
path, MutableColumnPtr& dst)
     return Status::OK();
 }
 
-void ColumnObject::append_data_by_selector(MutableColumnPtr& res,
-                                           const IColumn::Selector& selector) 
const {
-    return append_data_by_selector_impl<ColumnObject>(res, selector);
-}
-
 void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* 
indices_begin,
                                        const uint32_t* indices_end) {
     for (const auto* x = indices_begin; x != indices_end; ++x) {
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 26b2c66a755..02edde29cf5 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -385,7 +385,14 @@ public:
     void insert(const Field& field) override { try_insert(field); }
 
     void append_data_by_selector(MutableColumnPtr& res,
-                                 const IColumn::Selector& selector) const 
override;
+                                 const IColumn::Selector& selector) const 
override {
+        append_data_by_selector_impl<ColumnObject>(res, selector);
+    }
+
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        append_data_by_selector_impl<ColumnObject>(res, selector, begin, end);
+    }
 
     void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
                              const uint32_t* indices_end) override;
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 405ada3b48d..c2eedfbc791 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -521,6 +521,10 @@ public:
         append_data_by_selector_impl<ColumnString>(res, selector);
     }
 
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        append_data_by_selector_impl<ColumnString>(res, selector, begin, end);
+    }
     //    void gather(ColumnGathererStream & gatherer_stream) override;
 
     void reserve(size_t n) override;
diff --git a/be/src/vec/columns/column_struct.h 
b/be/src/vec/columns/column_struct.h
index 2ca4fdec015..5157b1ad6b0 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -132,6 +132,10 @@ public:
     void append_data_by_selector(MutableColumnPtr& res, const Selector& 
selector) const override {
         return append_data_by_selector_impl<ColumnStruct>(res, selector);
     }
+    void append_data_by_selector(MutableColumnPtr& res, const Selector& 
selector, size_t begin,
+                                 size_t end) const override {
+        return append_data_by_selector_impl<ColumnStruct>(res, selector, 
begin, end);
+    }
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 
0) override {
         LOG(FATAL) << "Method replace_column_data is not supported for " << 
get_name();
     }
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 7e035b8b47a..0b7d8350c5f 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -412,6 +412,10 @@ public:
                                  const IColumn::Selector& selector) const 
override {
         this->template append_data_by_selector_impl<Self>(res, selector);
     }
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        this->template append_data_by_selector_impl<Self>(res, selector, 
begin, end);
+    }
 
     bool is_fixed_and_contiguous() const override { return true; }
     size_t size_of_value_if_fixed() const override { return sizeof(T); }
diff --git a/be/src/vec/columns/predicate_column.h 
b/be/src/vec/columns/predicate_column.h
index 2a390d39d29..0fdca54dc38 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -433,6 +433,10 @@ public:
                                  const IColumn::Selector& selector) const 
override {
         LOG(FATAL) << "append_data_by_selector is not supported in 
PredicateColumnType!";
     }
+    void append_data_by_selector(MutableColumnPtr& res, const 
IColumn::Selector& selector,
+                                 size_t begin, size_t end) const override {
+        LOG(FATAL) << "append_data_by_selector is not supported in 
PredicateColumnType!";
+    }
 
     Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* 
col_ptr) override {
         ColumnType* column = assert_cast<ColumnType*>(col_ptr);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to