This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 300582f2e55 [branch-2.1](routine-load) fix be core when partial table 
load failed (#35622)
300582f2e55 is described below

commit 300582f2e551b456d5ee5470d2db0735884525bc
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu May 30 09:35:36 2024 +0800

    [branch-2.1](routine-load) fix be core when partial table load failed 
(#35622)
---
 be/src/io/fs/multi_table_pipe.cpp                  |   6 +-
 be/src/runtime/fragment_mgr.cpp                    |   7 +
 .../routine_load/routine_load_task_executor.cpp    |  34 ++++-
 .../test_multi_table_load_error.groovy             | 161 +++++++++++++++++++++
 4 files changed, 202 insertions(+), 6 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp 
b/be/src/io/fs/multi_table_pipe.cpp
index 6a4da0188dd..f7a72a55f5d 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -32,6 +32,7 @@
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
+#include "util/debug_points.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/thrift_util.h"
 #include "util/time.h"
@@ -224,8 +225,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
               << ", ctx: " << _ctx->brief();
     _unplanned_pipes.clear();
 
-    _inflight_cnt += params.size();
     for (auto& plan : params) {
+        DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
+                        { return 
Status::Aborted("MultiTablePipe.exec_plans.failed"); });
         if (!plan.__isset.table_name ||
             _planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
             return Status::Aborted("Missing vital param: table_name");
@@ -248,6 +250,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
             CHECK(false);
         }
 
+        _inflight_cnt++;
+
         RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
                 plan, [this](RuntimeState* state, Status* status) {
                     {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 89aaac984f9..ba30323addf 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -76,6 +76,7 @@
 #include "runtime/workload_group/workload_group_manager.h"
 #include "runtime/workload_management/workload_query_info.h"
 #include "service/backend_options.h"
+#include "util/debug_points.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/hash_util.hpp"
@@ -841,6 +842,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         }
         g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
+        DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
+                        { return 
Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
+
         std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
         RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
                 params.local_params[0], params.query_id, params.query_options, 
&handler,
@@ -919,6 +923,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             }
             g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
+            DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
+                            { return 
Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
+
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
             RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
                     local_params, params.query_id, params.query_options, 
&handler,
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 9399ccdf773..137a2c37f68 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -317,6 +317,20 @@ void 
RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
         }                                                                  \
     } while (false);
 
+#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg)                            \
+    do {                                                                   \
+        Status _status_ = (stmt);                                          \
+        if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
+            err_handler(ctx, _status_, err_msg);                           \
+            cb(ctx);                                                       \
+            _status_ = ctx->future.get();                                  \
+            if (!_status_.ok()) {                                          \
+                LOG(ERROR) << "failed to get future, " << ctx->brief();    \
+            }                                                              \
+            return;                                                        \
+        }                                                                  \
+    } while (false);
+
     LOG(INFO) << "begin to execute routine load task: " << ctx->brief();
 
     // create data consumer group
@@ -371,17 +385,27 @@ void 
RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
             std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
 
-    // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
     if (ctx->is_multi_table) {
+        Status st;
         // plan the rest of unplanned data
         auto multi_table_pipe = 
std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
-        HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
-                     "multi tables task executes plan error");
+        // start to consume, this may block a while
+        st = consumer_grp->start_all(ctx, kafka_pipe);
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
+        }
+        st = multi_table_pipe->request_and_exec_plans();
+        if (!st.ok()) {
+            multi_table_pipe->handle_consume_finished();
+            HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan 
error");
+        }
         // need memory order
         multi_table_pipe->handle_consume_finished();
         HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
+    } else {
+        // start to consume, this may block a while
+        HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming 
failed");
     }
 
     // wait for all consumers finished
diff --git 
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
new file mode 100644
index 00000000000..7217109c353
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_multi_table_load_eror","nonConcurrent") {
+    def kafkaCsvTpoics = [
+                  "multi_table_csv",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka 
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def load_with_injection = { injection ->
+        def jobName = "test_multi_table_load_eror"
+        def tableName = "dup_tbl_basic_multi_table"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql new 
File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                sql new 
File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                sql "sync"
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY "|"
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "multi_table_csv",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                def count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state != "RUNNING") {
+                        count++
+                        if (count > 60) {
+                            assertEquals(1, 2)
+                        } 
+                        continue;
+                    }
+                    log.info("reason of state changed: 
${res[0][17].toString()}".toString())
+                    break;
+                }
+
+                count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state == "RUNNING") {
+                        count++
+                        if (count > 60) {
+                            
GetDebugPoint().disableDebugPointForAllBEs(injection)
+                            break;
+                        }
+                        continue;
+                    }
+                    log.info("reason of state changed: 
${res[0][17].toString()}".toString())
+                    assertEquals(1, 2)
+                }
+
+                count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    log.info("routine load statistic: 
${res[0][14].toString()}".toString())
+                    def json = parseJson(res[0][14])
+                    if (json.loadedRows.toString() == "0") {
+                        count++
+                        if (count > 60) {
+                            assertEquals(1, 2)
+                        } 
+                        continue;
+                    }
+                    break;
+                }
+            } finally {
+                sql "stop routine load for ${jobName}"
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            }
+        }
+    }
+
+    load_with_injection("FragmentMgr.exec_plan_fragment.failed")
+    load_with_injection("MultiTablePipe.exec_plans.failed")
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to