This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 395840cbbb [Chore](refactor) Split IndexChannel from vtablet_sink.h into vtablet_sink.cc (#22848)
395840cbbb is described below

commit 395840cbbba9438793b4240be0da9c0525dcea33
Author: Jack Drogon <[email protected]>
AuthorDate: Sun Aug 13 10:21:12 2023 +0800

    [Chore](refactor) Split IndexChannel from vtablet_sink.h into vtablet_sink.cc (#22848)
    
    Signed-off-by: Jack Drogon <[email protected]>
---
 be/src/vec/sink/vtablet_sink.cpp | 78 +++++++++++++++++++++++++++++++++++++++-
 be/src/vec/sink/vtablet_sink.h   | 78 ----------------------------------------
 2 files changed, 77 insertions(+), 79 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 39589a239d..536a8cf330 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -101,7 +101,83 @@ class TExpr;
 
 namespace stream_load {
 
-IndexChannel::~IndexChannel() = default;
+class IndexChannel {
+public:
+    IndexChannel(VOlapTableSink* parent, int64_t index_id,
+                 const vectorized::VExprContextSPtr& where_clause)
+            : _parent(parent), _index_id(index_id), _where_clause(where_clause) {
+        _index_channel_tracker =
+                std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id));
+    }
+    ~IndexChannel() = default;
+
+    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets);
+
+    void for_each_node_channel(
+            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
+        for (auto& it : _node_channels) {
+            func(it.second);
+        }
+    }
+
+    void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
+                        int64_t tablet_id = -1);
+    Status check_intolerable_failure();
+
+    // set error tablet info in runtime state, so that it can be returned to FE.
+    void set_error_tablet_in_state(RuntimeState* state);
+
+    size_t num_node_channels() const { return _node_channels.size(); }
+
+    size_t get_pending_bytes() const {
+        size_t mem_consumption = 0;
+        for (auto& kv : _node_channels) {
+            mem_consumption += kv.second->get_pending_bytes();
+        }
+        return mem_consumption;
+    }
+
+    void set_tablets_received_rows(
+            const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
+
+    // check whether the rows num written by different replicas is consistent
+    Status check_tablet_received_rows_consistency();
+
+    vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
+
+private:
+    friend class VNodeChannel;
+    friend class VOlapTableSink;
+
+    VOlapTableSink* _parent;
+    int64_t _index_id;
+    vectorized::VExprContextSPtr _where_clause;
+
+    // from backend channel to tablet_id
+    // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
+    // Because the destruct order of objects is opposite to the creation order.
+    // So NodeChannel will be destructured first.
+    // And the destructor function of NodeChannel waits for all RPCs to finish.
+    // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC.
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
+    // BeId -> channel
+    std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
+    // from tablet_id to backend channel
+    std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
+
+    // lock to protect _failed_channels and _failed_channels_msgs
+    mutable doris::SpinLock _fail_lock;
+    // key is tablet_id, value is a set of failed node id
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
+    // key is tablet_id, value is error message
+    std::unordered_map<int64_t, std::string> _failed_channels_msgs;
+    Status _intolerable_failure_status = Status::OK();
+
+    std::unique_ptr<MemTracker> _index_channel_tracker;
+    // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
+    // used to verify whether the rows num received by different replicas is consistent
+    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
+};
 
 Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
     SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index be50285fd8..8b45f251ee 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -382,84 +382,6 @@ protected:
     ReusableClosure<PTabletWriterAddBlockResult>* _add_block_closure = nullptr;
 };
 
-class IndexChannel {
-public:
-    IndexChannel(VOlapTableSink* parent, int64_t index_id,
-                 const vectorized::VExprContextSPtr& where_clause)
-            : _parent(parent), _index_id(index_id), _where_clause(where_clause) {
-        _index_channel_tracker =
-                std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id));
-    }
-    ~IndexChannel();
-
-    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets);
-
-    void for_each_node_channel(
-            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
-        for (auto& it : _node_channels) {
-            func(it.second);
-        }
-    }
-
-    void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
-                        int64_t tablet_id = -1);
-    Status check_intolerable_failure();
-
-    // set error tablet info in runtime state, so that it can be returned to FE.
-    void set_error_tablet_in_state(RuntimeState* state);
-
-    size_t num_node_channels() const { return _node_channels.size(); }
-
-    size_t get_pending_bytes() const {
-        size_t mem_consumption = 0;
-        for (auto& kv : _node_channels) {
-            mem_consumption += kv.second->get_pending_bytes();
-        }
-        return mem_consumption;
-    }
-
-    void set_tablets_received_rows(
-            const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
-
-    // check whether the rows num written by different replicas is consistent
-    Status check_tablet_received_rows_consistency();
-
-    vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
-
-private:
-    friend class VNodeChannel;
-    friend class VOlapTableSink;
-
-    VOlapTableSink* _parent;
-    int64_t _index_id;
-    vectorized::VExprContextSPtr _where_clause;
-
-    // from backend channel to tablet_id
-    // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
-    // Because the destruct order of objects is opposite to the creation order.
-    // So NodeChannel will be destructured first.
-    // And the destructor function of NodeChannel waits for all RPCs to finish.
-    // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC.
-    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
-    // BeId -> channel
-    std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
-    // from tablet_id to backend channel
-    std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
-
-    // lock to protect _failed_channels and _failed_channels_msgs
-    mutable doris::SpinLock _fail_lock;
-    // key is tablet_id, value is a set of failed node id
-    std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
-    // key is tablet_id, value is error message
-    std::unordered_map<int64_t, std::string> _failed_channels_msgs;
-    Status _intolerable_failure_status = Status::OK();
-
-    std::unique_ptr<MemTracker> _index_channel_tracker;
-    // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
-    // used to verify whether the rows num received by different replicas is consistent
-    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
-};
-
 // Write block data to Olap Table.
 // When OlapTableSink::open() called, there will be a consumer thread running in the background.
 // When you call VOlapTableSink::send(), you will be the producer who products pending batches.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to