github-actions[bot] commented on code in PR #15283:
URL: https://github.com/apache/doris/pull/15283#discussion_r1055337905


##########
be/src/vec/sink/vtablet_sink.h:
##########
@@ -29,36 +50,265 @@ class VExprContext;
 
 namespace stream_load {
 
-class VNodeChannel : public NodeChannel {
+// The counter of add_batch rpc of a single node
+struct AddBatchCounter {
+    // total execution time of a add_batch rpc
+    int64_t add_batch_execution_time_us = 0;
+    // lock waiting time in a add_batch rpc
+    int64_t add_batch_wait_execution_time_us = 0;
+    // number of add_batch call
+    int64_t add_batch_num = 0;
+    // time elapsed between when close was marked and when close finished
+    int64_t close_wait_time_ms = 0;
+
+    AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
+        add_batch_execution_time_us += rhs.add_batch_execution_time_us;
+        add_batch_wait_execution_time_us += 
rhs.add_batch_wait_execution_time_us;
+        add_batch_num += rhs.add_batch_num;
+        close_wait_time_ms += rhs.close_wait_time_ms;
+        return *this;
+    }
+    friend AddBatchCounter operator+(const AddBatchCounter& lhs, const 
AddBatchCounter& rhs) {
+        AddBatchCounter sum = lhs;
+        sum += rhs;
+        return sum;
+    }
+};
+
+// It's very error-prone to guarantee the destruction order of the handler's
+// captured variables and this closure. So using create() to obtain the closure
+// pointer is recommended: we can delete the closure pointer before the
+// captured variables are destroyed. Deleting this pointer is safe; the RPC
+// callback will never run after the ReusableClosure has been deleted.
+template <typename T>
+class ReusableClosure final : public google::protobuf::Closure {
 public:
-    VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t 
node_id);
+    ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
+    ~ReusableClosure() override {
+        // Shouldn't delete while Run() is executing or about to be called;
+        // wait for the current Run() to finish.
+        join();
+        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+        cntl.Reset();
+    }
+
+    static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+
+    void addFailedHandler(const std::function<void(bool)>& fn) { 
failed_handler = fn; }
+    void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { 
success_handler = fn; }
+
+    void join() {
+        // We rely on in_flight to assure one rpc is running,
+        // while cid is not reliable due to memory order.
+        // in_flight is written before getting callid,
+        // so we can not use memory fence to synchronize.
+        while (_packet_in_flight) {
+            // cid here is complicated
+            if (cid != INVALID_BTHREAD_ID) {
+                // actually cid may be the last rpc call id.
+                brpc::Join(cid);
+            }
+            if (_packet_in_flight) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+        }
+    }
+
+    // Please follow this order: reset() -> set_in_flight() -> send brpc batch
+    void reset() {
+        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+        cntl.Reset();
+        cid = cntl.call_id();
+    }
+
+    bool try_set_in_flight() {
+        bool value = false;
+        return _packet_in_flight.compare_exchange_strong(value, true);
+    }
+
+    void clear_in_flight() { _packet_in_flight = false; }
+
+    bool is_packet_in_flight() { return _packet_in_flight; }
+
+    void end_mark() {
+        DCHECK(_is_last_rpc == false);
+        _is_last_rpc = true;
+    }
+
+    void Run() override {
+        DCHECK(_packet_in_flight);
+        if (cntl.Failed()) {
+            LOG(WARNING) << "failed to send brpc batch, error=" << 
berror(cntl.ErrorCode())
+                         << ", error_text=" << cntl.ErrorText();
+            failed_handler(_is_last_rpc);
+        } else {
+            success_handler(result, _is_last_rpc);
+        }
+        clear_in_flight();
+    }
+
+    brpc::Controller cntl;
+    T result;
+
+private:
+    brpc::CallId cid;
+    std::atomic<bool> _packet_in_flight {false};
+    std::atomic<bool> _is_last_rpc {false};
+    std::function<void(bool)> failed_handler;
+    std::function<void(const T&, bool)> success_handler;
+};
+
+class IndexChannel;
+class VOlapTableSink;
+
+class VNodeChannel {
+public:
+    VNodeChannel(VOlapTableSink* parent, IndexChannel* index_channel, int64_t 
node_id);
+
+    ~VNodeChannel();
+
+    // called before open, used to add tablet located in this backend
+    void add_tablet(const TTabletWithPartition& tablet) { 
_all_tablets.emplace_back(tablet); }
+
+    void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& 
slave_nodes) {
+        _slave_tablet_nodes[tablet_id] = slave_nodes;
+    }
 
