This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 991dc7f [fix][routine-load] fix bug that routine load cannot cancel
task when append_data return error (#8457)
991dc7f is described below
commit 991dc7fc5cf53e359ea907d2c9d88f2916499a93
Author: caiconghui <[email protected]>
AuthorDate: Mon Mar 14 10:18:14 2022 +0800
[fix][routine-load] fix bug that routine load cannot cancel task when
append_data return error (#8457)
---
be/src/runtime/routine_load/data_consumer_group.cpp | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp
b/be/src/runtime/routine_load/data_consumer_group.cpp
index 5f6c789..7242fbe 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -116,7 +116,6 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext*
ctx) {
MonotonicStopWatch watch;
watch.start();
- Status st;
bool eos = false;
while (true) {
if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
@@ -140,12 +139,10 @@ Status
KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
// waiting all threads finished
_thread_pool.shutdown();
_thread_pool.join();
-
if (!result_st.ok()) {
- // some of consumers encounter errors, cancel this task
+ kafka_pipe->cancel(result_st.get_error_msg());
return result_st;
}
-
kafka_pipe->finish();
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
ctx->receive_bytes = ctx->max_batch_size - left_bytes;
@@ -159,9 +156,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext*
ctx) {
<< ", partition: " << msg->partition() << ", offset: "
<< msg->offset()
<< ", len: " << msg->len();
- (kafka_pipe.get()->*append_data)(static_cast<const
char*>(msg->payload()),
+ Status st = (kafka_pipe.get()->*append_data)(static_cast<const
char*>(msg->payload()),
static_cast<size_t>(msg->len()));
-
if (st.ok()) {
left_rows--;
left_bytes -= msg->len();
@@ -172,6 +168,12 @@ Status
KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
// failed to append this msg, we must stop
LOG(WARNING) << "failed to append msg to pipe. grp: " <<
_grp_id;
eos = true;
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+ if (result_st.ok()) {
+ result_st = st;
+ }
+ }
}
delete msg;
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]