This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 60e195964bf branch-3.1: [enhance](job) delay load task schedule when transaction fail #57092 (#57130)
60e195964bf is described below
commit 60e195964bf8788432fb62a212cd8966531eaba2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 14:55:28 2025 +0800
    branch-3.1: [enhance](job) delay load task schedule when transaction fail #57092 (#57130)
Cherry-picked from #57092
Co-authored-by: hui lai <[email protected]>
---
.../load/routineload/KafkaRoutineLoadJob.java | 3 +-
.../doris/load/routineload/RoutineLoadJob.java | 8 +-
.../load/routineload/RoutineLoadTaskInfo.java | 10 ++
.../load/routineload/RoutineLoadTaskScheduler.java | 4 +-
.../regression/util/RoutineLoadTestUtils.groovy | 152 +++++++++++++++++++++
.../test_routine_load_delay_schedule.groovy | 83 +++++++++++
6 files changed, 253 insertions(+), 7 deletions(-)
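
Context for the change: when a routine load transaction aborts (for example
because a BE rejects writes with a too-many-versions error), the renewed task
used to go straight back into the schedule queue, so a persistently failing
task was retried in a tight loop. This patch threads a delaySchedule flag
through unprotectRenewTask() so that such tasks are held for up to
max_batch_interval, the same back-off already applied to EOF tasks. A minimal,
self-contained Java sketch of that behavior (Task, the queue, and the constant
are simplified stand-ins, not the actual Doris classes):

    import java.util.concurrent.LinkedBlockingQueue;

    public class DelayScheduleSketch {
        static class Task {
            boolean delaySchedule;    // set when the previous txn aborted
            boolean isEof;            // set when the previous read hit EOF
            long lastScheduledTimeMs;

            boolean needDelaySchedule() {
                return delaySchedule || isEof;
            }
        }

        static final long MAX_BATCH_INTERVAL_S = 10;
        static final LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>();

        // One scheduler iteration: a task flagged for delay is re-queued until
        // max_batch_interval has elapsed since it was last scheduled. (What the
        // real scheduler does inside that branch is not shown in the hunks
        // below; re-queuing here is an assumption made for illustration.)
        static void scheduleOnce() throws InterruptedException {
            Task task = queue.take();
            if (task.needDelaySchedule()
                    && System.currentTimeMillis() - task.lastScheduledTimeMs
                            < MAX_BATCH_INTERVAL_S * 1000) {
                queue.put(task);
                return;
            }
            task.lastScheduledTimeMs = System.currentTimeMillis();
            // ... dispatch the task to a BE here ...
        }
    }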
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 896f1a1bcd9..ac92536f57f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -352,11 +352,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
+    protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule) {
         KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
         // add new task
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
                 ((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), isMultiTable());
+        kafkaTaskInfo.setDelaySchedule(delaySchedule);
         // remove old task
         routineLoadTaskInfoList.remove(routineLoadTaskInfo);
         // add new task
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 197c2c536fa..e050c6204df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -804,7 +804,7 @@ public abstract class RoutineLoadJob
                 // and after renew, the previous task is removed from routineLoadTaskInfoList,
                 // so task can no longer be committed successfully.
                 // the already committed task will not be handled here.
-                RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo);
+                RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo, false);
                 Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
             }
         }
@@ -984,7 +984,7 @@ public abstract class RoutineLoadJob
         return 0L;
     }
 
-    abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo);
+    abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule);
 
     // call before first scheduling
     // derived class can override this.
@@ -1204,7 +1204,7 @@ public abstract class RoutineLoadJob
             }
 
             // create new task
-            RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+            RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, false);
             Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
         } finally {
             writeUnlock();
@@ -1356,7 +1356,7 @@ public abstract class RoutineLoadJob
 
         if (state == JobState.RUNNING) {
             if (txnStatus == TransactionStatus.ABORTED) {
-                RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+                RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, true);
                 Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
             } else if (txnStatus == TransactionStatus.COMMITTED) {
                 // this txn is just COMMITTED, create new task when this txn is VISIBLE
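
Note the asymmetry across the renew call sites above: only the
TransactionStatus.ABORTED path passes delaySchedule = true; the other renewals
pass false and are still rescheduled immediately. Paraphrased as a stand-alone
sketch (TxnStatus, Task, and renew() are simplified stand-ins for the Doris
types, not excerpts of RoutineLoadJob):

    public class RenewCallSites {
        enum TxnStatus { ABORTED, COMMITTED, VISIBLE }

        record Task(boolean delaySchedule) {}

        // analogue of unprotectRenewTask: the flag travels with the new task
        static Task renew(boolean delaySchedule) {
            return new Task(delaySchedule);
        }

        static Task onTxnStatusChange(TxnStatus txnStatus) {
            if (txnStatus == TxnStatus.ABORTED) {
                return renew(true);   // only the aborted-txn path opts into delay
            }
            return renew(false);      // everything else keeps the old behavior
        }

        public static void main(String[] args) {
            System.out.println(onTxnStatusChange(TxnStatus.ABORTED));   // Task[delaySchedule=true]
            System.out.println(onTxnStatusChange(TxnStatus.COMMITTED)); // Task[delaySchedule=false]
        }
    }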
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 0c662ce765d..2708bbb6ebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -38,6 +38,8 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.collect.Lists;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -76,6 +78,10 @@ public abstract class RoutineLoadTaskInfo {
 
     protected boolean isEof = false;
 
+    @Getter
+    @Setter
+    protected boolean delaySchedule = false;
+
     // this status will be set when corresponding transaction's status is changed.
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
@@ -153,6 +159,10 @@ public abstract class RoutineLoadTaskInfo {
         return isEof;
     }
 
+    public boolean needDelaySchedule() {
+        return delaySchedule || isEof;
+    }
+
     public boolean isTimeout() {
         if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
             // the corresponding txn is already finished, this task can not be treated as timeout.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d1b5a6f73e8..7d576ce594b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -104,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take();
             // try to delay scheduling tasks that are perceived as Eof to MaxBatchInterval
             // to avoid too many small transactions
-            if (routineLoadTaskInfo.getIsEof()) {
+            if (routineLoadTaskInfo.needDelaySchedule()) {
                 RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
                 if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
                         < routineLoadJob.getMaxBatchIntervalS() * 1000) {
@@ -258,7 +258,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         }
 
         // for other errors (network issues, BE restart, etc.), reschedule immediately
-        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo);
+        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
         addTaskInQueue(newTask);
     }
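
The arithmetic of the hold window above: a task stays out of dispatch while
now - lastScheduledTime < max_batch_interval * 1000, so an aborted task is
retried at most once per batch interval. The same check as a stand-alone
sketch (plain longs rather than the scheduler's fields):

    public class DelayWindow {
        // remaining hold time in ms; zero or negative means eligible to run
        static long remainingDelayMs(long nowMs, long lastScheduledMs, long maxBatchIntervalS) {
            return lastScheduledMs + maxBatchIntervalS * 1000 - nowMs;
        }

        public static void main(String[] args) {
            long lastScheduledMs = System.currentTimeMillis() - 4_000; // scheduled 4s ago
            // with max_batch_interval = 10, as in the test below, ~6s remain
            System.out.println(remainingDelayMs(System.currentTimeMillis(), lastScheduledMs, 10));
        }
    }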
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
new file mode 100644
index 00000000000..9a5e27d2680
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.regression.util
+
+import groovy.json.JsonSlurper
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.junit.Assert
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+class RoutineLoadTestUtils {
+    private static final Logger logger = LoggerFactory.getLogger(RoutineLoadTestUtils.class)
+
+ static boolean isKafkaTestEnabled(context) {
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ return enabled != null && enabled.equalsIgnoreCase("true")
+ }
+
+ static String getKafkaBroker(context) {
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ return "${externalEnvIp}:${kafka_port}"
+ }
+
+ static KafkaProducer createKafkaProducer(String kafkaBroker) {
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker)
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+ def producer = new KafkaProducer<>(props)
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
+ try {
+ logger.info("Kafka connecting: ${kafkaBroker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+ return producer
+ }
+
+    static void sendTestDataToKafka(KafkaProducer producer, List<String> topics, List<String> testData = null) {
+ if (testData == null) {
+ testData = [
+ "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+ "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+ ]
+ }
+ for (String topic in topics) {
+ testData.each { line ->
+ logger.info("Sending data to kafka: ${line}")
+ def record = new ProducerRecord<>(topic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+    static void checkTaskTimeout(Closure sqlRunner, String jobName, String expectedTimeout, int maxAttempts = 60) {
+ def count = 0
+ while (true) {
+ def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName =
'${jobName}'")
+ if (res.size() > 0) {
+ logger.info("res: ${res[0].toString()}")
+ logger.info("timeout: ${res[0][6].toString()}")
+ Assert.assertEquals(res[0][6].toString(), expectedTimeout)
+ break;
+ }
+ if (count > maxAttempts) {
+ Assert.assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ count++
+ }
+ }
+ }
+
+    static int waitForTaskFinish(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+ def count = 0
+ while (true) {
+ def res = sqlRunner.call("show routine load for ${job}")
+ def routineLoadState = res[0][8].toString()
+ def statistic = res[0][14].toString()
+ logger.info("Routine load state: ${routineLoadState}")
+ logger.info("Routine load statistic: ${statistic}")
+ def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            if (routineLoadState == "RUNNING" && rowCount[0][0] > expectedMinRows) {
+ break
+ }
+ if (count > maxAttempts) {
+ Assert.assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ count++
+ }
+ }
+ return count
+ }
+
+    static void waitForTaskAbort(Closure sqlRunner, String job, int maxAttempts = 60) {
+ def count = 0
+ while (true) {
+ def res = sqlRunner.call("show routine load for ${job}")
+ def statistic = res[0][14].toString()
+ logger.info("Routine load statistic: ${statistic}")
+ def jsonSlurper = new JsonSlurper()
+ def json = jsonSlurper.parseText(res[0][14])
+ if (json.abortedTaskNum > 1) {
+ break
+ }
+ if (count > maxAttempts) {
+ Assert.assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ count++
+ }
+ }
+ }
+}
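
All four helpers above share the same poll-once-per-second loop bounded by
maxAttempts. The pattern in generic Java form (pollUntil and its names are
illustrative, not framework API):

    import java.util.function.BooleanSupplier;

    public class PollUntil {
        // polls once per second; returns the attempts used, or throws on timeout
        static int pollUntil(BooleanSupplier condition, int maxAttempts) throws InterruptedException {
            for (int attempt = 0; attempt <= maxAttempts; attempt++) {
                if (condition.getAsBoolean()) {
                    return attempt;
                }
                Thread.sleep(1000);
            }
            throw new AssertionError("condition not met within " + maxAttempts + " attempts");
        }

        public static void main(String[] args) throws InterruptedException {
            long deadline = System.currentTimeMillis() + 3_000;
            System.out.println(pollUntil(() -> System.currentTimeMillis() > deadline, 10));
        }
    }

waitForTaskFinish additionally returns that attempt count, which the suite
below uses as a rough lower bound on how long the renewed task stayed delayed.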
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
new file mode 100644
index 00000000000..b2e8dbef85b
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for
+// the specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_delay_schedule","nonConcurrent") {
+ def kafkaCsvTopics = [
+ "test_routine_load_delay_schedule",
+ ]
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_delay_schedule"
+ def job = "test_delay_schedule"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "10"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+            def injection = "RowsetBuilder.check_tablet_version_count.too_many_version"
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+ RoutineLoadTestUtils.waitForTaskAbort(runSql, job)
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+            def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+ logger.info("wait count: " + count)
+ assertTrue(count > 5, "task should be delayed for scheduling")
+ } finally {
+ sql "stop routine load for ${job}"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]