HappenLee commented on a change in pull request #7751:
URL: https://github.com/apache/incubator-doris/pull/7751#discussion_r785614606



##########
File path: be/src/vec/columns/column_decimal.h
##########
@@ -95,6 +95,13 @@ class ColumnDecimal final : public 
COWHelper<ColumnVectorHelper, ColumnDecimal<T
     void insert_from(const IColumn& src, size_t n) override {
         data.push_back(static_cast<const Self&>(src).get_data()[n]);
     }
+
+    void insert_indices_from(const IColumn& src, const int* indices_begin, 
const int* indices_end) override {
+        for (auto x = indices_begin; x != indices_end; ++x) {
+            Self::insert_from(src, *x);

Review comment:
       Same as in column_vector.

##########
File path: be/src/vec/columns/column_vector.cpp
##########
@@ -221,6 +222,15 @@ void ColumnVector<T>::insert_range_from(const IColumn& 
src, size_t start, size_t
     memcpy(data.data() + old_size, &src_vec.data[start], length * 
sizeof(data[0]));
 }
 
+template <typename T>
+void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* 
indices_begin, const int* indices_end) {
+    const Self& src_vec = assert_cast<const Self&>(src);

Review comment:
       use `push_back_without_reserve`

##########
File path: be/src/vec/columns/column_nullable.cpp
##########
@@ -131,6 +131,12 @@ void ColumnNullable::insert_range_from(const IColumn& src, 
size_t start, size_t
     get_nested_column().insert_range_from(*nullable_col.nested_column, start, 
length);
 }
 
+void ColumnNullable::insert_indices_from(const IColumn& src, const int* 
indices_begin, const int* indices_end) {
+    for (auto x = indices_begin; x != indices_end; ++x) {

Review comment:
       This will cause too many virtual function calls; please refactor it.

##########
File path: be/src/vec/sink/vdata_stream_sender.h
##########
@@ -261,5 +274,24 @@ class VDataStreamSender::Channel {
     size_t _capacity;
     bool _is_local;
 };
+
+template <typename Channels, typename HashVals>
+Status VDataStreamSender::channel_add_rows(Channels& channels, int 
num_channels, const HashVals& hash_vals, int rows, Block* block) {
+    std::vector<int> channel2rows[num_channels];

Review comment:
       Same as above: we should reuse the memory.

##########
File path: be/src/vec/sink/vdata_stream_sender.cpp
##########
@@ -394,69 +430,62 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block) {
         int num_channels = _channels.size();
         // will only copy schema
         // we don't want send temp columns
-        auto send_block = *block;
 
-        std::vector<int> result(_partition_expr_ctxs.size());
-        int counter = 0;
+        int result_size = _partition_expr_ctxs.size();
+        int result[result_size];
+        RETURN_IF_ERROR(get_partition_column_result(block, result));
 
-        for (auto ctx : _partition_expr_ctxs) {
-            RETURN_IF_ERROR(ctx->execute(block, &result[counter++]));
-        }
         // vectorized caculate hash
         int rows = block->rows();
         // for each row, we have a siphash val
         std::vector<SipHash> siphashs(rows);
         // result[j] means column index, i means rows index
-        for (int j = 0; j < result.size(); ++j) {
+        for (int j = 0; j < result_size; ++j) {
             auto column = block->get_by_position(result[j]).column;
             for (int i = 0; i < rows; ++i) {
                 column->update_hash_with_value(i, siphashs[i]);
             }
         }
 
+        // channel2rows' subscript means channel id 
+        std::vector<vectorized::UInt64> hash_vals(rows);
         for (int i = 0; i < rows; i++) {
-            auto target_channel_id = siphashs[i].get64() % num_channels;
-            RETURN_IF_ERROR(_channels[target_channel_id]->add_row(&send_block, 
i));
+            hash_vals[i] = siphashs[i].get64();
         }
 
+        RETURN_IF_ERROR(channel_add_rows(_channels, num_channels, hash_vals, 
rows, block));
     } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         // 1. caculate hash
         // 2. dispatch rows to channel
         int num_channels = _channel_shared_ptrs.size();
-        auto send_block = *block;
-        std::vector<int> result(_partition_expr_ctxs.size());
-        int counter = 0;
-        for (auto ctx : _partition_expr_ctxs) {
-            RETURN_IF_ERROR(ctx->execute(block, &result[counter++]));
-        }
+
+        int result_size = _partition_expr_ctxs.size();
+        int result[result_size];
+        RETURN_IF_ERROR(get_partition_column_result(block, result));
+
         // vectorized caculate hash val
         int rows = block->rows();
         // for each row, we have a hash_val
         std::vector<size_t> hash_vals(rows);

Review comment:
       We should reuse the memory.

##########
File path: be/src/vec/sink/vdata_stream_sender.cpp
##########
@@ -394,69 +430,62 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block) {
         int num_channels = _channels.size();
         // will only copy schema
         // we don't want send temp columns
-        auto send_block = *block;
 
-        std::vector<int> result(_partition_expr_ctxs.size());
-        int counter = 0;
+        int result_size = _partition_expr_ctxs.size();
+        int result[result_size];
+        RETURN_IF_ERROR(get_partition_column_result(block, result));
 
-        for (auto ctx : _partition_expr_ctxs) {
-            RETURN_IF_ERROR(ctx->execute(block, &result[counter++]));
-        }
         // vectorized caculate hash
         int rows = block->rows();
         // for each row, we have a siphash val
         std::vector<SipHash> siphashs(rows);
         // result[j] means column index, i means rows index
-        for (int j = 0; j < result.size(); ++j) {
+        for (int j = 0; j < result_size; ++j) {
             auto column = block->get_by_position(result[j]).column;
             for (int i = 0; i < rows; ++i) {
                 column->update_hash_with_value(i, siphashs[i]);
             }
         }
 
+        // channel2rows' subscript means channel id 
+        std::vector<vectorized::UInt64> hash_vals(rows);

Review comment:
       We should reuse the memory.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to