This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e783ef716f4 [fix](multi-table) fix unknown source slot descriptor when
load multi table (#25762)
e783ef716f4 is described below
commit e783ef716f41d63948e626555d7c9c66d6e9d435
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Oct 25 21:52:01 2023 +0800
[fix](multi-table) fix unknown source slot descriptor when load multi table
(#25762)
---
be/src/pipeline/pipeline_fragment_context.cpp | 14 +++-
be/src/pipeline/pipeline_fragment_context.h | 2 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 14 +++-
be/src/runtime/plan_fragment_executor.cpp | 14 +++-
be/src/runtime/plan_fragment_executor.h | 2 +
.../kafka/scripts/multi_table_csv1.csv | 3 +
.../kafka/scripts/multi_table_json1.json | 3 +
docker/thirdparties/run-thirdparties-docker.sh | 4 +-
.../load_p0/routine_load/test_routine_load.out | 14 +++-
.../load_p0/routine_load/test_routine_load.groovy | 89 ++++++++++++++++++++++
10 files changed, 144 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 226b1f06951..709e4df7652 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -254,13 +254,19 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
fragment_context->set_is_report_success(request.query_options.is_report_success);
}
- auto* desc_tbl = _query_ctx->desc_tbl;
- _runtime_state->set_desc_tbl(desc_tbl);
+ if (request.is_simplified_param) {
+ _desc_tbl = _query_ctx->desc_tbl;
+ } else {
+ DCHECK(request.__isset.desc_tbl);
+ RETURN_IF_ERROR(
+ DescriptorTbl::create(_runtime_state->obj_pool(),
request.desc_tbl, &_desc_tbl));
+ }
+ _runtime_state->set_desc_tbl(_desc_tbl);
// 2. Create ExecNode to build pipeline with PipelineFragmentContext
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
ExecNode::create_tree(_runtime_state.get(),
_runtime_state->obj_pool(),
- request.fragment.plan, *desc_tbl,
&_root_plan));
+ request.fragment.plan, *_desc_tbl,
&_root_plan));
// Set senders of exchange nodes before pipeline build
std::vector<ExecNode*> exch_nodes;
@@ -316,7 +322,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, idx,
_root_plan->row_desc(),
- _runtime_state.get(), &_sink, *desc_tbl));
+ _runtime_state.get(), &_sink, *_desc_tbl));
}
_root_pipeline = fragment_context->add_pipeline();
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index bd2e156dfdf..ffbfaef987a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -216,6 +216,8 @@ protected:
// profile reporting-related
report_status_callback _report_status_cb;
+ DescriptorTbl* _desc_tbl;
+
private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
std::vector<std::unique_ptr<PipelineTask>> _tasks;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 333eb9f8161..a1f69e6f02b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -202,8 +202,14 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
_runtime_state->set_load_job_id(request.load_job_id);
}
- auto* desc_tbl = _query_ctx->desc_tbl;
- _runtime_state->set_desc_tbl(desc_tbl);
+ if (request.is_simplified_param) {
+ _desc_tbl = _query_ctx->desc_tbl;
+ } else {
+ DCHECK(request.__isset.desc_tbl);
+ RETURN_IF_ERROR(
+ DescriptorTbl::create(_runtime_state->obj_pool(),
request.desc_tbl, &_desc_tbl));
+ }
+ _runtime_state->set_desc_tbl(_desc_tbl);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
// 2. Build pipelines with operators in this fragment.
@@ -217,7 +223,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs,
- request, root_pipeline->output_row_desc(), _runtime_state.get(),
*desc_tbl,
+ request, root_pipeline->output_row_desc(), _runtime_state.get(),
*_desc_tbl,
root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
static_cast<void>(root_pipeline->set_sink(_sink));
@@ -407,7 +413,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_load_job_id(request.load_job_id);
}
- _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
+ _runtime_states[i]->set_desc_tbl(_desc_tbl);
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index fd1ccf09c8c..e96b086ad44 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -165,13 +165,19 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request) {
}
// set up desc tbl
- DescriptorTbl* desc_tbl = _query_ctx->desc_tbl;
- _runtime_state->set_desc_tbl(desc_tbl);
+ if (request.is_simplified_param) {
+ _desc_tbl = _query_ctx->desc_tbl;
+ } else {
+ DCHECK(request.__isset.desc_tbl);
+ RETURN_IF_ERROR(
+ DescriptorTbl::create(_runtime_state->obj_pool(),
request.desc_tbl, &_desc_tbl));
+ }
+ _runtime_state->set_desc_tbl(_desc_tbl);
// set up plan
DCHECK(request.__isset.fragment);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree(
- _runtime_state.get(), obj_pool(), request.fragment.plan,
*desc_tbl, &_plan));
+ _runtime_state.get(), obj_pool(), request.fragment.plan,
*_desc_tbl, &_plan));
// set #senders of exchange nodes before calling Prepare()
std::vector<ExecNode*> exch_nodes;
@@ -222,7 +228,7 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request) {
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, params,
- row_desc(), runtime_state(), &_sink, *desc_tbl));
+ row_desc(), runtime_state(), &_sink, *_desc_tbl));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state()));
RuntimeProfile* sink_profile = _sink->profile();
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index 332c7fce591..bb39596ff10 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -242,6 +242,8 @@ private:
bool _group_commit = false;
+ DescriptorTbl* _desc_tbl;
+
ObjectPool* obj_pool() { return _runtime_state->obj_pool(); }
// typedef for TPlanFragmentExecParams.per_node_scan_ranges
diff --git
a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv
b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv
new file mode 100644
index 00000000000..1df0d787733
--- /dev/null
+++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science",
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science",
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_mow_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science",
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
\ No newline at end of file
diff --git
a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json
b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json
new file mode 100644
index 00000000000..0099b0b5432
--- /dev/null
+++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1",
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06":
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553",
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D",
"k16": "", "k17":
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme",
"k18": "\\N"}
+routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1",
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06":
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553",
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D",
"k16": "", "k17":
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme",
"k18": "\\N"}
+routine_load_mow_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1",
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06":
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553",
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D",
"k16": "", "k17":
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme",
"k18": "\\N"}
\ No newline at end of file
diff --git a/docker/thirdparties/run-thirdparties-docker.sh
b/docker/thirdparties/run-thirdparties-docker.sh
index de1422d1afc..d4ed772c77b 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -258,7 +258,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
local ip_host="$2"
local backup_dir=/home/work/pipline/backup_center
- declare -a topics=("basic_data" "basic_array_data"
"basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone"
"basic_array_data_timezone" "multi_table_csv")
+ declare -a topics=("basic_data" "basic_array_data"
"basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone"
"basic_array_data_timezone" "multi_table_csv" "multi_table_csv1")
for topic in "${topics[@]}"; do
while IFS= read -r line; do
@@ -269,7 +269,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
done < "${ROOT}/docker-compose/kafka/scripts/${topic}.csv"
done
- declare -a json_topics=("basic_data_json" "basic_array_data_json"
"basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json")
+ declare -a json_topics=("basic_data_json" "basic_array_data_json"
"basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json"
"multi_table_json1")
for json_topic in "${json_topics[@]}"; do
echo ${json_topics}
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load.out
b/regression-test/data/load_p0/routine_load/test_routine_load.out
index 161af660b47..4288223ca02 100644
--- a/regression-test/data/load_p0/routine_load/test_routine_load.out
+++ b/regression-test/data/load_p0/routine_load/test_routine_load.out
@@ -986,4 +986,16 @@
49 2023-08-08 false \N 16275 -2144851675
-2303421957908954634 -46526938720058765 -13141.143
-6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01
2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
-- !sql_multi_table_one_data --
-8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N
\ No newline at end of file
+8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N
+
+-- !sql_multi_table --
+49 2023-08-08 false \N 16275 -2144851675
-2303421957908954634 -46526938720058765 -13141.143
-6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01
2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 {}
+
+-- !sql_multi_table --
+49 2023-08-08 false \N 16275 -2144851675
-2303421957908954634 -46526938720058765 -13141.143
-6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01
2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 {}
+
+-- !sql_multi_table --
+8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 \N
+
+-- !sql_multi_table --
+8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 \N
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 4b659df4eff..d9560e312c5 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -31,6 +31,11 @@ suite("test_routine_load","p0") {
"dup_tbl_basic_multi_table",
]
+ def multiTables1 = [
+ "dup_tbl_basic",
+ "uniq_tbl_basic",
+ ]
+
def jobs = [
"dup_tbl_basic_job",
"uniq_tbl_basic_job",
@@ -127,6 +132,11 @@ suite("test_routine_load","p0") {
"multi_table_json",
]
+ def multiTableJobName1 = [
+ "multi_table_csv1",
+ "multi_table_json1",
+ ]
+
def formats = [
"csv",
"json",
@@ -980,4 +990,83 @@ suite("test_routine_load","p0") {
j++
}
}
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def j = 0
+ for (String jobName in multiTableJobName1) {
+ try {
+ for (String tableName in multiTables1) {
+ sql new
File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+ sql new
File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+ }
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName}
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "format" = "${formats[j]}",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${jobName}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ i = 0
+ for (String tableName in multiTables1) {
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ if (state == "NEED_SCHEDULE") {
+ continue;
+ }
+ assertEquals(res[0][8].toString(), "RUNNING")
+ break;
+ }
+
+ def count = 0
+ def tableName1 = "routine_load_" + tableName
+ while (true) {
+ def res = sql "select count(*) from ${tableName1}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long
time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+
+ if (i <= 3) {
+ qt_sql_multi_table "select * from ${tableName1} order
by k00,k01"
+ } else {
+ qt_sql_multi_table "select * from ${tableName1} order
by k00"
+ }
+
+ i++
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ for (String tableName in multiTables1) {
+ sql new
File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+ }
+ }
+ j++
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]