This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 90fb3b7783 [Improvement](load) accelerate tablet sink (#12174)
90fb3b7783 is described below

commit 90fb3b77830d75093f11208a0419f79dd42d042e
Author: Gabriel <[email protected]>
AuthorDate: Thu Sep 1 10:08:09 2022 +0800

    [Improvement](load) accelerate tablet sink (#12174)
---
 be/src/exec/tablet_sink.h                       |  7 ++-
 be/src/vec/columns/column.h                     |  5 +++
 be/src/vec/columns/column_array.h               |  5 +++
 be/src/vec/columns/column_complex.h             |  5 +++
 be/src/vec/columns/column_const.h               |  5 +++
 be/src/vec/columns/column_decimal.h             |  5 +++
 be/src/vec/columns/column_dictionary.h          |  5 +++
 be/src/vec/columns/column_dummy.h               | 14 ++++++
 be/src/vec/columns/column_fixed_length_object.h |  5 +++
 be/src/vec/columns/column_impl.h                | 15 +++++++
 be/src/vec/columns/column_nullable.h            |  5 +++
 be/src/vec/columns/column_string.h              |  5 +++
 be/src/vec/columns/column_vector.h              |  5 +++
 be/src/vec/columns/predicate_column.h           |  5 +++
 be/src/vec/core/block.cpp                       |  8 ++++
 be/src/vec/core/block.h                         |  2 +
 be/src/vec/sink/vtablet_sink.cpp                | 60 ++++++++++++++++++-------
 be/src/vec/sink/vtablet_sink.h                  |  4 +-
 18 files changed, 146 insertions(+), 19 deletions(-)

diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 6b9b2f1c32..5f6ad148eb 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -183,8 +183,13 @@ public:
     virtual Status open_wait();
 
     Status add_row(Tuple* tuple, int64_t tablet_id);
-    virtual Status add_row(const BlockRow& block_row, int64_t tablet_id) {
-        LOG(FATAL) << "add block row to NodeChannel not supported";
+
+    // Appends the rows of `block` whose indices are listed in `payload.first`;
+    // `payload.second` holds the tablet id of each selected row, in the same order.
+    virtual Status add_block(vectorized::Block* block,
+                             const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+                                             std::vector<int64_t>>& payload) {
+        LOG(FATAL) << "add block to NodeChannel not supported";
         return Status::OK();
     }
 
 
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 002fc8eea3..4307153739 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -361,6 +361,10 @@ public:
     virtual std::vector<MutablePtr> scatter(ColumnIndex num_columns,
                                             const Selector& selector) const = 0;
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`, in
+    /// selector order. Block::append_block_by_selector calls this once per column.
+    virtual void append_data_by_selector(MutablePtr& res, const Selector& selector) const = 0;
+
     /// Insert data from several other columns according to source mask (used in vertical merge).
     /// For now it is a helper to de-virtualize calls to insert*() functions inside gather loop
     /// (descendants should call gatherer_stream.gather(*this) to implement this function.)
@@ -530,6 +532,11 @@ protected:
     /// In derived classes (that use final keyword), implement scatter method as call to scatter_impl.
     template <typename Derived>
     std::vector<MutablePtr> scatter_impl(ColumnIndex num_columns, const Selector& selector) const;
+
+    /// Shared body for append_data_by_selector overrides: casts `res` to Derived and calls
+    /// insert_from for each selected row. `res` must really be a Derived.
+    template <typename Derived>
+    void append_data_by_selector_impl(MutablePtr& res, const Selector& selector) const;
 };
 
 using ColumnPtr = IColumn::Ptr;
 
 using ColumnPtr = IColumn::Ptr;
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index 182f7b185d..50f864fbb9 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -136,6 +136,12 @@ public:
         return scatter_impl<ColumnArray>(num_columns, selector);
     }
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        append_data_by_selector_impl<ColumnArray>(res, selector);
+    }
+
     void for_each_subcolumn(ColumnCallback callback) override {
         callback(offsets);
         callback(data);
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index fe732ec8a9..e61bfa8e8b 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -230,6 +230,13 @@ public:
         LOG(FATAL) << "scatter not implemented";
     }
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`,
+    /// delegating to the shared IColumn::append_data_by_selector_impl.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        this->template append_data_by_selector_impl<ColumnComplexType<T>>(res, selector);
+    }
+
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
         DCHECK(size() > self_row);
         data[self_row] = static_cast<const Self&>(rhs).data[row];
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 8a3646025f..615c936a5e 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -165,6 +165,12 @@ public:
 
     MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override;
 
+    /// Not supported for ColumnConst; calling it logs at FATAL level.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        LOG(FATAL) << "append_data_by_selector is not supported in ColumnConst!";
+    }
+
     void get_extremes(Field& min, Field& max) const override { data->get_extremes(min, max); }
 
     void for_each_subcolumn(ColumnCallback callback) override { callback(data); }
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index e318271b13..70ff712a08 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -194,6 +194,12 @@ public:
         return this->template scatter_impl<Self>(num_columns, selector);
     }
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        this->template append_data_by_selector_impl<Self>(res, selector);
+    }
+
     //    void gather(ColumnGathererStream & gatherer_stream) override;
 
     bool structure_equals(const IColumn& rhs) const override {
diff --git a/be/src/vec/columns/column_dictionary.h 
b/be/src/vec/columns/column_dictionary.h
index 8a6cc0c905..d56265b757 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -185,6 +185,12 @@ public:
         LOG(FATAL) << "scatter not supported in ColumnDictionary";
     }
 
