github-actions[bot] commented on code in PR #35622:
URL: https://github.com/apache/doris/pull/35622#discussion_r1619677119


##########
be/src/runtime/routine_load/routine_load_task_executor.cpp:
##########
@@ -371,17 +385,27 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
             std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
 
-    // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
     if (ctx->is_multi_table) {
+        Status st;
         // plan the rest of unplanned data
         auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
-        HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
-                     "multi tables task executes plan error");
+        // start to consume, this may block a while
+        st = consumer_grp->start_all(ctx, kafka_pipe);
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");

Review Comment:
   warning: boolean expression can be simplified by DeMorgan's theorem [readability-simplify-boolean-expr]
   ```cpp
               HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
               ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/runtime/routine_load/routine_load_task_executor.cpp:322:** expanded from macro 'HANDLE_MULTI_TABLE_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
               ^
   ```
   **be/src/common/compiler_util.h:35:** expanded from macro 'UNLIKELY'
   ```cpp
   #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
                                            ^
   ```
   
   </details>
   



##########
be/src/runtime/routine_load/routine_load_task_executor.cpp:
##########
@@ -371,17 +385,27 @@
     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
             std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
 
-    // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
     if (ctx->is_multi_table) {
+        Status st;
         // plan the rest of unplanned data
         auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
-        HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
-                     "multi tables task executes plan error");
+        // start to consume, this may block a while
+        st = consumer_grp->start_all(ctx, kafka_pipe);
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
+        }
+        st = multi_table_pipe->request_and_exec_plans();
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error");
+        }
         // need memory order
         multi_table_pipe->handle_consume_finished();
         HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
+    } else {
+        // start to consume, this may block a while
+        HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");

Review Comment:
   warning: boolean expression can be simplified by DeMorgan's theorem [readability-simplify-boolean-expr]
   ```cpp
           HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
           ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/runtime/routine_load/routine_load_task_executor.cpp:312:** expanded from macro 'HANDLE_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
               ^
   ```
   **be/src/common/compiler_util.h:35:** expanded from macro 'UNLIKELY'
   ```cpp
   #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
                                            ^
   ```
   
   </details>
   



##########
be/src/runtime/routine_load/routine_load_task_executor.cpp:
##########
@@ -371,17 +385,27 @@
     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
             std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
 
-    // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
     if (ctx->is_multi_table) {
+        Status st;
         // plan the rest of unplanned data
         auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
-        HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
-                     "multi tables task executes plan error");
+        // start to consume, this may block a while
+        st = consumer_grp->start_all(ctx, kafka_pipe);
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
+        }
+        st = multi_table_pipe->request_and_exec_plans();
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error");

Review Comment:
   warning: boolean expression can be simplified by DeMorgan's theorem [readability-simplify-boolean-expr]
   ```cpp
               HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error");
               ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/runtime/routine_load/routine_load_task_executor.cpp:322:** expanded from macro 'HANDLE_MULTI_TABLE_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
               ^
   ```
   **be/src/common/compiler_util.h:35:** expanded from macro 'UNLIKELY'
   ```cpp
   #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
                                            ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to