This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch vectorized
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit f45d2a2144179bcc64b807a68f4a6befdc796383
Author: zuochunwei <[email protected]>
AuthorDate: Mon Jan 17 17:10:24 2022 +0800

    [Vectorized](improving) (exec) optimize VDataStreamSender's send() 
performance #7747 (#7751)
---
 be/src/vec/columns/column.h             |  4 ++
 be/src/vec/columns/column_complex.h     | 10 ++++-
 be/src/vec/columns/column_const.h       |  4 ++
 be/src/vec/columns/column_decimal.h     | 10 +++++
 be/src/vec/columns/column_dummy.h       |  4 ++
 be/src/vec/columns/column_nullable.cpp  |  6 +++
 be/src/vec/columns/column_nullable.h    |  1 +
 be/src/vec/columns/column_string.cpp    |  6 +++
 be/src/vec/columns/column_string.h      |  2 +
 be/src/vec/columns/column_vector.cpp    | 10 +++++
 be/src/vec/columns/column_vector.h      |  2 +
 be/src/vec/columns/predicate_column.h   |  6 ++-
 be/src/vec/core/block.cpp               | 13 +++++-
 be/src/vec/core/block.h                 |  2 +
 be/src/vec/sink/vdata_stream_sender.cpp | 79 ++++++++++++++++++++++-----------
 be/src/vec/sink/vdata_stream_sender.h   | 36 ++++++++++++++-
 16 files changed, 164 insertions(+), 31 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index a869a65..d58979d 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -160,6 +160,10 @@ public:
     virtual void insert_many_from(const IColumn& src, size_t position, size_t 
length) {
         for (size_t i = 0; i < length; ++i) insert_from(src, position);
     }
+ 
+    /// Appends a batch of elements from another column of the same type.
+    /// [indices_begin, indices_end) is the range of row indices into column src.
+    virtual void insert_indices_from(const IColumn& src, const int* 
indices_begin, const int* indices_end) = 0;
 
     /// Appends data located in specified memory chunk if it is possible 
(throws an exception if it cannot be implemented).
     /// Is used to optimize some computations (in aggregation, for example).
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index 296f94b..18794d3 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -127,6 +127,14 @@ public:
         data.insert(data.end(), st, ed);
     }
 
+    /// Appends the elements of src located at the row indices in
+    /// [indices_begin, indices_end), in index order. src is cast to this column type.
+    void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override {
+        const auto& source = assert_cast<const Self&>(src);
+        const auto num_indices = indices_end - indices_begin;
+        // Reserve room for every incoming element before appending.
+        data.reserve(size() + num_indices);
+        for (const int* idx = indices_begin; idx != indices_end; ++idx) {
+            data.push_back(source.get_element(*idx));
+        }
+    }
+
     void pop_back(size_t n) { data.erase(data.end() - n, data.end()); }
     // it's impossable to use ComplexType as key , so we don't have to 
implemnt them
     [[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
@@ -286,4 +294,4 @@ ColumnPtr ColumnComplexType<T>::replicate(const 
IColumn::Offsets& offsets) const
 }
 
 using ColumnBitmap = ColumnComplexType<BitmapValue>;
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 703e226..e019c56 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -84,6 +84,10 @@ public:
         s += length;
     }
 
+    /// Increases s by the number of indices in [indices_begin, indices_end).
+    /// Neither src nor the index values themselves are read.
+    void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override {
+        const auto added_rows = indices_end - indices_begin;
+        s += added_rows;
+    }
+
     void insert(const Field&) override { ++s; }
 
     void insert_data(const char*, size_t) override { ++s; }
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index 46412cb..67f4fa9 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -26,6 +26,7 @@
 #include "vec/columns/column_impl.h"
 #include "vec/columns/column_vector_helper.h"
 #include "vec/common/typeid_cast.h"
