This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a35de811139 [branch-2.0][fix](multi-table) fix unknown source slot
descriptor when load multi table (#25762) (#26223)
a35de811139 is described below
commit a35de81113906f199cfbebe4ae48aad36513f0b9
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Nov 1 19:48:09 2023 +0800
[branch-2.0][fix](multi-table) fix unknown source slot descriptor when load
multi table (#25762) (#26223)
---
be/src/pipeline/pipeline_fragment_context.cpp | 14 +++-
be/src/pipeline/pipeline_fragment_context.h | 2 +
be/src/runtime/plan_fragment_executor.cpp | 14 ++--
be/src/runtime/plan_fragment_executor.h | 2 +
.../kafka/scripts/multi_table_csv1.csv | 3 +
.../kafka/scripts/multi_table_json1.json | 3 +
docker/thirdparties/run-thirdparties-docker.sh | 4 +-
.../load_p0/routine_load/test_routine_load.out | 14 +++-
.../load_p0/routine_load/test_routine_load.groovy | 89 ++++++++++++++++++++++
9 files changed, 131 insertions(+), 14 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index cd529ee835d..87578fbc665 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -246,13 +246,19 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
fragment_context->set_is_report_success(request.query_options.is_report_success);
}
- auto* desc_tbl = _query_ctx->desc_tbl;
- _runtime_state->set_desc_tbl(desc_tbl);
+ if (request.is_simplified_param) {
+ _desc_tbl = _query_ctx->desc_tbl;
+ } else {
+ DCHECK(request.__isset.desc_tbl);
+        RETURN_IF_ERROR(
+                DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
+ }
+ _runtime_state->set_desc_tbl(_desc_tbl);
// 2. Create ExecNode to build pipeline with PipelineFragmentContext
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
             ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(),
-                                  request.fragment.plan, *desc_tbl, &_root_plan));
+                                  request.fragment.plan, *_desc_tbl, &_root_plan));
// Set senders of exchange nodes before pipeline build
std::vector<ExecNode*> exch_nodes;
@@ -308,7 +314,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 _runtime_state->obj_pool(), request.fragment.output_sink,
                 request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
