This is an automated email from the ASF dual-hosted git repository. weixiang pushed a commit to branch memtable_opt_rebase_bak in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0d3880b41586fe8f159a3c8ab8a395f65cfe0ef7 Author: weixiang <[email protected]> AuthorDate: Mon Apr 18 21:37:34 2022 +0800 [feature-wip](stream-load-vec) opt memtable --- be/src/olap/delta_writer.cpp | 7 +- be/src/olap/memtable.cpp | 147 +++++++++++++++-- be/src/olap/memtable.h | 43 ++++- be/src/vec/CMakeLists.txt | 1 + .../vec/aggregate_functions/block_aggregator.cpp | 180 +++++++++++++++++++++ be/src/vec/aggregate_functions/block_aggregator.h | 70 ++++++++ be/src/vec/core/block.cpp | 20 +++ be/src/vec/core/block.h | 13 ++ 8 files changed, 461 insertions(+), 20 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 11ee242449..5cf0c9cd22 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -191,11 +191,14 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> } int start = 0, end = 0; + bool flush = false; const size_t num_rows = row_idxs.size(); for (; start < num_rows;) { auto count = end + 1 - start; if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) { - _mem_table->insert(block, row_idxs[start], count); + if (_mem_table->insert(block, row_idxs[start], count)) { + flush = true; + } start += count; end = start; } else { @@ -203,7 +206,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> } } - if (_mem_table->memory_usage() >= config::write_buffer_size) { + if (flush || _mem_table->is_full()) { RETURN_NOT_OK(_flush_memtable_async()); _reset_mem_table(); } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index c7f94ead4f..e1c97f2d4d 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -53,6 +53,8 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet // TODO: Support ZOrderComparator in the future _vec_skip_list = std::make_unique<VecTable>( _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS); + _block_aggregator = + std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true); } else { _vec_skip_list = nullptr; if (_keys_type == KeysType::DUP_KEYS) { @@ -114,27 +116,135 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, *_pblock, -1); } -void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) { +bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) { if (_is_first_insertion) { _is_first_insertion = false; auto cloneBlock = block->clone_without_columns(); - _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); - _vec_row_comparator->set_block(&_input_mutable_block); - _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock); if (_keys_type != KeysType::DUP_KEYS) { _init_agg_functions(block); } } - size_t cursor_in_mutableblock = _input_mutable_block.rows(); - size_t oldsize = _input_mutable_block.allocated_bytes(); - _input_mutable_block.add_rows(block, row_pos, num_rows); - size_t newsize = _input_mutable_block.allocated_bytes(); - _mem_usage += newsize - oldsize; - _mem_tracker->consume(newsize - oldsize); - - for (int i = 0; i < num_rows; i++) { - _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); - _insert_one_row_from_block(_row_in_blocks.back()); + _block->add_rows(block, row_pos, num_rows); + _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows(); + // Memtalbe is full, do not flush immediately + // First try to merge these blocks + // If the merged memtable is still full or we can not benefit a lot from merge at first + // Then flush the memtable into disk. + bool is_flush = false; + if (is_full()) { + size_t before_merge_bytes = bytes_allocated(); + _merge(); + size_t after_merged_bytes = bytes_allocated(); + // TODO(weixiang): magic number here, make it configurable later. + if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) { + is_flush = true; + } + } + return is_flush; +} + +size_t MemTable::bytes_allocated() const { + return _block_bytes_usage + _block_aggregator->get_bytes_usage(); +} + +bool MemTable::is_full() const { + return bytes_allocated() > config::write_buffer_size; +} + +void MemTable::_merge() { + if (_block == nullptr || _keys_type == KeysType::DUP_KEYS) { + return; + } + _sort(false); + _agg(false); + _merge_count++; +} + +void MemTable::_agg(const bool finalize) { + // note that the _block had been sorted before. + if (_sorted_block == nullptr || _sorted_block->rows() <= 0) { + return; + } + vectorized::Block sorted_block = _sorted_block->to_block(); + _block_aggregator->append_block(&sorted_block); + _block_aggregator->partial_sort_merged_aggregate(); + if (finalize) { + _sorted_block.reset(); + } else { + _sorted_block->clear_column_data(); + } +} + +void MemTable::_sort(const bool finalize) { + _index_for_sort.resize(_block->rows()); + for (uint32_t i = 0; i < _block->rows(); i++) { + _index_for_sort[i] = {i, i}; + } + + _sort_block_by_rows(); + _sorted_block = _block->create_same_struct_block(0); + _append_sorted_block(_block.get(), _sorted_block.get()); + if (finalize) { + _block.reset(); + } else { + _block->clear_column_data(); + } + _block_bytes_usage = 0; +} + +void MemTable::_sort_block_by_rows() { + std::sort(_index_for_sort.begin(), _index_for_sort.end(), + [this](const MemTable::OrderedIndexItem& left, + const MemTable::OrderedIndexItem& right) { + int res = _block->compare_at(left.index_in_block, right.index_in_block, + _schema->num_key_columns(), *_block.get(), -1); + if (res != 0) { + return res < 0; + } + return left.incoming_index < right.incoming_index; + }); +} + +void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst) { + size_t row_num = src->rows(); + _sorted_index_in_block.clear(); + _sorted_index_in_block.reserve(row_num); + for (size_t i = 0; i < row_num; i++) { + _sorted_index_in_block.push_back(_index_for_sort[i].index_in_block); + } + vectorized::Block src_block = src->to_block(); + dst->add_rows(&src_block, _sorted_index_in_block.data(), + _sorted_index_in_block.data() + row_num); +} + +void MemTable::finalize() { + //TODO(weixiang): check here + if (_block == nullptr) { + return; + } + + if (_keys_type != KeysType::DUP_KEYS) { + // agg mode + if (_block->rows() > 0) { + _merge(); + } + if (_merge_count > 1) { + _block = _block_aggregator->get_partial_agged_block(); + _block_aggregator->reset_aggregator(); + _sort(true); + _agg(true); + } else { + _block.reset(); + _sorted_block.reset(); + } + + _block_bytes_usage = 0; + _sorted_block = _block_aggregator->get_partial_agged_block(); + + } else { + // dup mode + _sort(true); } } @@ -271,6 +381,13 @@ vectorized::Block MemTable::_collect_vskiplist_results() { } Status MemTable::flush() { + if (!_skip_list) { + finalize(); + if (_sorted_block == nullptr) { + return Status::OK(); + } + } + VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id << ", memsize: " << memory_usage() << ", rows: " << _rows; int64_t duration_ns = 0; @@ -301,7 +418,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) { RETURN_NOT_OK(st); } } else { - vectorized::Block block = _collect_vskiplist_results(); + vectorized::Block block = _sorted_block->to_block(); // beta rowset flush parallel, segment write add block is not // thread safe, so use tmp variable segment_write instead of // member variable diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 910cd92270..daca1f71fc 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -27,6 +27,7 @@ #include "vec/aggregate_functions/aggregate_function.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" +#include "vec/aggregate_functions/block_aggregator.h" namespace doris { @@ -48,11 +49,17 @@ public: int64_t tablet_id() const { return _tablet_id; } size_t memory_usage() const { return _mem_tracker->consumption(); } + std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; } - inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); } - // insert tuple from (row_pos) to (row_pos+num_rows) - void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows); + inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }; + //insert tuple from (row_pos) to (row_pos+num_rows) + bool insert(const vectorized::Block* block, size_t row_pos, size_t num_rows); + + bool is_full() const; + size_t bytes_allocated() const; + + void finalize(); /// Flush Status flush(); @@ -146,6 +153,14 @@ private: // for vectorized void _insert_one_row_from_block(RowInBlock* row_in_block); void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist); + void _sort(const bool finalize); + void _sort_block_by_rows(); + + void _merge(); + + void _agg(const bool finalize); + + void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst); int64_t _tablet_id; Schema* _schema; @@ -195,13 +210,35 @@ private: //for vectorized vectorized::MutableBlock _input_mutable_block; vectorized::MutableBlock _output_mutable_block; + + struct OrderedIndexItem { + uint32_t index_in_block; + uint32_t incoming_index; // used for sort by column + }; + + using OrderedIndex = std::vector<OrderedIndexItem>; + + OrderedIndex _index_for_sort; + + std::vector<int> _sorted_index_in_block; + + vectorized::MutableBlockPtr _block; + + vectorized::MutableBlockPtr _sorted_block; + + std::unique_ptr<vectorized::BlockAggregator> _block_aggregator; + vectorized::Block _collect_vskiplist_results(); + bool _is_first_insertion; void _init_agg_functions(const vectorized::Block* block); std::vector<vectorized::AggregateFunctionPtr> _agg_functions; std::vector<RowInBlock*> _row_in_blocks; size_t _mem_usage; + size_t _block_bytes_usage = 0; + size_t _agg_bytes_usage = 0; + int _merge_count = 0; }; // class MemTable inline std::ostream& operator<<(std::ostream& os, const MemTable& table) { diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 265d6fd884..ca768bbe3d 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -39,6 +39,7 @@ set(VEC_FILES aggregate_functions/aggregate_function_group_concat.cpp aggregate_functions/aggregate_function_percentile_approx.cpp aggregate_functions/aggregate_function_simple_factory.cpp + aggregate_functions/block_aggregator.cpp columns/collator.cpp columns/column.cpp columns/column_array.cpp diff --git a/be/src/vec/aggregate_functions/block_aggregator.cpp b/be/src/vec/aggregate_functions/block_aggregator.cpp new file mode 100644 index 0000000000..201cca75e0 --- /dev/null +++ b/be/src/vec/aggregate_functions/block_aggregator.cpp @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "block_aggregator.h" + +namespace doris::vectorized { + +BlockAggregator::BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted) + : _schema(schema), _tablet_schema(tablet_schema), _src_sorted(src_sorted) { + _init_agg_functions(); +} + +BlockAggregator::~BlockAggregator() { +} + +void BlockAggregator::_init_agg_functions() { + _cols_num = _schema->num_columns(); + _key_cols_num = _schema->num_key_columns(); + _value_cols_num = _cols_num - _key_cols_num; + //TODO(weixiang): save memory just use value length. + _agg_functions.resize(_schema->num_columns()); + _agg_places.resize(_value_cols_num); + for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) { + FieldAggregationMethod agg_method = _tablet_schema->column(cid).aggregation(); + std::string agg_name = TabletColumn::get_string_by_aggregation_type(agg_method) + AGG_LOAD_SUFFIX; + + std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), + [](unsigned char c) { return std::tolower(c); }); + + // create aggregate function + DataTypes argument_types; + // TODO(weixiang): 检查这块这么写是否有隐患 + DataTypePtr dtptr = Schema::get_data_type_ptr(*_schema->column(cid)); + argument_types.push_back(dtptr); + Array params; + AggregateFunctionPtr function = + AggregateFunctionSimpleFactory::instance().get( + agg_name, argument_types, params, dtptr->is_nullable()); + + DCHECK(function != nullptr); + _agg_functions[cid] = function; + } +} + + +void BlockAggregator::append_block(Block* block) { + if (block == nullptr || block->rows() <= 0){ + return; + } + _agg_data_counters.reserve(_agg_data_counters.size() + block->rows()); + size_t key_num = _schema->num_key_columns(); + + size_t same_rows = 1; + for (size_t i = 0; i < block->rows(); i++) { + if ( i+1 == block->rows() || block->compare_at(i, i+1, key_num, *block, -1) != 0) { + _agg_data_counters.push_back(same_rows); + same_rows = 0; + } + same_rows++; + } + if (_is_first_append) { + // this means it is appending block for the first time + _aggregated_block = std::make_shared<MutableBlock>(block); + _is_first_append = false; + } else { + _aggregated_block->add_rows(block, 0, block->rows()); + } +} + +/** + * @brief aggregate sorted block + * 1. _agg_data_counters save the following N rows to agg in partial sort block + * 2. first_row_idx records the first row num of rows with the same keys. + * + * + * TODO(weixiang): + * 1. refactor function partial_sort_merged_aggregate, 拆成多个函数:init等 + */ + +void BlockAggregator::partial_sort_merged_aggregate() { + DCHECK(!_agg_data_counters.empty()); + std::vector<int> first_row_idx; // TODO(weixiang): add into member variables + std::vector<MutableColumnPtr> aggregated_cols; + first_row_idx.reserve(_agg_data_counters.size()); + int row_pos = _cumulative_agg_num; + for (size_t i = 0; i < _agg_data_counters.size(); i++) { + first_row_idx.push_back(row_pos); + row_pos += _agg_data_counters[i]; + } + auto col_ids = _schema->column_ids(); + size_t agged_row_num = first_row_idx.size(); + // for keys: + for (size_t cid = 0; cid < _key_cols_num; cid++) { + + MutableColumnPtr key_col = + _schema->get_data_type_ptr(*_schema->column(col_ids[cid]))->create_column(); + key_col->insert_indices_from(*_aggregated_block->mutable_columns()[cid], + first_row_idx.data(), + first_row_idx.data() + agged_row_num); + aggregated_cols.emplace_back(std::move(key_col)); + } + + // init agged place for values: + for (size_t cid = _key_cols_num; cid < _cols_num; cid++) { + size_t place_size = _agg_functions[cid]->size_of_data(); + _agg_places[cid - _key_cols_num] = new char[place_size * agged_row_num]; + for (auto i = 0; i < agged_row_num; i++) { + AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i; + _agg_functions[cid]->create(place); + } + + } + + // do agg + for (size_t cid = _key_cols_num; cid < _cols_num; cid++) { + size_t place_size = _agg_functions[cid]->size_of_data(); + auto* src_value_col_ptr = _aggregated_block->mutable_columns()[cid].get(); + size_t agg_begin_idx = 0; + + for (size_t i = 0; i < agged_row_num; i++) { + AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i; + _agg_functions[cid]->add_batch_range( + agg_begin_idx, + agg_begin_idx + _agg_data_counters[i] - 1, place, + const_cast<const doris::vectorized::IColumn**>(&src_value_col_ptr), nullptr); + agg_begin_idx += _agg_data_counters[i]; + } + } + + // move to result column + for (size_t value_col_idx = 0; value_col_idx < _value_cols_num; value_col_idx++) { + size_t place_size = _agg_functions[value_col_idx + _key_cols_num]->size_of_data(); + MutableColumnPtr dst_value_col_ptr = + _schema->get_data_type_ptr(*_schema->column(col_ids[value_col_idx + _key_cols_num])) + ->create_column(); + for (size_t i = 0; i < first_row_idx.size(); i++) { + _agg_functions[value_col_idx + _key_cols_num]->insert_result_into( + _agg_places[value_col_idx] + i * place_size, + *reinterpret_cast<doris::vectorized::IColumn*>(dst_value_col_ptr.get())); + } + aggregated_cols.emplace_back(std::move(dst_value_col_ptr)); + } + + _aggregated_block->clear_column_data(); + _aggregated_block->append_from_columns(aggregated_cols, agged_row_num); + _agg_data_counters.clear(); + _cumulative_agg_num += agged_row_num; + + for(auto place : _agg_places) { + // free aggregated memory + delete[] place; + } + + +} + + + +size_t BlockAggregator::get_bytes_usage() const{ + if(UNLIKELY(_aggregated_block == nullptr)) { + return 0; + } + return _aggregated_block->allocated_bytes(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/block_aggregator.h b/be/src/vec/aggregate_functions/block_aggregator.h new file mode 100644 index 0000000000..510e614dd2 --- /dev/null +++ b/be/src/vec/aggregate_functions/block_aggregator.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#pragma once +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/aggregate_function_reader.h" +#include "vec/aggregate_functions/aggregate_function_simple_factory.h" +#include "vec/core/block.h" +#include "olap/schema.h" + +namespace doris::vectorized { + +using BlockPtr = std::shared_ptr<Block>; +using MutableBlockPtr = std::shared_ptr<MutableBlock>; + +class BlockAggregator { + + +public: + BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted); + ~BlockAggregator(); + void append_block(Block* block); + void partial_sort_merged_aggregate(); + void _init_agg_functions(); + size_t get_bytes_usage() const; + + MutableBlockPtr get_partial_agged_block() { + return _aggregated_block; + } + + void reset_aggregator() { + _aggregated_block.reset(); + _agg_data_counters.clear(); + _cumulative_agg_num = 0; + _is_first_append = true; + } + +private: + bool _is_first_append = true; + size_t _key_cols_num; + size_t _value_cols_num; + size_t _cumulative_agg_num = 0; + size_t _cols_num; + const Schema* _schema; + const TabletSchema* _tablet_schema; + bool _src_sorted; + MutableBlockPtr _aggregated_block; + std::vector<int> _agg_data_counters; + std::vector<AggregateFunctionPtr> _agg_functions; + + std::vector<AggregateDataPtr> _agg_places; + +}; + +} // namespace diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index aa482fcfbf..8efc0e0e1b 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -897,6 +897,14 @@ Block MutableBlock::to_block(int start_column, int end_column) { return {columns_with_schema}; } +void MutableBlock::clear_column_data() noexcept { + for (auto& col : _columns) { + if (col) { + col->clear(); + } + } +} + std::string MutableBlock::dump_data(size_t row_limit) const { std::vector<std::string> headers; std::vector<size_t> headers_size; @@ -956,6 +964,18 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size) const { return temp_block; } +//TODO(weixiang): unique_ptr? +std::shared_ptr<MutableBlock> MutableBlock::create_same_struct_block(size_t size) const { + Block temp_block; + for (const auto& d : _data_types) { + auto column = d->create_column(); + column->resize(size); + temp_block.insert({std::move(column), d, ""}); + } + auto result = std::make_shared<MutableBlock>(std::move(temp_block)); + return result; +} + void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) { for (auto idx : char_type_idx) { if (idx < data.size()) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 729f531291..7a6f61deb9 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -352,12 +352,25 @@ public: size_t rows() const; size_t columns() const { return _columns.size(); } + + std::shared_ptr<MutableBlock> create_same_struct_block(size_t size) const; + + void clear_column_data() noexcept; + bool empty() const { return rows() == 0; } MutableColumns& mutable_columns() { return _columns; } void set_muatable_columns(MutableColumns&& columns) { _columns = std::move(columns); } + void append_from_columns(MutableColumns& columns, size_t length) { + DCHECK(_columns.size() == columns.size()); + for (size_t i = 0; i < _columns.size(); i++) { + DCHECK(columns[i]->size() >= length); + _columns[i]->insert_range_from(*columns[i], 0, length); + } + } + DataTypes& data_types() { return _data_types; } MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
