imay commented on a change in pull request #3143:
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r417387499



##########
File path: be/src/exec/tablet_sink.h
##########
@@ -68,99 +154,126 @@ class NodeChannel {
 
     Status add_row(Tuple* tuple, int64_t tablet_id);
 
-    Status close(RuntimeState* state);
+    // two ways to stop channel:
+    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
+    // 2. just cancel()
+    Status mark_close();
     Status close_wait(RuntimeState* state);
 
     void cancel();
 
-    int64_t node_id() const { return _node_id; }
+    // return:
+    // 0: stopped, send finished(eos request has been sent), or any internal 
error;
+    // 1: running, haven't reach eos.
+    // only allow 1 rpc in flight
+    int try_send_and_fetch_status();
+
+    void time_report(std::unordered_map<int64_t, AddBatchCounter>& 
add_batch_counter_map,
+                     int64_t* serialize_batch_ns, int64_t* 
mem_exceeded_block_ns,
+                     int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) {
+        add_batch_counter_map[_node_id] += _add_batch_counter;
+        *serialize_batch_ns += _serialize_batch_ns;
+        *mem_exceeded_block_ns += _mem_exceeded_block_ns;
+        *queue_push_lock_ns += _queue_push_lock_ns;
+        *actual_consume_ns += _actual_consume_ns;
+    }
 
-    void set_failed() { _already_failed = true; }
-    bool already_failed() const { return _already_failed; }
+    int64_t node_id() const { return _node_id; }
     const NodeInfo* node_info() const { return _node_info; }
+    std::string print_load_info() const { return _load_info; }
+    std::string name() const {
+        return "NodeChannel[" + std::to_string(_index_id) + "-" + 
std::to_string(_node_id) + "]";

Review comment:
       Prefer strings::Substitute in gutils/strings/Substitute.h.

##########
File path: be/src/exec/tablet_sink.h
##########
@@ -68,99 +154,126 @@ class NodeChannel {
 
     Status add_row(Tuple* tuple, int64_t tablet_id);
 
-    Status close(RuntimeState* state);
+    // two ways to stop channel:
+    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
+    // 2. just cancel()
+    Status mark_close();
     Status close_wait(RuntimeState* state);
 
     void cancel();
 
-    int64_t node_id() const { return _node_id; }
+    // return:
+    // 0: stopped, send finished(eos request has been sent), or any internal 
error;
+    // 1: running, haven't reach eos.
+    // only allow 1 rpc in flight
+    int try_send_and_fetch_status();
+
+    void time_report(std::unordered_map<int64_t, AddBatchCounter>& 
add_batch_counter_map,

Review comment:
       If a parameter will be modified, prefer a pointer rather than a reference.

##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -128,54 +131,136 @@ Status NodeChannel::open_wait() {
     _open_closure = nullptr;
 
     // add batch closure
-    _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>();
-    _add_batch_closure->ref();
+    _add_batch_closure = 
ReusableClosure<PTabletWriterAddBatchResult>::create();
+    _add_batch_closure->addFailedHandler([this]() {
+        _cancelled = true;
+        LOG(WARNING) << "NodeChannel add batch req rpc failed, " << 
print_load_info()
+                     << ", node=" << node_info()->host << ":" << 
node_info()->brpc_port;
+    });
+
+    _add_batch_closure->addSuccessHandler(
+            [this](const PTabletWriterAddBatchResult& result, bool 
is_last_rpc) {
+                Status status(result.status());
+                if (status.ok()) {
+                    if (is_last_rpc) {
+                        for (auto& tablet : result.tablet_vec()) {
+                            TTabletCommitInfo commit_info;
+                            commit_info.tabletId = tablet.tablet_id();
+                            commit_info.backendId = _node_id;
+                            
_tablet_commit_infos.emplace_back(std::move(commit_info));
+                        }
+                        _add_batches_finished = true;
+                    }
+                } else {
+                    _cancelled = true;
+                    LOG(WARNING) << "NodeChannel add batch req success but 
status isn't ok, "
+                                 << print_load_info() << ", node=" << 
node_info()->host << ":"
+                                 << node_info()->brpc_port << ", errmsg=" << 
status.get_error_msg();
+                }
+
+                if (result.has_execution_time_us()) {
+                    _add_batch_counter.add_batch_execution_time_us += 
result.execution_time_us();
+                    _add_batch_counter.add_batch_wait_lock_time_us += 
result.wait_lock_time_us();
+                    _add_batch_counter.add_batch_num++;
+                }
+            });
 
     return status;
 }
 
 Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
-    auto row_no = _batch->add_row();
+    // If add_row() when _eos_is_produced==true, there must be sth wrong, we 
can only mark this channel as failed.
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't add_row. 
cancelled/eos: ");
+    }
+
+    // We use OlapTableSink mem_tracker which has the same ancestor of _plan 
node,
+    // so in the ideal case, mem limit is a matter for _plan node.
+    // But there is still some unfinished things, we do mem limit here 
temporarily.
+    while (_parent->_mem_tracker->any_limit_exceeded()) {
+        SCOPED_RAW_TIMER(&_mem_exceeded_block_ns);
+        SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+
+    auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX) {
-        RETURN_IF_ERROR(_send_cur_batch());
-        row_no = _batch->add_row();
+        {
+            SCOPED_RAW_TIMER(&_queue_push_lock_ns);
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
+            //To simplify the add_row logic, postpone adding batch into req 
until the time of sending req
+            _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
+            _pending_batches_num++;
+        }
+
+        _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, 
_parent->_mem_tracker));
+        _cur_add_batch_request.clear_tablet_ids();
+
+        row_no = _cur_batch->add_row();
     }
     DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX);