+    /// Not supported for ColumnDictionary; calling it logs at FATAL level.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        LOG(FATAL) << "append_data_by_selector is not supported in ColumnDictionary!";
+    }
+
     Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override {
         auto* res_col = reinterpret_cast<vectorized::ColumnString*>(col_ptr);
         for (size_t i = 0; i < sel_size; i++) {
diff --git a/be/src/vec/columns/column_dummy.h 
b/be/src/vec/columns/column_dummy.h
index 4937e1391b..e7fb657161 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -125,6 +125,25 @@ public:
         return res;
     }
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        const size_t num_rows = size();
+
+        if (num_rows < selector.size()) {
+            LOG(FATAL) << fmt::format("Size of selector: {}, is larger than size of column:{}",
+                                      selector.size(), num_rows);
+        }
+
+        // `res` grows by exactly selector.size() rows and may already contain rows from
+        // earlier calls, so reserve for its final size rather than for this column's size.
+        res->reserve(res->size() + selector.size());
+
+        for (size_t i = 0; i < selector.size(); ++i) {
+            res->insert_from(*this, selector[i]);
+        }
+    }
+
     void get_extremes(Field&, Field&) const override {}
 
     void addSize(size_t delta) { s += delta; }
diff --git a/be/src/vec/columns/column_fixed_length_object.h 
b/be/src/vec/columns/column_fixed_length_object.h
index 5ecb13ec61..97098d2e5d 100644
--- a/be/src/vec/columns/column_fixed_length_object.h
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -165,6 +165,12 @@ public:
         LOG(FATAL) << "scatter not supported";
     }
 
+    /// Not supported for this column type; calling it logs at FATAL level.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        LOG(FATAL) << "append_data_by_selector is not supported!";
+    }
+
     void get_extremes(Field& min, Field& max) const override {
         LOG(FATAL) << "get_extremes not supported";
     }
diff --git a/be/src/vec/columns/column_impl.h b/be/src/vec/columns/column_impl.h
index 4e12f98d51..215eee711e 100644
--- a/be/src/vec/columns/column_impl.h
+++ b/be/src/vec/columns/column_impl.h
@@ -59,4 +59,25 @@ std::vector<IColumn::MutablePtr> IColumn::scatter_impl(ColumnIndex num_columns,
     return columns;
 }
 
