yiguolei commented on code in PR #20771:
URL: https://github.com/apache/doris/pull/20771#discussion_r1233009684
##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1393,133 +1385,226 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
return Status::OK();
}
-Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
- if (_closed) {
- return _close_status;
+Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
+ Status status, const std::string& err_msg, const
std::shared_ptr<IndexChannel> ich,
+ const std::shared_ptr<VNodeChannel> nch) {
+ LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " <<
err_msg;
+ ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
+ // cancel the node channel in best effort
+ nch->cancel(err_msg);
+
+ // check if index has intolerable failure
+ Status index_st = ich->check_intolerable_failure();
+ if (!index_st.ok()) {
+ status = index_st;
+ } else if (Status st = ich->check_tablet_received_rows_consistency();
!st.ok()) {
+ status = st;
+ }
+ return status;
+}
+
+void VOlapTableSink::_cancel_all_channel(Status status, const std::string&
err_msg) {
+ for (auto channel : _channels) {
Review Comment:
auto& 否则shared ptr 可能会copy
channels 重命名一下吧,直接叫index channels
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]