dataroaring commented on code in PR #52354:
URL: https://github.com/apache/doris/pull/52354#discussion_r2174390017


##########
be/src/vec/sink/writer/vtablet_writer.cpp:
##########
@@ -293,23 +293,56 @@ static Status 
cancel_channel_and_check_intolerable_failure(Status status,
 Status IndexChannel::close_wait(
         RuntimeState* state, WriterStats* writer_stats,
         std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
-        std::unordered_set<int64_t> unfinished_node_channel_ids) {
+        std::unordered_set<int64_t> unfinished_node_channel_ids,
+        bool need_wait_after_quorum_success) {
+    DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
+                    { return Status::TimedOut("injected timeout"); });
     Status status = Status::OK();
+    // 1. wait quorum success
     while (true) {
-        status = check_each_node_channel_close(&unfinished_node_channel_ids,
-                                               node_add_batch_counter_map, 
writer_stats, status);
-        if (!status.ok() || unfinished_node_channel_ids.empty()) {
-            LOG(INFO) << ", is all unfinished: " << 
unfinished_node_channel_ids.empty()
-                      << ", status: " << status << ", txn_id: " << 
_parent->_txn_id
+        RETURN_IF_ERROR(check_each_node_channel_close(
+                &unfinished_node_channel_ids, node_add_batch_counter_map, 
writer_stats, status));
+        bool quorum_success = _quorum_success(unfinished_node_channel_ids);
+        if (unfinished_node_channel_ids.empty() || quorum_success) {
+            LOG(INFO) << "quorum success, quorum_success: " << quorum_success
+                      << ", is all unfinished: " << 
unfinished_node_channel_ids.empty()
+                      << ", txn_id: " << _parent->_txn_id
                       << ", load_id: " << print_id(_parent->_load_id);
             break;
         }
         bthread_usleep(1000 * 10);
     }
 
-    DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
-                    { status = Status::TimedOut("injected timeout"); });
-
+    // 2. wait for all node channel to complete as much as possible
+    if (!unfinished_node_channel_ids.empty() && 
need_wait_after_quorum_success) {
+        int64_t max_wait_time_ms = 
_calc_max_wait_time_ms(unfinished_node_channel_ids);
+        while (true) {
+            
RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
+                                                          
node_add_batch_counter_map, writer_stats,
+                                                          status));
+            if (unfinished_node_channel_ids.empty()) {
+                break;
+            }
+            int64_t elapsed_ms = UnixMillis() - _start_time;
+            if (elapsed_ms > max_wait_time_ms ||
+                _parent->_load_channel_timeout_s - elapsed_ms / 1000 <
+                        config::quorum_success_load_timeout_remaining_seconds) 
{
+                // cancel unfinished node channel
+                std::stringstream unfinished_node_channel_host_str;
+                for (auto& it : unfinished_node_channel_ids) {
+                    unfinished_node_channel_host_str << 
_node_channels[it]->host() << ",";
+                    _node_channels[it]->cancel("timeout");

Review Comment:
   This needs a test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to