+/// Appends to `res` the rows of this column whose indices are listed in `selector`, in
+/// selector order. `res` is cast to Derived, so its dynamic type must be Derived.
+template <typename Derived>
+void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector& selector) const {
+    const size_t num_rows = size();
+
+    if (num_rows < selector.size()) {
+        LOG(FATAL) << fmt::format("Size of selector: {}, is larger than size of column:{}",
+                                  selector.size(), num_rows);
+    }
+
+    // `res` grows by exactly selector.size() rows and may already contain rows from earlier
+    // calls, so reserve for its final size rather than for this column's size.
+    res->reserve(res->size() + selector.size());
+
+    auto& dst = static_cast<Derived&>(*res);
+    for (size_t i = 0; i < selector.size(); ++i) {
+        dst.insert_from(*this, selector[i]);
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index c9268118df..05807e10ec 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -165,6 +165,12 @@ public:
         return scatter_impl<ColumnNullable>(num_columns, selector);
     }
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        append_data_by_selector_impl<ColumnNullable>(res, selector);
+    }
+
     //    void gather(ColumnGathererStream & gatherer_stream) override;
 
     void for_each_subcolumn(ColumnCallback callback) override {
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index bbe6993ae2..d8a704e362 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -302,6 +302,12 @@ public:
         return scatter_impl<ColumnString>(num_columns, selector);
     }
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        append_data_by_selector_impl<ColumnString>(res, selector);
+    }
+
     //    void gather(ColumnGathererStream & gatherer_stream) override;
 
     void reserve(size_t n) override;
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index c673f0b7bd..f97dae0cc4 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -342,6 +342,12 @@ public:
         return this->template scatter_impl<Self>(num_columns, selector);
     }
 
+    /// Appends to `res` the rows of this column whose indices are listed in `selector`.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        this->template append_data_by_selector_impl<Self>(res, selector);
+    }
+
     //    void gather(ColumnGathererStream & gatherer_stream) override;
 
     bool can_be_inside_nullable() const override { return true; }
diff --git a/be/src/vec/columns/predicate_column.h 
b/be/src/vec/columns/predicate_column.h
index 1131f93d59..6662545b2f 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -397,6 +397,12 @@ public:
         LOG(FATAL) << "scatter not supported in PredicateColumnType";
     }
 
