This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 991dc7f  [fix][routine-load] fix bug that routine load cannot cancel task when append_data return error (#8457)
991dc7f is described below

commit 991dc7fc5cf53e359ea907d2c9d88f2916499a93
Author: caiconghui <[email protected]>
AuthorDate: Mon Mar 14 10:18:14 2022 +0800

    [fix][routine-load] fix bug that routine load cannot cancel task when append_data return error (#8457)
---
 be/src/runtime/routine_load/data_consumer_group.cpp | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 5f6c789..7242fbe 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -116,7 +116,6 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
 
     MonotonicStopWatch watch;
     watch.start();
-    Status st;
     bool eos = false;
     while (true) {
         if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
@@ -140,12 +139,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
             // waiting all threads finished
             _thread_pool.shutdown();
             _thread_pool.join();
-
             if (!result_st.ok()) {
-                // some of consumers encounter errors, cancel this task
+                kafka_pipe->cancel(result_st.get_error_msg());
                 return result_st;
             }
-
             kafka_pipe->finish();
             ctx->kafka_info->cmt_offset = std::move(cmt_offset);
             ctx->receive_bytes = ctx->max_batch_size - left_bytes;
@@ -159,9 +156,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
                        << ", partition: " << msg->partition() << ", offset: " << msg->offset()
                         << ", len: " << msg->len();
 
-            (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()),
+            Status st = (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()),
                                              static_cast<size_t>(msg->len()));
-
             if (st.ok()) {
                 left_rows--;
                 left_bytes -= msg->len();
@@ -172,6 +168,12 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
                 // failed to append this msg, we must stop
                LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
                 eos = true;
+                {
+                    std::unique_lock<std::mutex> lock(_mutex);
+                    if (result_st.ok()) {
+                        result_st = st;
+                    }
+                }
             }
             delete msg;
         } else {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to