This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 3652fc31c3c [Pick 2.1] "Fix data loss when node channel been cancelled before close wait (#36662)" (#36744)
3652fc31c3c is described below

commit 3652fc31c3cd7d45c6715d194e34c92d86927244
Author: plat1ko <[email protected]>
AuthorDate: Tue Jun 25 11:36:31 2024 +0800

    [Pick 2.1] "Fix data loss when node channel been cancelled before close wait (#36662)" (#36744)
    
    ## Proposed changes
    
    Pick from https://github.com/apache/doris/pull/36662
---
 be/src/vec/sink/writer/vtablet_writer.cpp | 39 +++++++++++++++++--------------
 1 file changed, 21 insertions(+), 18 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 3aa9c799216..f385575e7c0 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1312,22 +1312,22 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
-static Status cancel_channel_and_check_intolerable_failure(
-        Status status, const std::string& err_msg, const 
std::shared_ptr<IndexChannel> ich,
-        const std::shared_ptr<VNodeChannel> nch) {
-    LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << 
err_msg;
-    ich->mark_as_failed(nch.get(), err_msg, -1);
+static Status cancel_channel_and_check_intolerable_failure(Status status,
+                                                           const std::string& 
err_msg,
+                                                           IndexChannel& ich, 
VNodeChannel& nch) {
+    LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << 
err_msg;
+    ich.mark_as_failed(&nch, err_msg, -1);
     // cancel the node channel in best effort
-    nch->cancel(err_msg);
+    nch.cancel(err_msg);
 
     // check if index has intolerable failure
-    Status index_st = ich->check_intolerable_failure();
+    Status index_st = ich.check_intolerable_failure();
     if (!index_st.ok()) {
-        status = index_st;
-    } else if (Status st = ich->check_tablet_received_rows_consistency(); 
!st.ok()) {
-        status = st;
-    } else if (Status st = ich->check_tablet_filtered_rows_consistency(); 
!st.ok()) {
-        status = st;
+        status = std::move(index_st);
+    } else if (Status st = ich.check_tablet_received_rows_consistency(); 
!st.ok()) {
+        status = std::move(st);
+    } else if (Status st = ich.check_tablet_filtered_rows_consistency(); 
!st.ok()) {
+        status = std::move(st);
     }
     return status;
 }
@@ -1403,7 +1403,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                             ch->mark_close(true);
                             if (ch->is_cancelled()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                                        std::move(status), 
ch->get_cancel_msg(), *index_channel,
+                                        *ch);
                             }
                         });
                 if (!status.ok()) {
@@ -1419,7 +1420,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                                        << "close1 wait finished!";
                             if (!s.ok()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, s.to_string(), index_channel, 
ch);
+                                        std::move(status), s.to_string(), 
*index_channel, *ch);
                             }
                         });
                 if (!status.ok()) {
@@ -1437,7 +1438,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                             ch->mark_close();
                             if (ch->is_cancelled()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                                        std::move(status), 
ch->get_cancel_msg(), *index_channel,
+                                        *ch);
                             }
                         });
             } else { // not has_incremental_node_channel
@@ -1451,7 +1453,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                             ch->mark_close();
                             if (ch->is_cancelled()) {
                                 status = 
cancel_channel_and_check_intolerable_failure(
-                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                                        std::move(status), 
ch->get_cancel_msg(), *index_channel,
+                                        *ch);
                             }
                         });
             }
@@ -1507,7 +1510,7 @@ Status VTabletWriter::close(Status exec_status) {
                      &total_add_batch_exec_time_ns, &add_batch_exec_time, 
&total_wait_exec_time_ns,
                      &wait_exec_time,
                      &total_add_batch_num](const 
std::shared_ptr<VNodeChannel>& ch) {
-                        if (!status.ok() || ch->is_closed()) {
+                        if (!status.ok() || (ch->is_closed() && 
!ch->is_cancelled())) {
                             return;
                         }
                         // in pipeline, all node channels are done or canceled, will not block.
@@ -1515,7 +1518,7 @@ Status VTabletWriter::close(Status exec_status) {
                         auto s = ch->close_wait(_state);
                         if (!s.ok()) {
                             status = 
cancel_channel_and_check_intolerable_failure(
-                                    status, s.to_string(), index_channel, ch);
+                                    std::move(status), s.to_string(), 
*index_channel, *ch);
                         }
                         ch->time_report(&node_add_batch_counter_map, 
&serialize_batch_ns,
                                         &channel_stat, &queue_push_lock_ns, 
&actual_consume_ns,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to