+#include "vec/common/assert_cast.h"
 #include "vec/core/field.h"
 
 namespace doris::vectorized {
@@ -95,6 +96,15 @@ public:
     void insert_from(const IColumn& src, size_t n) override {
         data.push_back(static_cast<const Self&>(src).get_data()[n]);
     }
+
+    /// Appends the elements of src located at the row indices in
+    /// [indices_begin, indices_end), in index order. src is cast to this column type.
+    void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override {
+        const auto& source = assert_cast<const Self&>(src);
+        const auto num_indices = indices_end - indices_begin;
+        // The reserve must cover every append below: push_back_without_reserve is
+        // assumed to skip the capacity check (TODO confirm against PaddedPODArray).
+        data.reserve(size() + num_indices);
+        for (const int* idx = indices_begin; idx != indices_end; ++idx) {
+            data.push_back_without_reserve(source.get_element(*idx));
+        }
+    }
+
     void insert_data(const char* pos, size_t /*length*/) override;
     void insert_default() override { data.push_back(T()); }
     void insert(const Field& x) override {
diff --git a/be/src/vec/columns/column_dummy.h 
b/be/src/vec/columns/column_dummy.h
index 4531058..cac3a06 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -80,6 +80,10 @@ public:
         s += length;
     }
 
+    /// Increases s by the number of indices in [indices_begin, indices_end).
+    /// Neither src nor the index values themselves are read.
+    void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override {
+        const auto added_rows = indices_end - indices_begin;
+        s += added_rows;
+    }
+
     ColumnPtr filter(const Filter& filt, ssize_t /*result_size_hint*/) const 
override {
         return clone_dummy(count_bytes_in_filter(filt));
     }
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index bf4bb44..ae3a2fd 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -131,6 +131,12 @@ void ColumnNullable::insert_range_from(const IColumn& src, 
size_t start, size_t
     get_nested_column().insert_range_from(*nullable_col.nested_column, start, 
length);
 }
 
+/// Appends the rows of src selected by [indices_begin, indices_end): the nested
+/// values first, then the null-map entries, both using the same index range.
+/// src is cast to ColumnNullable.
+void ColumnNullable::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) {
+    const auto& nullable_src = assert_cast<const ColumnNullable&>(src);
+    const auto& src_values = nullable_src.get_nested_column();
+    const auto& src_null_map = nullable_src.get_null_map_column();
+    get_nested_column().insert_indices_from(src_values, indices_begin, indices_end);
+    get_null_map_column().insert_indices_from(src_null_map, indices_begin, indices_end);
+}
+
 void ColumnNullable::insert(const Field& x) {
     if (x.is_null()) {
         get_nested_column().insert_default();
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index fb6aa8f..8163149 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -83,6 +83,7 @@ public:
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& 
begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
+    void insert_indices_from(const IColumn& src, const int* indices_begin, 
const int* indices_end) override;
     void insert(const Field& x) override;
     void insert_from(const IColumn& src, size_t n) override;
 
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index 05a702b..f4a32dc 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -93,6 +93,12 @@ void ColumnString::insert_range_from(const IColumn& src, 
size_t start, size_t le
     }
 }
 
+/// Appends the values of src at the row indices in [indices_begin, indices_end),
+/// issuing one ColumnString::insert_from call per index, in index order.
+void ColumnString::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) {
+    const int* cursor = indices_begin;
+    while (cursor != indices_end) {
+        ColumnString::insert_from(src, *cursor);
+        ++cursor;
+    }
+}
+
 ColumnPtr ColumnString::filter(const Filter& filt, ssize_t result_size_hint) 
const {
     if (offsets.size() == 0) return ColumnString::create();
 
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 44a6608..2234490 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -187,6 +187,8 @@ public:
 
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
 
+    void insert_indices_from(const IColumn& src, const int* indices_begin, 
const int* indices_end) override;
+
     ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
 
     ColumnPtr permute(const Permutation& perm, size_t limit) const override;
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index 017ae29..75ff144 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -29,6 +29,7 @@
 #include "runtime/datetime_value.h"
 #include "vec/columns/columns_common.h"
 #include "vec/common/arena.h"
+#include "vec/common/assert_cast.h"
 #include "vec/common/bit_cast.h"
 #include "vec/common/exception.h"
 #include "vec/common/nan_utils.h"
@@ -218,6 +219,15 @@ void ColumnVector<T>::insert_range_from(const IColumn& 
src, size_t start, size_t
 }
 
 template <typename T>
+void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) {
+    // Appends the elements of src located at the row indices in
+    // [indices_begin, indices_end), in index order. src is cast to this column type.
+    const auto& source = assert_cast<const Self&>(src);
+    const auto num_indices = indices_end - indices_begin;
+    // The reserve must cover every append below: push_back_without_reserve is
+    // assumed to skip the capacity check (TODO confirm against PaddedPODArray).
+    data.reserve(size() + num_indices);
+    for (const int* idx = indices_begin; idx != indices_end; ++idx) {
+        data.push_back_without_reserve(source.get_element(*idx));
+    }
+}
+
+template <typename T>
 ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t 