-                _runtime_state.get(), &_sink, *desc_tbl));
+                _runtime_state.get(), &_sink, *_desc_tbl));
}
_root_pipeline = fragment_context->add_pipeline();
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 287fac9e40a..ed79800ec60 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -200,6 +200,8 @@ private:
// If this is set to false, and '_is_report_success' is false as well,
// This executor will not report status to FE on being cancelled.
bool _is_report_on_cancel;
+
+ DescriptorTbl* _desc_tbl;
};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index de6f5f55810..df5f4b7d3e4 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -149,19 +149,19 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
}
// set up desc tbl
- DescriptorTbl* desc_tbl = nullptr;
- if (query_ctx != nullptr) {
- desc_tbl = query_ctx->desc_tbl;
+ if (request.is_simplified_param) {
+ _desc_tbl = query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
-        RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl));
+        RETURN_IF_ERROR(
+                DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
}
- _runtime_state->set_desc_tbl(desc_tbl);
+ _runtime_state->set_desc_tbl(_desc_tbl);
// set up plan
DCHECK(request.__isset.fragment);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree(
-            _runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan));
+            _runtime_state.get(), obj_pool(), request.fragment.plan, *_desc_tbl, &_plan));
// set #senders of exchange nodes before calling Prepare()
std::vector<ExecNode*> exch_nodes;
@@ -212,7 +212,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params,
-                row_desc(), runtime_state(), &_sink, *desc_tbl));
+                row_desc(), runtime_state(), &_sink, *_desc_tbl));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state()));
RuntimeProfile* sink_profile = _sink->profile();
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index c95ddc75c17..f2a9f10d9a3 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -208,6 +208,8 @@ private:
OpentelemetrySpan _span;
+ DescriptorTbl* _desc_tbl;
+
ObjectPool* obj_pool() { return _runtime_state->obj_pool(); }
// typedef for TPlanFragmentExecParams.per_node_scan_ranges
diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv
new file mode 100644
index 00000000000..1df0d787733
--- /dev/null
+++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science",
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science",
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_mow_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science",
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
\ No newline at end of file
diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json
new file mode 100644
index 00000000000..0099b0b5432
--- /dev/null
+++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1",
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06":
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553",
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D",
"k16": "", "k17":
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme",
"k18": "\\N"}
+routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1",
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06":
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553",
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D",
"k16": "", "k17":
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme",
"k18": "\\N"}
+routine_load_mow_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1",
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06":
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553",
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D",
"k16": "", "k17":
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme",
"k18": "\\N"}
\ No newline at end of file
diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh
index 241ac6acc7b..e94ddbca9f8 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -257,7 +257,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
local container_id="$1"
local ip_host="$2"
-        declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv")
+        declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv" "multi_table_csv1")
for topic in "${topics[@]}"; do
while IFS= read -r line; do
@@ -265,7 +265,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
done < "${ROOT}/docker-compose/kafka/scripts/${topic}.csv"
done
-        declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json")
+        declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json" "multi_table_json1")
for json_topic in "${json_topics[@]}"; do
echo ${json_topics}
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load.out b/regression-test/data/load_p0/routine_load/test_routine_load.out
index 161af660b47..4288223ca02 100644
--- a/regression-test/data/load_p0/routine_load/test_routine_load.out
+++ b/regression-test/data/load_p0/routine_load/test_routine_load.out
@@ -986,4 +986,16 @@
49 2023-08-08 false \N 16275 -2144851675
-2303421957908954634 -46526938720058765 -13141.143
-6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01
2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
-- !sql_multi_table_one_data --
-8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N
\ No newline at end of file
+8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N
+
+-- !sql_multi_table --
+49 2023-08-08 false \N 16275 -2144851675
-2303421957908954634 -46526938720058765 -13141.143
-6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01
2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 {}
+
+-- !sql_multi_table --
+49 2023-08-08 false \N 16275 -2144851675
-2303421957908954634 -46526938720058765 -13141.143
-6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01
2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 {}
+
+-- !sql_multi_table --
+8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 \N
+
+-- !sql_multi_table --
+8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18
2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
\N true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 \N
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 4b659df4eff..d9560e312c5 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -31,6 +31,11 @@ suite("test_routine_load","p0") {
"dup_tbl_basic_multi_table",
]
+ def multiTables1 = [
+ "dup_tbl_basic",
+ "uniq_tbl_basic",
+ ]
+
def jobs = [
"dup_tbl_basic_job",
"uniq_tbl_basic_job",
@@ -127,6 +132,11 @@ suite("test_routine_load","p0") {
"multi_table_json",
]
+ def multiTableJobName1 = [
+ "multi_table_csv1",
+ "multi_table_json1",
+ ]
+
def formats = [
"csv",
"json",
@@ -980,4 +990,83 @@ suite("test_routine_load","p0") {
j++
}
}
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def j = 0
+ for (String jobName in multiTableJobName1) {
+ try {
+ for (String tableName in multiTables1) {
+                    sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                    sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+ }
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName}
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "format" = "${formats[j]}",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${jobName}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ i = 0
+ for (String tableName in multiTables1) {
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ if (state == "NEED_SCHEDULE") {
+ continue;
+ }
+ assertEquals(res[0][8].toString(), "RUNNING")
+ break;
+ }
+
+ def count = 0
+ def tableName1 = "routine_load_" + tableName
+ while (true) {
+ def res = sql "select count(*) from ${tableName1}"
+ def state = sql "show routine load for ${jobName}"
+                        log.info("routine load state: ${state[0][8].toString()}".toString())
+                        log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                        log.info("reason of state changed: ${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+                            log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+
+ if (i <= 3) {
+                        qt_sql_multi_table "select * from ${tableName1} order by k00,k01"
+                    } else {
+                        qt_sql_multi_table "select * from ${tableName1} order by k00"
+ }
+
+ i++
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ for (String tableName in multiTables1) {
+                    sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+ }
+ }
+ j++
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]