-    auto tuple = input_tuple->deep_copy(*_tuple_desc, 
_batch->tuple_data_pool());
-    _batch->get_row(row_no)->set_tuple(0, tuple);
-    _batch->commit_last_row();
-    _add_batch_request.add_tablet_ids(tablet_id);
+    auto tuple = input_tuple->deep_copy(*_tuple_desc, 
_cur_batch->tuple_data_pool());
+    _cur_batch->get_row(row_no)->set_tuple(0, tuple);
+    _cur_batch->commit_last_row();
+    _cur_add_batch_request.add_tablet_ids(tablet_id);
     return Status::OK();
 }
 
-Status NodeChannel::close(RuntimeState* state) {
-    auto st = _close(state);
-    _batch.reset();
-    return st;
-}
+Status NodeChannel::mark_close() {
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't mark as closed. 
cancelled/eos: ");
+    }
 
-Status NodeChannel::_close(RuntimeState* state) {
-    return _send_cur_batch(true);
+    _cur_add_batch_request.set_eos(true);
+    {
+        std::lock_guard<std::mutex> l(_pending_batches_lock);
+        _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
+        _pending_batches_num++;
+        DCHECK(_pending_batches.back().second.eos());
+    }
+
+    _eos_is_produced = true;
+
+    _cur_batch.reset();
+    return Status::OK();
 }
 
 Status NodeChannel::close_wait(RuntimeState* state) {
-    RETURN_IF_ERROR(_wait_in_flight_packet());
-    Status status(_add_batch_closure->result.status());
-    if (status.ok()) {
-        for (auto& tablet : _add_batch_closure->result.tablet_vec()) {
-            TTabletCommitInfo commit_info;
-            commit_info.tabletId = tablet.tablet_id();
-            commit_info.backendId = _node_id;
-            state->tablet_commit_infos().emplace_back(std::move(commit_info));
-        }
+    auto st = none_of({_cancelled, !_eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, skip waiting for close. 
cancelled/!eos: ");
     }
-    // clear batch after sendt
-    _batch.reset();
-    return status;
+
+    // waiting for finished, it may take a long time, so we could't set a 
timeout
+    // use log to make it easier
+    LOG(INFO) << name() << "start close_wait";
+    while (!_add_batches_finished && !_cancelled) {
+        SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+    LOG(INFO) << name() << "close_wait done";

Review comment:
       Do we need these logs?

##########
File path: be/src/exec/tablet_sink.h
##########
@@ -68,99 +154,126 @@ class NodeChannel {
 
     Status add_row(Tuple* tuple, int64_t tablet_id);
 
-    Status close(RuntimeState* state);
+    // two ways to stop channel:
+    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
+    // 2. just cancel()
+    Status mark_close();
     Status close_wait(RuntimeState* state);
 
     void cancel();
 
-    int64_t node_id() const { return _node_id; }
+    // return:
+    // 0: stopped, send finished(eos request has been sent), or any internal 
error;
+    // 1: running, haven't reach eos.
+    // only allow 1 rpc in flight
+    int try_send_and_fetch_status();
+
+    void time_report(std::unordered_map<int64_t, AddBatchCounter>& 
add_batch_counter_map,
+                     int64_t* serialize_batch_ns, int64_t* 
mem_exceeded_block_ns,
+                     int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) {
+        add_batch_counter_map[_node_id] += _add_batch_counter;
+        *serialize_batch_ns += _serialize_batch_ns;
+        *mem_exceeded_block_ns += _mem_exceeded_block_ns;
+        *queue_push_lock_ns += _queue_push_lock_ns;
+        *actual_consume_ns += _actual_consume_ns;
+    }
 
-    void set_failed() { _already_failed = true; }
-    bool already_failed() const { return _already_failed; }
+    int64_t node_id() const { return _node_id; }
     const NodeInfo* node_info() const { return _node_info; }
+    std::string print_load_info() const { return _load_info; }
+    std::string name() const {
+        return "NodeChannel[" + std::to_string(_index_id) + "-" + 
std::to_string(_node_id) + "]";
+    }
 
-private:
-    Status _send_cur_batch(bool eos = false);
-    // wait inflight packet finish, return error if inflight packet return 
failed
-    Status _wait_in_flight_packet();
-
-    Status _close(RuntimeState* state);
+    Status none_of(std::initializer_list<bool> vars);
 
 private:
     OlapTableSink* _parent = nullptr;
     int64_t _index_id = -1;
     int64_t _node_id = -1;
     int32_t _schema_hash = 0;
+    std::string _load_info;
 
     TupleDescriptor* _tuple_desc = nullptr;
     const NodeInfo* _node_info = nullptr;
 
-    bool _already_failed = false;
-    bool _has_in_flight_packet = false;
     // this should be set in init() using config
     int _rpc_timeout_ms = 60000;
     int64_t _next_packet_seq = 0;
 
-    std::unique_ptr<RowBatch> _batch;
+    // user cancel or get some errors
+    std::atomic<bool> _cancelled{false};
+
+    std::atomic<bool> _send_finished{false};
+    std::atomic<bool> _add_batches_finished{false};

Review comment:
       What's the difference between these two flags?
   Please add a comment explaining them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to