This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new f6040674ca5 branch-3.1: [fix](load) fix multi table load plan fail
after restart master Fe or leader change #53799 (#53830)
f6040674ca5 is described below
commit f6040674ca551ce784a0da305c1d5d8af4db32c3
Author: hui lai <[email protected]>
AuthorDate: Fri Jul 25 14:10:06 2025 +0800
branch-3.1: [fix](load) fix multi table load plan fail after restart master
Fe or leader change #53799 (#53830)
pick #53799
---
.../doris/load/routineload/KafkaTaskInfo.java | 2 +-
.../doris/load/routineload/RoutineLoadJob.java | 6 -
.../test_multi_table_load_restart.groovy | 148 +++++++++++++++++++++
3 files changed, 149 insertions(+), 7 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index e3292dc671f..0474f0d4fdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -109,7 +109,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN);
}
tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode());
- tRoutineLoadTask.setQualifiedUser(routineLoadJob.getQualifiedUser());
+
tRoutineLoadTask.setQualifiedUser(routineLoadJob.getUserIdentity().getQualifiedUser());
tRoutineLoadTask.setCloudCluster(routineLoadJob.getCloudCluster());
return tRoutineLoadTask;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 6dfd7beb7f6..0f9c9636b97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -281,7 +281,6 @@ public abstract class RoutineLoadJob
protected byte escape = 0;
// use for cloud cluster mode
- protected String qualifiedUser;
protected String cloudCluster;
public void setTypeRead(boolean isTypeRead) {
@@ -332,7 +331,6 @@ public abstract class RoutineLoadJob
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE,
Long.toString(var.getSqlMode()));
this.memtableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
- this.qualifiedUser = ConnectContext.get().getQualifiedUser();
try {
this.cloudCluster = ConnectContext.get().getCloudCluster();
} catch (ComputeGroupException e) {
@@ -761,10 +759,6 @@ public abstract class RoutineLoadJob
this.comment = comment;
}
- public String getQualifiedUser() {
- return qualifiedUser;
- }
-
public String getCloudCluster() {
return cloudCluster;
}
diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_restart.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_restart.groovy
new file mode 100644
index 00000000000..d89f513eb5d
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_restart.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+// Regression test for multi-table routine load across an FE restart / leader change.
+// Flow: send one row, create a routine load job with no target table (each Kafka
+// message is prefixed with "<table>|"), restart all frontends, send a second row,
+// then require the job to be RUNNING with both rows loaded.
+suite("test_multi_table_load_restart", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    docker(options) {
+        def kafkaCsvTopics = [
+            "test_multi_table_load_restart",
+        ]
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+        // The suite only does work when Kafka tests are explicitly enabled.
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            // Bound blocking/request waits so an unreachable broker fails fast.
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    // Keep the original exception as the cause for diagnosis.
+                    throw new Exception("Kafka connect fail: ${e.message}".toString(), e)
+                }
+            }
+            def producer = new KafkaProducer<>(props)
+            // Sends each line, unkeyed, to every test topic.
+            def sendToKafka = { List<String> lines ->
+                for (String kafkaCsvTopic in kafkaCsvTopics) {
+                    lines.each { line ->
+                        logger.info("Sending data to kafka: ${line}")
+                        def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                        producer.send(record)
+                    }
+                }
+            }
+            // Outer try/finally guarantees the producer is closed on every exit path.
+            try {
+                try {
+                    logger.info("Kafka connecting: ${kafka_broker}")
+                    if (!verifyKafkaConnection(producer)) {
+                        throw new Exception("can't get any kafka info")
+                    }
+                } catch (Exception e) {
+                    logger.error("FATAL: " + e.getMessage())
+                    throw e
+                }
+                logger.info("Kafka connect success")
+                // Row 1 is written before the job exists; it is read via OFFSET_BEGINNING.
+                sendToKafka(["test_multi_table_load_restart|1,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1"])
+
+                def tableName = "test_multi_table_load_restart"
+                def job = "test_multi_table_load_restart"
+                sql """ DROP TABLE IF EXISTS ${tableName} """
+                sql """
+                    CREATE TABLE IF NOT EXISTS ${tableName} (
+                        `k1` int(20) NULL,
+                        `k2` string NULL,
+                        `v1` date NULL,
+                        `v2` string NULL,
+                        `v3` datetime NULL,
+                        `v4` string NULL
+                    ) ENGINE=OLAP
+                    DUPLICATE KEY(`k1`)
+                    COMMENT 'OLAP'
+                    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                    PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+                """
+
+                try {
+                    sql """
+                        CREATE ROUTINE LOAD ${job}
+                        COLUMNS TERMINATED BY ","
+                        FROM KAFKA
+                        (
+                            "kafka_broker_list" = "${kafka_broker}",
+                            "kafka_topic" = "${kafkaCsvTopics[0]}",
+                            "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                        );
+                    """
+                    sql "sync"
+
+                    // Restart all frontends: the scenario under test is multi-table load
+                    // planning after an FE restart / leader change.
+                    cluster.restartFrontends()
+                    sleep(30000)
+                    context.reconnectFe()
+
+                    // Row 2 is produced only after the restart.
+                    sendToKafka(["test_multi_table_load_restart|2,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1"])
+                    // Push buffered records out before polling; close happens in the outer finally.
+                    producer.flush()
+
+                    def count = 0
+                    def maxWaitCount = 60
+                    def loaded = false
+                    while (count < maxWaitCount) {
+                        def state = sql "show routine load for ${job}"
+                        def routineLoadState = state[0][8].toString()
+                        def statistic = state[0][14].toString()
+                        logger.info("Routine load state: ${routineLoadState}")
+                        logger.info("Routine load statistic: ${statistic}")
+                        def rowCount = sql "select count(*) from ${tableName}"
+                        if (routineLoadState == "RUNNING" && rowCount[0][0] == 2) {
+                            loaded = true
+                            break
+                        }
+                        sleep(1000)
+                        count++
+                    }
+                    // Fail the suite instead of passing silently when the job never recovers.
+                    assert loaded : "routine load ${job} did not reach RUNNING with 2 rows in ${tableName} after FE restart"
+                } catch (Exception e) {
+                    // Log for context, then propagate so the suite is reported as failed.
+                    logger.error("Test failed with exception: ${e.message}")
+                    throw e
+                } finally {
+                    try {
+                        sql "stop routine load for ${job}"
+                    } catch (Exception e) {
+                        logger.warn("Failed to stop routine load job: ${e.message}")
+                    }
+                }
+            } finally {
+                producer.close()
+            }
+        }
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]