result_size_hint) const {
     size_t size = data.size();
     if (size != filt.size()) {
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 6626229..2c5647e 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -200,6 +200,8 @@ public:
 
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
 
+    void insert_indices_from(const IColumn& src, const int* indices_begin, 
const int* indices_end) override;
+
     ColumnPtr filter(const IColumn::Filter& filt, ssize_t result_size_hint) 
const override;
 
     // note(wb) this method is only used in storage layer now
diff --git a/be/src/vec/columns/predicate_column.h 
b/be/src/vec/columns/predicate_column.h
index 8095cff..a23c550 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -148,6 +148,10 @@ public:
          LOG(FATAL) << "insert_range_from not supported in 
PredicateColumnType";
     }
 
+    void insert_indices_from(const IColumn& src, const int* indices_begin, 
const int* indices_end) override { // Unsupported for this column type: the body only logs at FATAL.
+         LOG(FATAL) << "insert_indices_from not supported in 
PredicateColumnType";
+    }
+
     void pop_back(size_t n) override {
         LOG(FATAL) << "pop_back not supported in PredicateColumnType";
     }
@@ -440,4 +444,4 @@ private:
 };
 using ColumnStringValue = PredicateColumnType<StringValue>;
 
-} // namespace
\ No newline at end of file
+} // namespace
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index d200a46..3664a2a 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -837,9 +837,18 @@ size_t MutableBlock::rows() const {
 }
 
 void MutableBlock::add_row(const Block* block, int row) {
-    auto& src_columns_with_schema = block->get_columns_with_type_and_name();
+    auto& block_data = block->get_columns_with_type_and_name();
     for (size_t i = 0; i < _columns.size(); ++i) {
-        _columns[i]->insert_from(*src_columns_with_schema[i].column.get(), 
row);
+        _columns[i]->insert_from(*block_data[i].column.get(), row);
+    }
+}
+
+// Appends the rows of block whose indices lie in [row_begin, row_end) to this
+// MutableBlock: column i of block is appended to column i of _columns.
+void MutableBlock::add_rows(const Block* block, const int* row_begin, const int* row_end) {
+    const auto& src_columns = block->get_columns_with_type_and_name();
+    for (size_t col = 0; col < _columns.size(); ++col) {
+        const auto* src_column = src_columns[col].column.get();
+        _columns[col]->insert_indices_from(*src_column, row_begin, row_end);
     }
 }
 
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 1c435ee..a39baa9 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -339,6 +339,8 @@ public:
     Block to_block(int start_column, int end_column);
 
     void add_row(const Block* block, int row);