-    ~VNodeChannel() override;
+    void open();
 
-    Status init(RuntimeState* state) override;
+    Status init(RuntimeState* state);
 
-    Status open_wait() override;
+    Status open_wait();
 
     Status add_block(vectorized::Block* block,
                      const 
std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
-                                     std::vector<int64_t>>& payload) override;
+                                     std::vector<int64_t>>& payload);
 
     int try_send_and_fetch_status(RuntimeState* state,
-                                  std::unique_ptr<ThreadPoolToken>& 
thread_pool_token) override;
+                                  std::unique_ptr<ThreadPoolToken>& 
thread_pool_token);
 
     void try_send_block(RuntimeState* state);
 
-    void clear_all_blocks() override;
+    void clear_all_blocks();
 
     // two ways to stop channel:
     // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
     // 2. just cancel()
-    void mark_close() override;
+    void mark_close();
+
+    // two ways to stop channel:
+    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
+    // 2. just cancel()
+    Status close_wait(RuntimeState* state);
+
+    void cancel(const std::string& cancel_msg);
+
+    void time_report(std::unordered_map<int64_t, AddBatchCounter>* 
add_batch_counter_map,
+                     int64_t* serialize_batch_ns, int64_t* 
mem_exceeded_block_ns,
+                     int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
+                     int64_t* total_add_batch_exec_time_ns, int64_t* 
add_batch_exec_time_ns,
+                     int64_t* total_add_batch_num) const {
+        (*add_batch_counter_map)[_node_id] += _add_batch_counter;
+        (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
+        *serialize_batch_ns += _serialize_batch_ns;
+        *mem_exceeded_block_ns += _mem_exceeded_block_ns;
+        *queue_push_lock_ns += _queue_push_lock_ns;
+        *actual_consume_ns += _actual_consume_ns;
+        *add_batch_exec_time_ns = 
(_add_batch_counter.add_batch_execution_time_us * 1000);
+        *total_add_batch_exec_time_ns += *add_batch_exec_time_ns;
+        *total_add_batch_num += _add_batch_counter.add_batch_num;
+    }
+
+    int64_t node_id() const { return _node_id; }
+    std::string host() const { return _node_info.host; }
+    std::string name() const { return _name; }
+
+    Status none_of(std::initializer_list<bool> vars);
+
+    std::string channel_info() const {
+        return fmt::format("{}, {}, node={}:{}", _name, _load_info, 
_node_info.host,
+                           _node_info.brpc_port);
+    }
+
+    size_t get_pending_bytes() { return _pending_batches_bytes; }
 
 protected:
-    void _close_check() override;
+    void _close_check();
+    void _cancel_with_msg(const std::string& msg);
+
+    VOlapTableSink* _parent = nullptr;
+    IndexChannel* _index_channel = nullptr;
+    int64_t _node_id = -1;
+    std::string _load_info;
+    std::string _name;
+
+    std::shared_ptr<MemTracker> _node_channel_tracker;
+
+    TupleDescriptor* _tuple_desc = nullptr;
+    NodeInfo _node_info;
+
+    // this should be set in init() using config
+    int _rpc_timeout_ms = 60000;
+    int64_t _next_packet_seq = 0;
+    MonotonicStopWatch _timeout_watch;
+
+    // the timestamp (ms) when this node channel was marked closed and
+    // finished closing
+    uint64_t _close_time_ms = 0;
+
+    // user cancel or get some errors
+    std::atomic<bool> _cancelled {false};
+    doris::SpinLock _cancel_msg_lock;
+    std::string _cancel_msg = "";

Review Comment:
   warning: redundant string initialization [readability-redundant-string-init]
   
   ```suggestion
       std::string _cancel_msg;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to