This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 300582f2e55 [branch-2.1](routine-load) fix be core when partial table load failed (#35622)
300582f2e55 is described below
commit 300582f2e551b456d5ee5470d2db0735884525bc
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu May 30 09:35:36 2024 +0800
[branch-2.1](routine-load) fix be core when partial table load failed (#35622)
---
be/src/io/fs/multi_table_pipe.cpp | 6 +-
be/src/runtime/fragment_mgr.cpp | 7 +
.../routine_load/routine_load_task_executor.cpp | 34 ++++-
.../test_multi_table_load_error.groovy | 161 +++++++++++++++++++++
4 files changed, 202 insertions(+), 6 deletions(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 6a4da0188dd..f7a72a55f5d 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -32,6 +32,7 @@
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "util/debug_points.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_util.h"
#include "util/time.h"
@@ -224,8 +225,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
<< ", ctx: " << _ctx->brief();
_unplanned_pipes.clear();
- _inflight_cnt += params.size();
for (auto& plan : params) {
+ DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
+ { return
Status::Aborted("MultiTablePipe.exec_plans.failed"); });
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
return Status::Aborted("Missing vital param: table_name");
@@ -248,6 +250,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
CHECK(false);
}
+ _inflight_cnt++;
+
RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
plan, [this](RuntimeState* state, Status* status) {
{
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 89aaac984f9..ba30323addf 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -76,6 +76,7 @@
#include "runtime/workload_group/workload_group_manager.h"
#include "runtime/workload_management/workload_query_info.h"
#include "service/backend_options.h"
+#include "util/debug_points.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/hash_util.hpp"
@@ -841,6 +842,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
+ DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
+ { return
Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
+
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
params.local_params[0], params.query_id, params.query_options,
&handler,
@@ -919,6 +923,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
+ DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
+ { return
Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
+
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
local_params, params.query_id, params.query_options,
&handler,
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 9399ccdf773..137a2c37f68 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -317,6 +317,20 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
} \
} while (false);
+// Error-handling macro for the multi-table routine load path.
+// Evaluates `stmt` once. If the resulting Status is not OK and is not
+// PUBLISH_TIMEOUT, it:
+//   - reports the failure via err_handler(ctx, status, err_msg);
+//   - invokes cb(ctx);
+//   - calls ctx->future.get() (presumably blocking until the task's future is
+//     fulfilled), logging an error if that result is not OK;
+//   - returns from the enclosing function.
+// An OK or PUBLISH_TIMEOUT status falls through and execution continues.
+// `ctx`, `err_handler` and `cb` are not macro parameters: they must be in scope
+// where the macro is expanded.
+// NOTE(review): the multi-table call sites call handle_consume_finished() before
+// invoking this macro. On the PUBLISH_TIMEOUT fall-through, execution continues
+// and handle_consume_finished() is reached again later; confirm that a second
+// call is safe.
+#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg) \
+ do { \
+ Status _status_ = (stmt); \
+ if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
+ err_handler(ctx, _status_, err_msg); \
+ cb(ctx); \
+ _status_ = ctx->future.get(); \
+ if (!_status_.ok()) { \
+ LOG(ERROR) << "failed to get future, " << ctx->brief(); \
+ } \
+ return; \
+ } \
+ } while (false);
+
LOG(INFO) << "begin to execute routine load task: " << ctx->brief();
// create data consumer group
@@ -371,17 +385,27 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
- // start to consume, this may block a while
- HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
if (ctx->is_multi_table) {
+ Status st;
// plan the rest of unplanned data
auto multi_table_pipe =
std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
- HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
- "multi tables task executes plan error");
+ // start to consume, this may block a while
+ st = consumer_grp->start_all(ctx, kafka_pipe);
+ if (!st.ok()) {
+ multi_table_pipe->handle_consume_finished();
+ HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
+ }
+ st = multi_table_pipe->request_and_exec_plans();
+ if (!st.ok()) {
+ multi_table_pipe->handle_consume_finished();
+ HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan
error");
+ }
// need memory order
multi_table_pipe->handle_consume_finished();
HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
+ } else {
+ // start to consume, this may block a while
+ HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming
failed");
}
// wait for all consumers finished
diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
new file mode 100644
index 00000000000..7217109c353
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+// Regression test for multi-table routine load under injected BE failures.
+// For each debug point passed to load_with_injection() it:
+//   1. enables the debug point on all BEs and creates a routine load job;
+//   2. waits for the job to reach RUNNING;
+//   3. checks that the job stays RUNNING for ~60s while the fault is active;
+//   4. disables the debug point and waits until loadedRows becomes non-zero.
+// The debug point is always removed in `finally`, so a failing run does not
+// leave the fault enabled on every BE for subsequent (nonConcurrent) suites.
+suite("test_multi_table_load_eror","nonConcurrent") {
+    def kafkaCsvTopics = [
+        "multi_table_csv",
+    ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+        try {
+            for (String kafkaCsvTopic in kafkaCsvTopics) {
+                def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                def lines = txt.readLines()
+                lines.each { line ->
+                    logger.info("=====${line}========")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+        } finally {
+            // send() is asynchronous. close() waits for buffered records to be
+            // delivered before the routine load job reads the topic, and it also
+            // stops the producer's background I/O thread.
+            producer.close()
+        }
+    }
+
+    def load_with_injection = { injection ->
+        def jobName = "test_multi_table_load_eror"
+        def tableName = "dup_tbl_basic_multi_table"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            // True while `injection` is enabled on the BEs; lets `finally` clean it up.
+            boolean injectionActive = false
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                injectionActive = true
+                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                sql "sync"
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY "|"
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "multi_table_csv",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                // Step 1: wait up to ~60s for the job to become RUNNING.
+                def count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state != "RUNNING") {
+                        count++
+                        if (count > 60) {
+                            // Job never reached RUNNING: fail the test.
+                            assertEquals(1, 2)
+                        }
+                        continue;
+                    }
+                    log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                    break;
+                }
+
+                // Step 2: with the fault injected, the job must remain RUNNING for ~60s.
+                // Any state change fails the test. Afterwards, remove the fault.
+                count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state == "RUNNING") {
+                        count++
+                        if (count > 60) {
+                            GetDebugPoint().disableDebugPointForAllBEs(injection)
+                            injectionActive = false
+                            break;
+                        }
+                        continue;
+                    }
+                    log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                    assertEquals(1, 2)
+                }
+
+                // Step 3: once the fault is gone, rows must eventually be loaded (~60s budget).
+                count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    log.info("routine load statistic: ${res[0][14].toString()}".toString())
+                    def json = parseJson(res[0][14])
+                    if (json.loadedRows.toString() == "0") {
+                        count++
+                        if (count > 60) {
+                            assertEquals(1, 2)
+                        }
+                        continue;
+                    }
+                    break;
+                }
+            } finally {
+                // Never leak the injected fault to other suites, even when an assertion failed.
+                if (injectionActive) {
+                    GetDebugPoint().disableDebugPointForAllBEs(injection)
+                }
+                sql "stop routine load for ${jobName}"
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            }
+        }
+    }
+
+    load_with_injection("FragmentMgr.exec_plan_fragment.failed")
+    load_with_injection("MultiTablePipe.exec_plans.failed")
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org