+    void add_rows(const Block* block, const int* row_begin, const int* 
row_end);
+
     std::string dump_data(size_t row_limit = 100) const;
 
     void clear() {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 0cc5c66..d953c9d 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -169,6 +169,42 @@ Status VDataStreamSender::Channel::add_row(Block* block, 
int row) {
     return Status::OK();
 }
 
+// Appends the rows of `block` whose indices are listed in `rows` to this channel's
+// pending MutableBlock, calling send_current_block() each time the pending block
+// is filled up to the runtime batch size.
+//
+// Returns OK immediately when _fragment_instance_id.lo == -1; otherwise returns
+// the first non-OK status from send_current_block(), or OK.
+// An empty `rows` is accepted and appends nothing.
+Status VDataStreamSender::Channel::add_rows(Block* block, const std::vector<int>& rows) {
+    if (_fragment_instance_id.lo == -1) {
+        return Status::OK();
+    }
+
+    // Lazily create the pending block with the same schema as the input block.
+    if (_mutable_block.get() == nullptr) {
+        auto empty_block = block->clone_empty();
+        _mutable_block.reset(
+                new MutableBlock(empty_block.mutate_columns(), empty_block.get_data_types()));
+    }
+
+    int row_wait_add = static_cast<int>(rows.size());
+    const int batch_size = _parent->state()->batch_size();
+    // data() is well-defined on an empty vector, unlike &rows[0].
+    const int* begin = rows.data();
+
+    while (row_wait_add > 0) {
+        // Free space left in the pending block before it reaches batch_size.
+        const int max_add = batch_size - static_cast<int>(_mutable_block->rows());
+        if (max_add <= 0) {
+            // The pending block is already full: flush it instead of computing a
+            // zero or negative row count (a negative one would reverse the index range).
+            // NOTE(review): relies on send_current_block() emptying the pending block,
+            // as the original loop did — confirm.
+            RETURN_IF_ERROR(send_current_block());
+            continue;
+        }
+
+        const int row_add = row_wait_add < max_add ? row_wait_add : max_add;
+        _mutable_block->add_rows(block, begin, begin + row_add);
+
+        row_wait_add -= row_add;
+        begin += row_add;
+
+        // This batch filled the pending block exactly: send it.
+        if (row_add == max_add) {
+            RETURN_IF_ERROR(send_current_block());
+        }
+    }
+
+    return Status::OK();
+}
+
 Status VDataStreamSender::Channel::close_wait(RuntimeState* state) {
     if (_need_close) {
         Status st = _wait_last_brpc();
@@ -394,51 +430,49 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block) {
         int num_channels = _channels.size();
         // will only copy schema
         // we don't want send temp columns
-        auto send_block = *block;
 
-        std::vector<int> result(_partition_expr_ctxs.size());
-        int counter = 0;
+        int result_size = _partition_expr_ctxs.size();
+        int result[result_size];
+        RETURN_IF_ERROR(get_partition_column_result(block, result));
 
-        for (auto ctx : _partition_expr_ctxs) {
-            RETURN_IF_ERROR(ctx->execute(block, &result[counter++]));
-        }
         // vectorized caculate hash
         int rows = block->rows();
         // for each row, we have a siphash val
         std::vector<SipHash> siphashs(rows);
         // result[j] means column index, i means rows index
-        for (int j = 0; j < result.size(); ++j) {
+        for (int j = 0; j < result_size; ++j) {
             auto column = block->get_by_position(result[j]).column;
             for (int i = 0; i < rows; ++i) {
                 column->update_hash_with_value(i, siphashs[i]);
             }
         }
 
+        // channel2rows' subscript means channel id 
+        std::vector<vectorized::UInt64> hash_vals(rows);
         for (int i = 0; i < rows; i++) {
-            auto target_channel_id = siphashs[i].get64() % num_channels;
-            RETURN_IF_ERROR(_channels[target_channel_id]->add_row(&send_block, 
i));
+            hash_vals[i] = siphashs[i].get64();
         }
 
+        RETURN_IF_ERROR(channel_add_rows(_channels, num_channels, hash_vals, 
rows, block));
     } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         // 1. caculate hash
         // 2. dispatch rows to channel
         int num_channels = _channel_shared_ptrs.size();
-        auto send_block = *block;
-        std::vector<int> result(_partition_expr_ctxs.size());
-        int counter = 0;
-        for (auto ctx : _partition_expr_ctxs) {
-            RETURN_IF_ERROR(ctx->execute(block, &result[counter++]));
-        }
+
+        int result_size = _partition_expr_ctxs.size();
+        int result[result_size];
+        RETURN_IF_ERROR(get_partition_column_result(block, result));
+
         // vectorized caculate hash val
         int rows = block->rows();
         // for each row, we have a hash_val
         std::vector<size_t> hash_vals(rows);
 
         // result[j] means column index, i means rows index
-        for (int j = 0; j < result.size(); ++j) {
+        for (int j = 0; j < result_size; ++j) {
+            auto& column = block->get_by_position(result[j]).column;
             for (int i = 0; i < rows; ++i) {
-                auto val = 
block->get_by_position(result[j]).column->get_data_at(i);
-
+                auto val = column->get_data_at(i);
                 if (val.data == nullptr) {
                     // nullptr is treat as 0 when hash
                     static const int INT_VALUE = 0;
@@ -446,17 +480,12 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block) {
                     hash_vals[i] = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, 
hash_vals[i]);
                 } else {
                     hash_vals[i] = RawValue::zlib_crc32(val.data, val.size,
-                                                        
_partition_expr_ctxs[j]->root()->type(),
-                                                        hash_vals[i]);
+                                                
_partition_expr_ctxs[j]->root()->type(), hash_vals[i]);
                 }
             }
         }
 
-        for (int i = 0; i < rows; i++) {
-            auto target_channel_id = hash_vals[i] % num_channels;
-            
RETURN_IF_ERROR(_channel_shared_ptrs[target_channel_id]->add_row(&send_block, 
i));
-        }
-
+        RETURN_IF_ERROR(channel_add_rows(_channel_shared_ptrs, num_channels, 
hash_vals, rows, block));
     } else {
         // Range partition
         // 1. caculate range
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index a15d39d..9d20987 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -72,6 +72,17 @@ public:
 private:
     class Channel;
 
+    // Calls execute() on every partition expression context for `block`, passing
+    // &result[i] to the i-th context. An error status is propagated through
+    // RETURN_IF_ERROR; otherwise OK is returned.
+    // `result` must have room for one int per partition expression.
+    Status get_partition_column_result(Block* block, int* result) const {
+        const size_t num_exprs = _partition_expr_ctxs.size();
+        for (size_t i = 0; i < num_exprs; ++i) {
+            RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(block, &result[i]));
+        }
+        return Status::OK();
+    }
+
+    template <typename Channels, typename HashVals>
+    Status channel_add_rows(Channels& channels, int num_channels, const 
HashVals& hash_vals, int rows, Block* block);
+
     struct hash_128 {
         uint64_t high;
         uint64_t low;
@@ -179,6 +190,7 @@ public:
     Status send_block(PBlock* block, bool eos = false);
 
     Status add_row(Block* block, int row);
+    // Batched counterpart of add_row: appends the rows of `block` whose indices are listed in `rows`.
+    Status add_rows(Block* block, const std::vector<int>& rows);
 
     Status send_current_block(bool eos = false);
 
@@ -203,9 +215,9 @@ public:
         return uid.to_string();
     }
 
-    TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
+    TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; 
}
 
