This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c7e99278cf2e48f29afb8afe9a9d4f2c857a7af4 Author: Mryange <[email protected]> AuthorDate: Mon Apr 15 15:00:09 2024 +0800 [refine](Operator) When _stop_emplace_flag is not set to true, perform batch processing on the block. (#33173) --- .../distinct_streaming_aggregation_operator.cpp | 112 +++++++++++++++------ .../exec/distinct_streaming_aggregation_operator.h | 15 ++- be/src/vec/columns/column.h | 6 ++ be/src/vec/columns/column_array.h | 4 + be/src/vec/columns/column_complex.h | 5 + be/src/vec/columns/column_const.h | 4 + be/src/vec/columns/column_decimal.h | 4 + be/src/vec/columns/column_dictionary.h | 5 + be/src/vec/columns/column_dummy.h | 14 +++ be/src/vec/columns/column_fixed_length_object.h | 5 + be/src/vec/columns/column_impl.h | 10 +- be/src/vec/columns/column_map.h | 4 + be/src/vec/columns/column_nullable.h | 5 + be/src/vec/columns/column_object.cpp | 5 - be/src/vec/columns/column_object.h | 9 +- be/src/vec/columns/column_string.h | 4 + be/src/vec/columns/column_struct.h | 4 + be/src/vec/columns/column_vector.h | 4 + be/src/vec/columns/predicate_column.h | 4 + 19 files changed, 178 insertions(+), 45 deletions(-) diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 7983b269488..9151f4a29d5 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -46,7 +46,7 @@ static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { {0, 0.0}, // Expand into L3 cache if we look like we're getting some reduction. // At present, The L2 cache is generally 1024k or more - {1024 * 1024, 1.1}, + {1024 * 1024, 0.0}, // Expand into main memory if we're getting a significant reduction. // The L3 cache is generally 16MB or more {16 * 1024 * 1024, 2.0}, @@ -59,6 +59,7 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta OperatorXBase* parent) : PipelineXLocalState<FakeSharedState>(state, parent), dummy_mapped_data(std::make_shared<char>('A')), + batch_size(state->batch_size()), _agg_arena_pool(std::make_unique<vectorized::Arena>()), _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()), _agg_profile_arena(std::make_unique<vectorized::Arena>()), @@ -83,6 +84,8 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& _hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime"); _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); + _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); + _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime"); if (_probe_expr_ctxs.empty()) { _agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>( @@ -120,7 +123,7 @@ bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() { // were aggregated into it. Exclude passed through rows from this calculation since // they were not in hash tables. const int64_t input_rows = _input_num_rows; - const int64_t aggregated_input_rows = input_rows - _cur_num_rows_returned; + const int64_t aggregated_input_rows = input_rows - _num_rows_returned; // TODO chenhao // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows; @@ -245,22 +248,48 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( _distinct_row.clear(); _distinct_row.reserve(rows); - RETURN_IF_CATCH_EXCEPTION( - _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows)); - // need use _cur_num_rows_returned to decide whether to do continue emplace into hash table - _cur_num_rows_returned += _distinct_row.size(); + if (!_stop_emplace_flag) { + RETURN_IF_CATCH_EXCEPTION( + _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows)); + } bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() && out_block->mem_reuse(); + SCOPED_TIMER(_insert_keys_to_column_timer); if (mem_reuse) { - for (int i = 0; i < key_size; ++i) { - auto output_column = out_block->get_by_position(i).column; - if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1) + if (_stop_emplace_flag && !out_block->empty()) { + // when out_block row >= batch_size, push it to data_queue, so when _stop_emplace_flag = true, maybe have some data in block + // need output those data firstly + DCHECK(_distinct_row.empty()); + _distinct_row.resize(rows); + std::iota(_distinct_row.begin(), _distinct_row.end(), 0); + } + DCHECK_EQ(out_block->columns(), key_size); + if (_stop_emplace_flag && _distinct_row.empty()) { + // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1) + for (int i = 0; i < key_size; ++i) { + auto output_column = out_block->get_by_position(i).column; out_block->replace_by_position(i, key_columns[i]->assume_mutable()); in_block->replace_by_position(result_idxs[i], output_column); + } + } else { + DCHECK_EQ(_cache_block.rows(), 0); + if (out_block->rows() + _distinct_row.size() > batch_size) { + size_t split_size = batch_size - out_block->rows(); + for (int i = 0; i < key_size; ++i) { + auto output_dst = out_block->get_by_position(i).column->assume_mutable(); + key_columns[i]->append_data_by_selector(output_dst, _distinct_row, 0, + split_size); + auto cache_dst = _cache_block.get_by_position(i).column->assume_mutable(); + key_columns[i]->append_data_by_selector(cache_dst, _distinct_row, split_size, + _distinct_row.size()); + } } else { - auto dst = output_column->assume_mutable(); - key_columns[i]->append_data_by_selector(dst, _distinct_row); + for (int i = 0; i < key_size; ++i) { + auto output_column = out_block->get_by_position(i).column; + auto dst = output_column->assume_mutable(); + key_columns[i]->append_data_by_selector(dst, _distinct_row); + } } } } else { @@ -279,6 +308,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( } } out_block->swap(vectorized::Block(columns_with_schema)); + _cache_block = out_block->clone_empty(); if (_stop_emplace_flag) { in_block->clear(); // clear the column ref with stop_emplace_flag = true } @@ -443,26 +473,18 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc bool eos) const { auto& local_state = get_local_state(state); local_state._input_num_rows += in_block->rows(); - Status ret = Status::OK(); - if (in_block->rows() > 0) { - RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key( - in_block, local_state._aggregated_block.get())); - - // get enough data or reached limit rows, need push block to queue - if (!local_state._stop_emplace_flag && _limit != -1 && - (local_state._aggregated_block->rows() + local_state._output_distinct_rows) >= _limit) { - auto limit_rows = _limit - local_state._output_distinct_rows; - local_state._aggregated_block->set_num_rows(limit_rows); - local_state._output_distinct_rows += limit_rows; - } else if (!local_state._stop_emplace_flag) { - local_state._output_distinct_rows += local_state._aggregated_block->rows(); - } + if (in_block->rows() == 0) { + return Status::OK(); } - // reach limit or source finish - if ((UNLIKELY(eos)) || (_limit != -1 && local_state._output_distinct_rows >= _limit)) { - local_state._output_distinct_rows += local_state._aggregated_block->rows(); - return Status::OK(); // need given finish signal + RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key( + in_block, local_state._aggregated_block.get())); + // set limit and reach limit + if (_limit != -1 && + (local_state._num_rows_returned + local_state._aggregated_block->rows()) > _limit) { + auto limit_rows = _limit - local_state._num_rows_returned; + local_state._aggregated_block->set_num_rows(limit_rows); + local_state._reach_limit = true; } return Status::OK(); } @@ -473,23 +495,34 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc if (!local_state._aggregated_block->empty()) { block->swap(*local_state._aggregated_block); local_state._aggregated_block->clear_column_data(block->columns()); + // The cache block may have additional data due to exceeding the batch size. + if (!local_state._cache_block.empty()) { + local_state._swap_cache_block(local_state._aggregated_block.get()); + } } local_state._make_nullable_output_key(block); - if (_is_streaming_preagg == false) { + if (!_is_streaming_preagg) { // dispose the having clause, should not be execute in prestreaming agg RETURN_IF_ERROR( vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); } local_state.add_num_rows_returned(block->rows()); - *eos = local_state._child_eos || (_limit != -1 && local_state._output_distinct_rows >= _limit); + COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); + // If the limit is not reached, it is important to ensure that _aggregated_block is empty + // because it may still contain data. + // However, if the limit is reached, there is no need to output data even if some exists. + *eos = (local_state._child_eos && local_state._aggregated_block->empty()) || + (local_state._reach_limit); return Status::OK(); } bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) const { auto& local_state = get_local_state(state); - return local_state._aggregated_block->empty() && !local_state._child_eos && - (_limit == -1 || local_state._output_distinct_rows < _limit); + const bool need_batch = local_state._stop_emplace_flag + ? local_state._aggregated_block->empty() + : local_state._aggregated_block->rows() < state->batch_size(); + return need_batch && !(local_state._child_eos || local_state._reach_limit); } Status DistinctStreamingAggLocalState::close(RuntimeState* state) { @@ -498,10 +531,23 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) { } SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_close_timer); + /// _hash_table_size_counter may be null if prepare failed. + if (_hash_table_size_counter) { + std::visit( + [&](auto&& agg_method) { + COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.hash_table->size())); + }, + _agg_data->method_variant); + } if (Base::_closed) { return Status::OK(); } _aggregated_block->clear(); + // If the limit is reached, there may still be remaining data in the cache block. + // If the limit is not reached, the cache block must be empty. + DCHECK(_reach_limit || _aggregated_block->empty()); + DCHECK(_reach_limit || _cache_block.empty()); + _cache_block.clear(); return Base::close(state); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 4c15194362e..125f176375b 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -58,15 +58,19 @@ private: void _make_nullable_output_key(vectorized::Block* block); bool _should_expand_preagg_hash_tables(); + void _swap_cache_block(vectorized::Block* block) { + DCHECK(!_cache_block.is_empty_column()); + block->swap(_cache_block); + _cache_block = block->clone_empty(); + } + std::shared_ptr<char> dummy_mapped_data; vectorized::IColumn::Selector _distinct_row; vectorized::Arena _arena; - int64_t _output_distinct_rows = 0; size_t _input_num_rows = 0; bool _should_expand_hash_table = true; - int64_t _cur_num_rows_returned = 0; bool _stop_emplace_flag = false; - + const int batch_size; std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr; vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr; std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators; @@ -75,13 +79,16 @@ private: std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr; std::unique_ptr<vectorized::Block> _child_block = nullptr; bool _child_eos = false; + bool _reach_limit = false; std::unique_ptr<vectorized::Block> _aggregated_block = nullptr; - + vectorized::Block _cache_block; RuntimeProfile::Counter* _build_timer = nullptr; RuntimeProfile::Counter* _expr_timer = nullptr; RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; RuntimeProfile::Counter* _hash_table_input_counter = nullptr; + RuntimeProfile::Counter* _hash_table_size_counter = nullptr; + RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr; }; class DistinctStreamingAggOperatorX final diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index f1bcee1ad7d..a6d48a41fca 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -499,6 +499,9 @@ public: virtual void append_data_by_selector(MutablePtr& res, const Selector& selector) const = 0; + virtual void append_data_by_selector(MutablePtr& res, const Selector& selector, size_t begin, + size_t end) const = 0; + /// Insert data from several other columns according to source mask (used in vertical merge). /// For now it is a helper to de-virtualize calls to insert*() functions inside gather loop /// (descendants should call gatherer_stream.gather(*this) to implement this function.) @@ -695,6 +698,9 @@ public: protected: template <typename Derived> void append_data_by_selector_impl(MutablePtr& res, const Selector& selector) const; + template <typename Derived> + void append_data_by_selector_impl(MutablePtr& res, const Selector& selector, size_t begin, + size_t end) const; }; using ColumnPtr = IColumn::Ptr; diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 118e7ab05c6..7d619c14eff 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -207,6 +207,10 @@ public: const IColumn::Selector& selector) const override { return append_data_by_selector_impl<ColumnArray>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + return append_data_by_selector_impl<ColumnArray>(res, selector, begin, end); + } void for_each_subcolumn(ColumnCallback callback) override { callback(offsets); diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index d983fc9175e..54b0650e800 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -262,6 +262,11 @@ public: const IColumn::Selector& selector) const override { this->template append_data_by_selector_impl<ColumnComplexType<T>>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + this->template append_data_by_selector_impl<ColumnComplexType<T>>(res, selector, begin, + end); + } void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override { DCHECK(size() > self_row); diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 66db2ed54f0..e1c3c52949b 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -253,6 +253,10 @@ public: const IColumn::Selector& selector) const override { assert_cast<Self&>(*res).resize(selector.size()); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + assert_cast<Self&>(*res).resize(end - begin); + } void for_each_subcolumn(ColumnCallback callback) override { callback(data); } diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 152d4165416..7f286699ab8 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -231,6 +231,10 @@ public: const IColumn::Selector& selector) const override { this->template append_data_by_selector_impl<Self>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + this->template append_data_by_selector_impl<Self>(res, selector, begin, end); + } // void gather(ColumnGathererStream & gatherer_stream) override; diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h index 175912f9668..3b1537b83d3 100644 --- a/be/src/vec/columns/column_dictionary.h +++ b/be/src/vec/columns/column_dictionary.h @@ -195,6 +195,11 @@ public: LOG(FATAL) << "append_data_by_selector is not supported in ColumnDictionary!"; } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + LOG(FATAL) << "append_data_by_selector is not supported in ColumnDictionary!"; + } + [[noreturn]] ColumnPtr index(const IColumn& indexes, size_t limit) const override { LOG(FATAL) << "index not implemented"; __builtin_unreachable(); diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index f330a60642a..a28133c69a3 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -130,6 +130,20 @@ public: for (size_t i = 0; i < selector.size(); ++i) res->insert_from(*this, selector[i]); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + size_t num_rows = size(); + + if (num_rows < selector.size()) { + LOG(FATAL) << fmt::format("Size of selector: {}, is larger than size of column:{}", + selector.size(), num_rows); + } + + res->reserve(num_rows); + + for (size_t i = begin; i < end; ++i) res->insert_from(*this, selector[i]); + } + void addSize(size_t delta) { s += delta; } void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override { diff --git a/be/src/vec/columns/column_fixed_length_object.h b/be/src/vec/columns/column_fixed_length_object.h index 5875b8f6c30..a10204ed48b 100644 --- a/be/src/vec/columns/column_fixed_length_object.h +++ b/be/src/vec/columns/column_fixed_length_object.h @@ -238,6 +238,11 @@ public: this->template append_data_by_selector_impl<Self>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + this->template append_data_by_selector_impl<Self>(res, selector, begin, end); + } + size_t byte_size() const override { return _data.size(); } size_t item_size() const { return _item_size; } diff --git a/be/src/vec/columns/column_impl.h b/be/src/vec/columns/column_impl.h index 20292ad2351..4a66720c116 100644 --- a/be/src/vec/columns/column_impl.h +++ b/be/src/vec/columns/column_impl.h @@ -33,7 +33,8 @@ namespace doris::vectorized { template <typename Derived> -void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& selector) const { +void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& selector, size_t begin, + size_t end) const { size_t num_rows = size(); if (num_rows < selector.size()) { @@ -43,8 +44,13 @@ void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& sele res->reserve(num_rows); - for (size_t i = 0; i < selector.size(); ++i) + for (size_t i = begin; i < end; ++i) { static_cast<Derived&>(*res).insert_from(*this, selector[i]); + } +} +template <typename Derived> +void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& selector) const { + append_data_by_selector_impl<Derived>(res, selector, 0, selector.size()); } template <typename Derived> diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 2cdfcae8c73..e0bc7e72d78 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -133,6 +133,10 @@ public: const IColumn::Selector& selector) const override { return append_data_by_selector_impl<ColumnMap>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + return append_data_by_selector_impl<ColumnMap>(res, selector, begin, end); + } void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override { LOG(FATAL) << "Method replace_column_data is not supported for " << get_name(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 56dbc619a39..0b783309499 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -232,6 +232,11 @@ public: append_data_by_selector_impl<ColumnNullable>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + append_data_by_selector_impl<ColumnNullable>(res, selector, begin, end); + } + // void gather(ColumnGathererStream & gatherer_stream) override; void for_each_subcolumn(ColumnCallback callback) override { diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index c596717194f..6d7104647c7 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -1435,11 +1435,6 @@ Status ColumnObject::extract_root(const PathInData& path, MutableColumnPtr& dst) return Status::OK(); } -void ColumnObject::append_data_by_selector(MutableColumnPtr& res, - const IColumn::Selector& selector) const { - return append_data_by_selector_impl<ColumnObject>(res, selector); -} - void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) { for (const auto* x = indices_begin; x != indices_end; ++x) { diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 26b2c66a755..02edde29cf5 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -385,7 +385,14 @@ public: void insert(const Field& field) override { try_insert(field); } void append_data_by_selector(MutableColumnPtr& res, - const IColumn::Selector& selector) const override; + const IColumn::Selector& selector) const override { + append_data_by_selector_impl<ColumnObject>(res, selector); + } + + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + append_data_by_selector_impl<ColumnObject>(res, selector, begin, end); + } void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override; diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 405ada3b48d..c2eedfbc791 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -521,6 +521,10 @@ public: append_data_by_selector_impl<ColumnString>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + append_data_by_selector_impl<ColumnString>(res, selector, begin, end); + } // void gather(ColumnGathererStream & gatherer_stream) override; void reserve(size_t n) override; diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index 2ca4fdec015..5157b1ad6b0 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -132,6 +132,10 @@ public: void append_data_by_selector(MutableColumnPtr& res, const Selector& selector) const override { return append_data_by_selector_impl<ColumnStruct>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const Selector& selector, size_t begin, + size_t end) const override { + return append_data_by_selector_impl<ColumnStruct>(res, selector, begin, end); + } void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override { LOG(FATAL) << "Method replace_column_data is not supported for " << get_name(); } diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 7e035b8b47a..0b7d8350c5f 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -412,6 +412,10 @@ public: const IColumn::Selector& selector) const override { this->template append_data_by_selector_impl<Self>(res, selector); } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + this->template append_data_by_selector_impl<Self>(res, selector, begin, end); + } bool is_fixed_and_contiguous() const override { return true; } size_t size_of_value_if_fixed() const override { return sizeof(T); } diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h index 2a390d39d29..0fdca54dc38 100644 --- a/be/src/vec/columns/predicate_column.h +++ b/be/src/vec/columns/predicate_column.h @@ -433,6 +433,10 @@ public: const IColumn::Selector& selector) const override { LOG(FATAL) << "append_data_by_selector is not supported in PredicateColumnType!"; } + void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector, + size_t begin, size_t end) const override { + LOG(FATAL) << "append_data_by_selector is not supported in PredicateColumnType!"; + } Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override { ColumnType* column = assert_cast<ColumnType*>(col_ptr); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