+    /// Not supported for PredicateColumnType; calling it logs at FATAL level.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const override {
+        LOG(FATAL) << "append_data_by_selector is not supported in PredicateColumnType!";
+    }
+
     Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override {
         if constexpr (std::is_same_v<T, StringValue>) {
             insert_string_to_res_column(sel, sel_size,
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 18b63a0975..b9562addd0 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -640,6 +640,18 @@ Block Block::copy_block(const std::vector<int>& column_offset) const {
     return columns_with_type_and_name;
 }
 
+// Appends to each entry of `columns` the rows of the matching column of this block whose
+// indices are listed in `selector`. `columns` must be parallel to this block's columns (same
+// count, same order). NOTE(review): ColumnConst::append_data_by_selector aborts, so const
+// columns are assumed to have been materialized by the caller.
+void Block::append_block_by_selector(MutableColumns& columns,
+                                     const IColumn::Selector& selector) const {
+    DCHECK_EQ(data.size(), columns.size());
+    for (size_t i = 0; i < data.size(); ++i) {
+        data[i].column->append_data_by_selector(columns[i], selector);
+    }
+}
+
 Status Block::filter_block(Block* block, int filter_column_id, int column_to_keep) {
     ColumnPtr filter_column = block->get_by_position(filter_column_id).column;
     if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 5fe90fdf6d..8895980e4d 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -257,6 +257,10 @@ public:
     // copy a new block by the offset column
     Block copy_block(const std::vector<int>& column_offset) const;
 
+    // Appends the rows selected by `selector` from every column of this block to the
+    // corresponding entry of `columns` (one entry per block column, in the same order).
+    void append_block_by_selector(MutableColumns& columns, const IColumn::Selector& selector) const;
+
     static void filter_block_internal(Block* block, const IColumn::Filter& filter,
                                       uint32_t column_to_keep);
 
 
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index a9d091bf75..1c1ef24579 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -23,6 +23,7 @@
 #include "util/doris_metrics.h"
 #include "util/proto_util.h"
 #include "util/time.h"
+#include "vec/columns/column.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
@@ -160,9 +161,14 @@ Status VNodeChannel::open_wait() {
     return status;
 }
 
-Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
+// Appends the rows of `block` listed in payload.first (tagged with the tablet ids in
+// payload.second) to the current batch, queueing the batch once it is large enough.
+Status VNodeChannel::add_block(vectorized::Block* block,
+                               const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+                                               std::vector<int64_t>>& payload) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
+    // Reaching add_block() after _eos_is_produced was set means something went wrong; all we
+    // can do then is mark this channel as failed.
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
         if (_cancelled) {
@@ -177,22 +180,28 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
     // so in the ideal case, mem limit is a matter for _plan node.
     // But there is still some unfinished things, we do mem limit here temporarily.
     // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
-    // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
+    // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
     while (!_cancelled && _pending_batches_num > 0 &&
            _pending_batches_bytes > _max_pending_batches_bytes) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     }
 
-    _cur_mutable_block->add_row(block_row.first, block_row.second);
-    _cur_add_block_request.add_tablet_ids(tablet_id);
+    // payload.second lists one tablet id per selected row, in selector order; the request's
+    // tablet ids must stay aligned with the rows appended to the block.
+    DCHECK_EQ(payload.first->size(), payload.second.size());
+    block->append_block_by_selector(_cur_mutable_block->mutable_columns(), *(payload.first));
+    for (auto tablet_id : payload.second) {
+        _cur_add_block_request.add_tablet_ids(tablet_id);
+    }
 
-    if (_cur_mutable_block->rows() == _batch_size ||
+    // `>=` rather than `==`: a single call may append many rows and overshoot _batch_size.
+    if (_cur_mutable_block->rows() >= _batch_size ||
         _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
         {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
             std::lock_guard<std::mutex> l(_pending_batches_lock);
-            //To simplify the add_row logic, postpone adding block into req until the time of sending req
+            // To simplify the add_block logic, postpone adding block into req until the time of sending req
             _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
             _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
             _pending_batches_num++;
@@ -480,12 +485,12 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
         _partition_to_tablet_map.clear();
     }
 
-    //if pending bytes is more than 500M, wait
-    //constexpr size_t MAX_PENDING_BYTES = 500 * 1024 * 1024;
-    //while (get_pending_bytes() > MAX_PENDING_BYTES) {
-    //    std::this_thread::sleep_for(std::chrono::microseconds(500));
-    //}
-
+    // One map per index channel (same position as in _channels): for every node channel, the
+    // row numbers of `block` it must receive and the tablet id of each of those rows.
+    using ChannelPayload =
+            std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>;
+    std::vector<std::unordered_map<NodeChannel*, ChannelPayload>> channel_to_payload(
+            _channels.size());
     for (int i = 0; i < num_rows; ++i) {
         if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
             continue;
@@ -520,12 +524,35 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
             tablet_index = _vpartition->find_tablet(&block_row, *partition);
         }
         for (int j = 0; j < partition->indexes.size(); ++j) {
-            int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
-            _channels[j]->add_row(block_row, tablet_id);
+            auto tid = partition->indexes[j].tablets[tablet_index];
+            auto it = _channels[j]->_channels_by_tablet.find(tid);
+            DCHECK(it != _channels[j]->_channels_by_tablet.end())
+                    << "unknown tablet, tablet_id=" << tid;
+            auto& payloads_of_index = channel_to_payload[j];
+            for (const auto& channel : it->second) {
+                // operator[] value-initializes the payload (null selector, no tablet ids) the
+                // first time a node channel is seen, so each row needs only one hash lookup.
+                auto& payload = payloads_of_index[channel.get()];
+                if (payload.first == nullptr) {
+                    payload.first = std::make_unique<vectorized::IColumn::Selector>();
+                }
+                payload.first->push_back(i);
+                payload.second.push_back(tid);
+            }
             _number_output_rows++;
         }
     }
-
+    // Ship each node channel's rows as one batch instead of row by row.
+    for (size_t i = 0; i < _channels.size(); i++) {
+        for (const auto& entry : channel_to_payload[i]) {
+            // if this node channel is already failed, this add_block will be skipped
+            auto st = entry.first->add_block(&block, entry.second);
+            if (!st.ok()) {
+                _channels[i]->mark_as_failed(entry.first->node_id(), entry.first->host(),
+                                             st.get_error_msg());
+            }
+        }
+    }
     // check intolerable failure
     for (const auto& index_channel : _channels) {
         RETURN_IF_ERROR(index_channel->check_intolerable_failure());
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 36943473a8..c841cab5c0 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -38,7 +38,11 @@ public:
 
     Status open_wait() override;
 
-    Status add_row(const BlockRow& block_row, int64_t tablet_id) override;
+    // Appends the rows of `block` selected by payload.first, tagged with the tablet ids in
+    // payload.second, to this channel's current batch.
+    Status add_block(vectorized::Block* block,
+                     const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+                                     std::vector<int64_t>>& payload) override;
 
     int try_send_and_fetch_status(RuntimeState* state,
                                   std::unique_ptr<ThreadPoolToken>& thread_pool_token) override;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to