-    bool is_local() { return _is_local; }
+    bool is_local() const { return _is_local; }
 
 private:
     inline Status _wait_last_brpc() {
@@ -223,6 +235,7 @@ private:
         return Status::OK();
     }
 
+
 private:
     // Serialize _batch into _thrift_batch and send via send_batch().
     // Returns send_batch() status.
@@ -261,5 +274,24 @@ private:
     size_t _capacity;
     bool _is_local;
 };
+
+// Distributes the rows of `block` across `channels`: row i is assigned to channel
+// hash_vals[i] % num_channels. Row indices are grouped per channel first, so each
+// channel with at least one row gets exactly one add_rows() call.
+// Returns the first non-OK status from a channel's add_rows(), otherwise OK.
+// NOTE(review): num_channels is used as a modulus, so it must be > 0 whenever
+// rows > 0 — confirm that callers guarantee this.
+template <typename Channels, typename HashVals>
+Status VDataStreamSender::channel_add_rows(Channels& channels, int num_channels, const HashVals& hash_vals, int rows, Block* block) {
+    // A vector of buckets rather than `std::vector<int> channel2rows[num_channels]`:
+    // runtime-sized arrays are a compiler extension, not standard C++.
+    std::vector<std::vector<int>> channel2rows(num_channels);
+
+    for (int i = 0; i < rows; i++) {
+        auto cid = hash_vals[i] % num_channels;
+        channel2rows[cid].emplace_back(i);
+    }
+
+    for (int i = 0; i < num_channels; ++i) {
+        if (!channel2rows[i].empty()) {
+            RETURN_IF_ERROR(channels[i]->add_rows(block, channel2rows[i]));
+        }
+    }
+
+    return Status::OK();
+}
+
 } // namespace vectorized
 } // namespace doris

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to