This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch vectorized in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit f45d2a2144179bcc64b807a68f4a6befdc796383 Author: zuochunwei <[email protected]> AuthorDate: Mon Jan 17 17:10:24 2022 +0800 [Vectorized](improving) (exec) optimize VDataStreamSender's send() performance #7747 (#7751) --- be/src/vec/columns/column.h | 4 ++ be/src/vec/columns/column_complex.h | 10 ++++- be/src/vec/columns/column_const.h | 4 ++ be/src/vec/columns/column_decimal.h | 10 +++++ be/src/vec/columns/column_dummy.h | 4 ++ be/src/vec/columns/column_nullable.cpp | 6 +++ be/src/vec/columns/column_nullable.h | 1 + be/src/vec/columns/column_string.cpp | 6 +++ be/src/vec/columns/column_string.h | 2 + be/src/vec/columns/column_vector.cpp | 10 +++++ be/src/vec/columns/column_vector.h | 2 + be/src/vec/columns/predicate_column.h | 6 ++- be/src/vec/core/block.cpp | 13 +++++- be/src/vec/core/block.h | 2 + be/src/vec/sink/vdata_stream_sender.cpp | 79 ++++++++++++++++++++++----------- be/src/vec/sink/vdata_stream_sender.h | 36 ++++++++++++++- 16 files changed, 164 insertions(+), 31 deletions(-) diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index a869a65..d58979d 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -160,6 +160,10 @@ public: virtual void insert_many_from(const IColumn& src, size_t position, size_t length) { for (size_t i = 0; i < length; ++i) insert_from(src, position); } + + /// Appends a batch elements from other column with the same type + /// indices_begin + indices_end represent the row indices of column src + virtual void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) = 0; /// Appends data located in specified memory chunk if it is possible (throws an exception if it cannot be implemented). /// Is used to optimize some computations (in aggregation, for example). diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index 296f94b..18794d3 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -127,6 +127,14 @@ public: data.insert(data.end(), st, ed); } + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override { + const Self& src_vec = assert_cast<const Self&>(src); + data.reserve(size() + (indices_end - indices_begin)); + for (auto x = indices_begin; x != indices_end; ++x) { + data.push_back(src_vec.get_element(*x)); + } + } + void pop_back(size_t n) { data.erase(data.end() - n, data.end()); } // it's impossable to use ComplexType as key , so we don't have to implemnt them [[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena, @@ -286,4 +294,4 @@ ColumnPtr ColumnComplexType<T>::replicate(const IColumn::Offsets& offsets) const } using ColumnBitmap = ColumnComplexType<BitmapValue>; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 703e226..e019c56 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -84,6 +84,10 @@ public: s += length; } + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override { + s += (indices_end - indices_begin); + } + void insert(const Field&) override { ++s; } void insert_data(const char*, size_t) override { ++s; } diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 46412cb..67f4fa9 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -26,6 +26,7 @@ #include "vec/columns/column_impl.h" #include "vec/columns/column_vector_helper.h" #include "vec/common/typeid_cast.h" +#include "vec/common/assert_cast.h" #include "vec/core/field.h" namespace doris::vectorized { @@ -95,6 +96,15 @@ public: void insert_from(const IColumn& src, size_t n) override { data.push_back(static_cast<const Self&>(src).get_data()[n]); } + + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override { + const Self& src_vec = assert_cast<const Self&>(src); + data.reserve(size() + (indices_end - indices_begin)); + for (auto x = indices_begin; x != indices_end; ++x) { + data.push_back_without_reserve(src_vec.get_element(*x)); + } + } + void insert_data(const char* pos, size_t /*length*/) override; void insert_default() override { data.push_back(T()); } void insert(const Field& x) override { diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index 4531058..cac3a06 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -80,6 +80,10 @@ public: s += length; } + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override { + s += (indices_end - indices_begin); + } + ColumnPtr filter(const Filter& filt, ssize_t /*result_size_hint*/) const override { return clone_dummy(count_bytes_in_filter(filt)); } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index bf4bb44..ae3a2fd 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -131,6 +131,12 @@ void ColumnNullable::insert_range_from(const IColumn& src, size_t start, size_t get_nested_column().insert_range_from(*nullable_col.nested_column, start, length); } +void ColumnNullable::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) { + const ColumnNullable& src_concrete = assert_cast<const ColumnNullable&>(src); + get_nested_column().insert_indices_from(src_concrete.get_nested_column(), indices_begin, indices_end); + get_null_map_column().insert_indices_from(src_concrete.get_null_map_column(), indices_begin, indices_end); +} + void ColumnNullable::insert(const Field& x) { if (x.is_null()) { get_nested_column().insert_default(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index fb6aa8f..8163149 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -83,6 +83,7 @@ public: StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; void insert_range_from(const IColumn& src, size_t start, size_t length) override; + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override; void insert(const Field& x) override; void insert_from(const IColumn& src, size_t n) override; diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 05a702b..f4a32dc 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -93,6 +93,12 @@ void ColumnString::insert_range_from(const IColumn& src, size_t start, size_t le } } +void ColumnString::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) { + for (auto x = indices_begin; x != indices_end; ++x) { + ColumnString::insert_from(src, *x); + } +} + ColumnPtr ColumnString::filter(const Filter& filt, ssize_t result_size_hint) const { if (offsets.size() == 0) return ColumnString::create(); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 44a6608..2234490 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -187,6 +187,8 @@ public: void insert_range_from(const IColumn& src, size_t start, size_t length) override; + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override; + ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; ColumnPtr permute(const Permutation& perm, size_t limit) const override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 017ae29..75ff144 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -29,6 +29,7 @@ #include "runtime/datetime_value.h" #include "vec/columns/columns_common.h" #include "vec/common/arena.h" +#include "vec/common/assert_cast.h" #include "vec/common/bit_cast.h" #include "vec/common/exception.h" #include "vec/common/nan_utils.h" @@ -218,6 +219,15 @@ void ColumnVector<T>::insert_range_from(const IColumn& src, size_t start, size_t } template <typename T> +void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) { + const Self& src_vec = assert_cast<const Self&>(src); + data.reserve(size() + (indices_end - indices_begin)); + for (auto x = indices_begin; x != indices_end; ++x) { + data.push_back_without_reserve(src_vec.get_element(*x)); + } +} + +template <typename T> ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const { size_t size = data.size(); if (size != filt.size()) { diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 6626229..2c5647e 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -200,6 +200,8 @@ public: void insert_range_from(const IColumn& src, size_t start, size_t length) override; + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override; + ColumnPtr filter(const IColumn::Filter& filt, ssize_t result_size_hint) const override; // note(wb) this method is only used in storage layer now diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h index 8095cff..a23c550 100644 --- a/be/src/vec/columns/predicate_column.h +++ b/be/src/vec/columns/predicate_column.h @@ -148,6 +148,10 @@ public: LOG(FATAL) << "insert_range_from not supported in PredicateColumnType"; } + void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override { + LOG(FATAL) << "insert_indices_from not supported in PredicateColumnType"; + } + void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported in PredicateColumnType"; } @@ -440,4 +444,4 @@ private: }; using ColumnStringValue = PredicateColumnType<StringValue>; -} // namespace \ No newline at end of file +} // namespace diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index d200a46..3664a2a 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -837,9 +837,18 @@ size_t MutableBlock::rows() const { } void MutableBlock::add_row(const Block* block, int row) { - auto& src_columns_with_schema = block->get_columns_with_type_and_name(); + auto& block_data = block->get_columns_with_type_and_name(); for (size_t i = 0; i < _columns.size(); ++i) { - _columns[i]->insert_from(*src_columns_with_schema[i].column.get(), row); + _columns[i]->insert_from(*block_data[i].column.get(), row); + } +} + +void MutableBlock::add_rows(const Block* block, const int* row_begin, const int* row_end) { + auto& block_data = block->get_columns_with_type_and_name(); + for (size_t i = 0; i < _columns.size(); ++i) { + auto& dst = _columns[i]; + auto& src = *block_data[i].column.get(); + dst->insert_indices_from(src, row_begin, row_end); } } diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 1c435ee..a39baa9 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -339,6 +339,8 @@ public: Block to_block(int start_column, int end_column); void add_row(const Block* block, int row); + void add_rows(const Block* block, const int* row_begin, const int* row_end); + std::string dump_data(size_t row_limit = 100) const; void clear() { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 0cc5c66..d953c9d 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -169,6 +169,42 @@ Status VDataStreamSender::Channel::add_row(Block* block, int row) { return Status::OK(); } +Status VDataStreamSender::Channel::add_rows(Block* block, const std::vector<int>& rows) { + if (_fragment_instance_id.lo == -1) { + return Status::OK(); + } + + if (_mutable_block.get() == nullptr) { + auto empty_block = block->clone_empty(); + _mutable_block.reset( + new MutableBlock(empty_block.mutate_columns(), empty_block.get_data_types())); + } + + int row_wait_add = rows.size(); + int batch_size = _parent->state()->batch_size(); + const int* begin = &rows[0]; + + while (row_wait_add > 0) { + int row_add, max_add = batch_size - _mutable_block->rows(); + if (row_wait_add >= max_add) { + row_add = max_add; + } else { + row_add = row_wait_add; + } + + _mutable_block->add_rows(block, begin, begin + row_add); + + row_wait_add -= row_add; + begin += row_add; + + if (row_add == max_add) { + RETURN_IF_ERROR(send_current_block()); + } + } + + return Status::OK(); +} + Status VDataStreamSender::Channel::close_wait(RuntimeState* state) { if (_need_close) { Status st = _wait_last_brpc(); @@ -394,51 +430,49 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { int num_channels = _channels.size(); // will only copy schema // we don't want send temp columns - auto send_block = *block; - std::vector<int> result(_partition_expr_ctxs.size()); - int counter = 0; + int result_size = _partition_expr_ctxs.size(); + int result[result_size]; + RETURN_IF_ERROR(get_partition_column_result(block, result)); - for (auto ctx : _partition_expr_ctxs) { - RETURN_IF_ERROR(ctx->execute(block, &result[counter++])); - } // vectorized caculate hash int rows = block->rows(); // for each row, we have a siphash val std::vector<SipHash> siphashs(rows); // result[j] means column index, i means rows index - for (int j = 0; j < result.size(); ++j) { + for (int j = 0; j < result_size; ++j) { auto column = block->get_by_position(result[j]).column; for (int i = 0; i < rows; ++i) { column->update_hash_with_value(i, siphashs[i]); } } + // channel2rows' subscript means channel id + std::vector<vectorized::UInt64> hash_vals(rows); for (int i = 0; i < rows; i++) { - auto target_channel_id = siphashs[i].get64() % num_channels; - RETURN_IF_ERROR(_channels[target_channel_id]->add_row(&send_block, i)); + hash_vals[i] = siphashs[i].get64(); } + RETURN_IF_ERROR(channel_add_rows(_channels, num_channels, hash_vals, rows, block)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { // 1. caculate hash // 2. dispatch rows to channel int num_channels = _channel_shared_ptrs.size(); - auto send_block = *block; - std::vector<int> result(_partition_expr_ctxs.size()); - int counter = 0; - for (auto ctx : _partition_expr_ctxs) { - RETURN_IF_ERROR(ctx->execute(block, &result[counter++])); - } + + int result_size = _partition_expr_ctxs.size(); + int result[result_size]; + RETURN_IF_ERROR(get_partition_column_result(block, result)); + // vectorized caculate hash val int rows = block->rows(); // for each row, we have a hash_val std::vector<size_t> hash_vals(rows); // result[j] means column index, i means rows index - for (int j = 0; j < result.size(); ++j) { + for (int j = 0; j < result_size; ++j) { + auto& column = block->get_by_position(result[j]).column; for (int i = 0; i < rows; ++i) { - auto val = block->get_by_position(result[j]).column->get_data_at(i); - + auto val = column->get_data_at(i); if (val.data == nullptr) { // nullptr is treat as 0 when hash static const int INT_VALUE = 0; @@ -446,17 +480,12 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { hash_vals[i] = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_vals[i]); } else { hash_vals[i] = RawValue::zlib_crc32(val.data, val.size, - _partition_expr_ctxs[j]->root()->type(), - hash_vals[i]); + _partition_expr_ctxs[j]->root()->type(), hash_vals[i]); } } } - for (int i = 0; i < rows; i++) { - auto target_channel_id = hash_vals[i] % num_channels; - RETURN_IF_ERROR(_channel_shared_ptrs[target_channel_id]->add_row(&send_block, i)); - } - + RETURN_IF_ERROR(channel_add_rows(_channel_shared_ptrs, num_channels, hash_vals, rows, block)); } else { // Range partition // 1. caculate range diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index a15d39d..9d20987 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -72,6 +72,17 @@ public: private: class Channel; + Status get_partition_column_result(Block* block, int* result) const { + int counter = 0; + for (auto ctx : _partition_expr_ctxs) { + RETURN_IF_ERROR(ctx->execute(block, &result[counter++])); + } + return Status::OK(); + } + + template <typename Channels, typename HashVals> + Status channel_add_rows(Channels& channels, int num_channels, const HashVals& hash_vals, int rows, Block* block); + struct hash_128 { uint64_t high; uint64_t low; @@ -179,6 +190,7 @@ public: Status send_block(PBlock* block, bool eos = false); Status add_row(Block* block, int row); + Status add_rows(Block* block, const std::vector<int>& row); Status send_current_block(bool eos = false); @@ -203,9 +215,9 @@ public: return uid.to_string(); } - TUniqueId get_fragment_instance_id() { return _fragment_instance_id; } + TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; } - bool is_local() { return _is_local; } + bool is_local() const { return _is_local; } private: inline Status _wait_last_brpc() { @@ -223,6 +235,7 @@ private: return Status::OK(); } + private: // Serialize _batch into _thrift_batch and send via send_batch(). // Returns send_batch() status. @@ -261,5 +274,24 @@ private: size_t _capacity; bool _is_local; }; + +template <typename Channels, typename HashVals> +Status VDataStreamSender::channel_add_rows(Channels& channels, int num_channels, const HashVals& hash_vals, int rows, Block* block) { + std::vector<int> channel2rows[num_channels]; + + for (int i = 0; i < rows; i++) { + auto cid = hash_vals[i] % num_channels; + channel2rows[cid].emplace_back(i); + } + + for (int i = 0; i < num_channels; ++i) { + if (!channel2rows[i].empty()) { + RETURN_IF_ERROR(channels[i]->add_rows(block, channel2rows[i])); + } + } + + return Status::OK(); +} + } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
