This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 3652fc31c3c [Pick 2.1] "Fix data loss when node channel been cancelled before close wait (#36662)" (#36744)
3652fc31c3c is described below
commit 3652fc31c3cd7d45c6715d194e34c92d86927244
Author: plat1ko <[email protected]>
AuthorDate: Tue Jun 25 11:36:31 2024 +0800
[Pick 2.1] "Fix data loss when node channel been cancelled before close wait (#36662)" (#36744)
## Proposed changes
Cherry-pick of https://github.com/apache/doris/pull/36662 onto branch-2.1.
---
be/src/vec/sink/writer/vtablet_writer.cpp | 39 +++++++++++++++++--------------
1 file changed, 21 insertions(+), 18 deletions(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 3aa9c799216..f385575e7c0 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1312,22 +1312,22 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}
-static Status cancel_channel_and_check_intolerable_failure(
- Status status, const std::string& err_msg, const
std::shared_ptr<IndexChannel> ich,
- const std::shared_ptr<VNodeChannel> nch) {
- LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " <<
err_msg;
- ich->mark_as_failed(nch.get(), err_msg, -1);
+static Status cancel_channel_and_check_intolerable_failure(Status status,
+ const std::string&
err_msg,
+ IndexChannel& ich,
VNodeChannel& nch) {
+ LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " <<
err_msg;
+ ich.mark_as_failed(&nch, err_msg, -1);
// cancel the node channel in best effort
- nch->cancel(err_msg);
+ nch.cancel(err_msg);
// check if index has intolerable failure
- Status index_st = ich->check_intolerable_failure();
+ Status index_st = ich.check_intolerable_failure();
if (!index_st.ok()) {
- status = index_st;
- } else if (Status st = ich->check_tablet_received_rows_consistency();
!st.ok()) {
- status = st;
- } else if (Status st = ich->check_tablet_filtered_rows_consistency();
!st.ok()) {
- status = st;
+ status = std::move(index_st);
+ } else if (Status st = ich.check_tablet_received_rows_consistency();
!st.ok()) {
+ status = std::move(st);
+ } else if (Status st = ich.check_tablet_filtered_rows_consistency();
!st.ok()) {
+ status = std::move(st);
}
return status;
}
@@ -1403,7 +1403,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
ch->mark_close(true);
if (ch->is_cancelled()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(),
index_channel, ch);
+ std::move(status),
ch->get_cancel_msg(), *index_channel,
+ *ch);
}
});
if (!status.ok()) {
@@ -1419,7 +1420,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
<< "close1 wait finished!";
if (!s.ok()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, s.to_string(), index_channel,
ch);
+ std::move(status), s.to_string(),
*index_channel, *ch);
}
});
if (!status.ok()) {
@@ -1437,7 +1438,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
ch->mark_close();
if (ch->is_cancelled()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(),
index_channel, ch);
+ std::move(status),
ch->get_cancel_msg(), *index_channel,
+ *ch);
}
});
} else { // not has_incremental_node_channel
@@ -1451,7 +1453,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
ch->mark_close();
if (ch->is_cancelled()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(),
index_channel, ch);
+ std::move(status),
ch->get_cancel_msg(), *index_channel,
+ *ch);
}
});
}
@@ -1507,7 +1510,7 @@ Status VTabletWriter::close(Status exec_status) {
&total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_wait_exec_time_ns,
&wait_exec_time,
&total_add_batch_num](const
std::shared_ptr<VNodeChannel>& ch) {
- if (!status.ok() || ch->is_closed()) {
+ if (!status.ok() || (ch->is_closed() &&
!ch->is_cancelled())) {
return;
}
// in pipeline, all node channels are done or
canceled, will not block.
@@ -1515,7 +1518,7 @@ Status VTabletWriter::close(Status exec_status) {
auto s = ch->close_wait(_state);
if (!s.ok()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, s.to_string(), index_channel, ch);
+ std::move(status), s.to_string(),
*index_channel, *ch);
}
ch->time_report(&node_add_batch_counter_map,
&serialize_batch_ns,
&channel_stat, &queue_push_lock_ns,
&actual_consume_ns,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]