This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 7199102d7c [Opt][VecLoad] Opt the vec stream load performance (#9772) 7199102d7c is described below commit 7199102d7cf421d245245229a8cb003a6089ec55 Author: HappenLee <happen...@hotmail.com> AuthorDate: Tue May 31 11:53:32 2022 +0800 [Opt][VecLoad] Opt the vec stream load performance (#9772) Co-authored-by: lihaopeng <lihaop...@baidu.com> --- be/src/exec/text_converter.h | 4 ++++ be/src/exec/text_converter.hpp | 18 ++++++++++++++++++ be/src/olap/delta_writer.cpp | 13 +------------ be/src/olap/memtable.cpp | 13 +++++++++---- be/src/olap/memtable.h | 2 +- be/src/vec/exec/vbroker_scanner.cpp | 18 +++--------------- be/src/vec/exec/vbroker_scanner.h | 3 --- 7 files changed, 36 insertions(+), 35 deletions(-) diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h index 7d957234a7..0f0f871e19 100644 --- a/be/src/exec/text_converter.h +++ b/be/src/exec/text_converter.h @@ -48,6 +48,10 @@ public: bool write_slot(const SlotDescriptor* slot_desc, Tuple* tuple, const char* data, int len, bool copy_string, bool need_escape, MemPool* pool); + void write_string_column(const SlotDescriptor* slot_desc, + vectorized::MutableColumnPtr* column_ptr, const char* data, + size_t len); + bool write_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len, bool copy_string, bool need_escape); diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp index 14667c35c0..0d4c5af788 100644 --- a/be/src/exec/text_converter.hpp +++ b/be/src/exec/text_converter.hpp @@ -166,6 +166,24 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu return true; } +inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc, + vectorized::MutableColumnPtr* column_ptr, + const char* data, size_t len) { + vectorized::IColumn* col_ptr = column_ptr->get(); + // \N means it's NULL + if (LIKELY(slot_desc->is_nullable())) { + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get()); + if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) { + nullable_column->insert_data(nullptr, 0); + return; + } else { + nullable_column->get_null_map_data().push_back(0); + col_ptr = &nullable_column->get_nested_column(); + } + } + reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data, len); +} + inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len, bool copy_string, bool need_escape) { diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index fe0ffeddbb..132a79a390 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -190,18 +190,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - int start = 0, end = 0; - const size_t num_rows = row_idxs.size(); - for (; start < num_rows;) { - auto count = end + 1 - start; - if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) { - _mem_table->insert(block, row_idxs[start], count); - start += count; - end = start; - } else { - end++; - } - } + _mem_table->insert(block, row_idxs); if (_mem_table->need_to_agg()) { _mem_table->shrink_memtable_by_agg(); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 77b0313004..08c92c3404 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -114,7 +114,7 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, *_pblock, -1); } -void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) { +void MemTable::insert(const vectorized::Block* block, const std::vector<int>& row_idxs) { if (_is_first_insertion) { _is_first_insertion = false; auto cloneBlock = block->clone_without_columns(); @@ -125,8 +125,9 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num _init_agg_functions(block); } } + auto num_rows = row_idxs.size(); size_t cursor_in_mutableblock = _input_mutable_block.rows(); - _input_mutable_block.add_rows(block, row_pos, num_rows); + _input_mutable_block.add_rows(block, row_idxs.data(), row_idxs.data() + num_rows); size_t input_size = block->allocated_bytes() * num_rows / block->rows(); _mem_usage += input_size; _mem_tracker->consume(input_size); @@ -245,11 +246,15 @@ template <bool is_final> void MemTable::_collect_vskiplist_results() { VecTable::Iterator it(_vec_skip_list.get()); vectorized::Block in_block = _input_mutable_block.to_block(); - // TODO: should try to insert data by column, not by row. to opt the code if (_keys_type == KeysType::DUP_KEYS) { + std::vector<int> row_pos_vec; + DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); + row_pos_vec.reserve(in_block.rows()); for (it.SeekToFirst(); it.Valid(); it.Next()) { - _output_mutable_block.add_row(&in_block, it.key()->_row_pos); + row_pos_vec.emplace_back(it.key()->_row_pos); } + _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), + row_pos_vec.data() + in_block.rows()); } else { size_t idx = 0; for (it.SeekToFirst(); it.Valid(); it.Next()) { diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index c73b39b39d..c594f77679 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -52,7 +52,7 @@ public: inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); } // insert tuple from (row_pos) to (row_pos+num_rows) - void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows); + void insert(const vectorized::Block* block, const std::vector<int>& row_idxs); void shrink_memtable_by_agg(); diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index 3006dba788..41d0dcac1d 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -20,11 +20,11 @@ #include <fmt/format.h> #include <iostream> -#include <sstream> #include "exec/exec_node.h" #include "exec/plain_text_line_reader.h" #include "exec/text_converter.h" +#include "exec/text_converter.hpp" #include "exprs/expr_context.h" #include "util/utf8_check.h" @@ -111,22 +111,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line, continue; } - RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc, - &columns[dest_index], _state)); + _text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data, + value.size); } return Status::OK(); } - -Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot, - vectorized::MutableColumnPtr* column_ptr, - RuntimeState* state) { - if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) { - std::stringstream ss; - ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`" - << slot->col_name() + "`"; - return Status::InternalError(ss.str()); - } - return Status::OK(); -} } // namespace doris::vectorized diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h index 11d8b494fa..cbd00f859a 100644 --- a/be/src/vec/exec/vbroker_scanner.h +++ b/be/src/vec/exec/vbroker_scanner.h @@ -39,9 +39,6 @@ public: private: std::unique_ptr<TextConverter> _text_converter; - Status _write_text_column(char* value, int length, SlotDescriptor* slot, - MutableColumnPtr* column_ptr, RuntimeState* state); - Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns); }; } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org