xinyiZzz commented on code in PR #20771:
URL: https://github.com/apache/doris/pull/20771#discussion_r1241060022


##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1393,133 +1385,225 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
     return Status::OK();
 }
 
-Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return _close_status;
+Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
+        Status status, const std::string& err_msg, const std::shared_ptr<IndexChannel> ich,
+        const std::shared_ptr<VNodeChannel> nch) {
+    LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg;
+    ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
+    // cancel the node channel in best effort
+    nch->cancel(err_msg);
+
+    // check if index has intolerable failure
+    Status index_st = ich->check_intolerable_failure();
+    if (!index_st.ok()) {
+        status = index_st;
+    } else if (Status st = ich->check_tablet_received_rows_consistency(); !st.ok()) {
+        status = st;
+    }
+    return status;
+}
+
+void VOlapTableSink::_cancel_all_channel(Status status) {
+    for (const auto& index_channel : _channels) {
+        index_channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
+            ch->cancel(status.to_string());
+        });
+    }
+    LOG(INFO) << fmt::format(
+            "close olap table sink. load_id={}, txn_id={}, canceled all node channels due to "
+            "error: {}",
+            print_id(_load_id), _txn_id, status);
+}
+
+bool VOlapTableSink::is_pending_finish() {
+    if (_pending_finish) {
+        bool pending_finish = false;

Review Comment:
   done



##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1393,133 +1385,225 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
     return Status::OK();
 }
 
-Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return _close_status;
+Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
+        Status status, const std::string& err_msg, const std::shared_ptr<IndexChannel> ich,
+        const std::shared_ptr<VNodeChannel> nch) {
+    LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg;
+    ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
+    // cancel the node channel in best effort
+    nch->cancel(err_msg);
+
+    // check if index has intolerable failure
+    Status index_st = ich->check_intolerable_failure();
+    if (!index_st.ok()) {
+        status = index_st;
+    } else if (Status st = ich->check_tablet_received_rows_consistency(); !st.ok()) {
+        status = st;
+    }
+    return status;
+}
+
+void VOlapTableSink::_cancel_all_channel(Status status) {
+    for (const auto& index_channel : _channels) {
+        index_channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
+            ch->cancel(status.to_string());
+        });
+    }
+    LOG(INFO) << fmt::format(
+            "close olap table sink. load_id={}, txn_id={}, canceled all node channels due to "
+            "error: {}",
+            print_id(_load_id), _txn_id, status);
+}
+
+bool VOlapTableSink::is_pending_finish() {
+    if (_pending_finish) {
+        bool pending_finish = false;
+        for (const auto& index_channel : _channels) {
+            index_channel->for_each_node_channel(
+                    [&pending_finish](const std::shared_ptr<VNodeChannel>& ch) {
+                        pending_finish |= ch->is_pending_finish();
+                    });
+        }
+        _pending_finish = pending_finish;
+    }
+    return _pending_finish;
+}
+
+void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
+    if (!_pending_finish) {
+        return;
     }
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
     if (status.ok()) {
         // only if status is ok can we call this _profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that _profile is null
         SCOPED_TIMER(_profile->total_time_counter());
-        // BE id -> add_batch method counter
-        std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
-        int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0,
-                total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 0,
-                total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, total_add_batch_num = 0,
-                num_node_channels = 0;
-        VNodeChannelStat channel_stat;
         {
-            if (config::enable_lazy_open_partition) {
-                for (auto index_channel : _channels) {
-                    index_channel->for_each_node_channel(
-                            [](const std::shared_ptr<VNodeChannel>& ch) {
-                                ch->open_partition_wait();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to