This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b6476e3840 [enhance](job) delay load task schedule when transaction fails (#57092)
1b6476e3840 is described below

commit 1b6476e3840a159af35053ac0c53ef713ebdb9ca
Author: hui lai <[email protected]>
AuthorDate: Sun Oct 19 16:52:23 2025 +0800

    [enhance](job) delay load task schedule when transaction fails (#57092)
    
    Currently, when a transaction fails, the routine load task retries as
    soon as possible. When it hits a temporarily unrecoverable error such as
    `too_many_version`, it retries many times within a short period and puts
    heavy pressure on the upstream system, e.g. Kafka.
    
    To solve this problem, we delay load task scheduling when a transaction
    fails, reducing the number of retries on error, and restore the normal
    schedule once transactions resume normal execution.
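    
    To illustrate, a minimal, self-contained Java sketch of the gate this
    patch adds (not the actual Doris classes: Task, DelayScheduleSketch,
    MAX_BATCH_INTERVAL_S, and scheduleOne() are hypothetical stand-ins). A
    task renewed after an ABORTED transaction is marked delaySchedule, and
    the scheduler keeps re-queuing it until max_batch_interval has elapsed
    since it was last scheduled, mirroring the existing Eof handling:
    
        import java.util.concurrent.LinkedBlockingQueue;
    
        public class DelayScheduleSketch {
            static class Task {
                boolean delaySchedule;     // set when the previous txn was ABORTED
                boolean isEof;             // set when the source had no new data
                long lastScheduledTimeMs;  // last time this task was dispatched
    
                boolean needDelaySchedule() {  // same check as RoutineLoadTaskInfo
                    return delaySchedule || isEof;
                }
            }
    
            static final long MAX_BATCH_INTERVAL_S = 10;  // job's max_batch_interval
            static final LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    
            // mirrors the gate in RoutineLoadTaskScheduler: a delayed task is
            // pushed back into the queue until max_batch_interval has elapsed
            static void scheduleOne() throws InterruptedException {
                Task task = queue.take();
                if (task.needDelaySchedule()
                        && System.currentTimeMillis() - task.lastScheduledTimeMs
                                < MAX_BATCH_INTERVAL_S * 1000) {
                    queue.put(task);  // not due yet: re-queue instead of dispatching
                    return;
                }
                task.lastScheduledTimeMs = System.currentTimeMillis();
                // ... dispatch the task to a BE here ...
            }
        }
    
    Reusing the existing Eof delay path caps aborted-transaction retries at
    roughly one attempt per max_batch_interval without adding a new config
    knob.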
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   3 +-
 .../doris/load/routineload/RoutineLoadJob.java     |   8 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |  10 ++
 .../load/routineload/RoutineLoadTaskScheduler.java |   4 +-
 .../regression/util/RoutineLoadTestUtils.groovy    | 152 +++++++++++++++++++++
 .../test_routine_load_delay_schedule.groovy        |  83 +++++++++++
 6 files changed, 253 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index c2441e972cc..b708f3eb07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -356,11 +356,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
+    protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule) {
         KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
         // add new task
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
                 ((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), isMultiTable());
+        kafkaTaskInfo.setDelaySchedule(delaySchedule);
         // remove old task
         routineLoadTaskInfoList.remove(routineLoadTaskInfo);
         // add new task
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 4102d6ccbdb..d768da582a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -807,7 +807,7 @@ public abstract class RoutineLoadJob
                     // and after renew, the previous task is removed from routineLoadTaskInfoList,
                     // so task can no longer be committed successfully.
                     // the already committed task will not be handled here.
-                    RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo);
+                    RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo, false);
                     Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
                 }
             }
@@ -987,7 +987,7 @@ public abstract class RoutineLoadJob
         return 0L;
     }
 
-    abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo);
+    abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule);
 
     // call before first scheduling
     // derived class can override this.
@@ -1243,7 +1243,7 @@ public abstract class RoutineLoadJob
             }
 
             // create new task
-            RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+            RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, false);
             Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
         } finally {
             writeUnlock();
@@ -1395,7 +1395,7 @@ public abstract class RoutineLoadJob
 
         if (state == JobState.RUNNING) {
             if (txnStatus == TransactionStatus.ABORTED) {
-                RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+                RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, true);
                 Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
             } else if (txnStatus == TransactionStatus.COMMITTED) {
                 // this txn is just COMMITTED, create new task when this txn is VISIBLE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 0c662ce765d..2708bbb6ebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -38,6 +38,8 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.collect.Lists;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -76,6 +78,10 @@ public abstract class RoutineLoadTaskInfo {
 
     protected boolean isEof = false;
 
+    @Getter
+    @Setter
+    protected boolean delaySchedule = false;
+
     // this status will be set when corresponding transaction's status is changed.
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
@@ -153,6 +159,10 @@ public abstract class RoutineLoadTaskInfo {
         return isEof;
     }
 
+    public boolean needDelaySchedule() {
+        return delaySchedule || isEof;
+    }
+
     public boolean isTimeout() {
         if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
             // the corresponding txn is already finished, this task can not be treated as timeout.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d1b5a6f73e8..7d576ce594b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -104,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take();
             // try to delay scheduling tasks that are perceived as Eof to MaxBatchInterval
             // to avoid too many small transactions
-            if (routineLoadTaskInfo.getIsEof()) {
+            if (routineLoadTaskInfo.needDelaySchedule()) {
                 RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
                 if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
                        < routineLoadJob.getMaxBatchIntervalS() * 1000) {
@@ -258,7 +258,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         }
 
         // for other errors (network issues, BE restart, etc.), reschedule immediately
-        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo);
+        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
         addTaskInQueue(newTask);
     }
 
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
new file mode 100644
index 00000000000..9a5e27d2680
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.regression.util
+
+import groovy.json.JsonSlurper
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.junit.Assert
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+class RoutineLoadTestUtils {
+    private static final Logger logger = LoggerFactory.getLogger(RoutineLoadTestUtils.class)
+
+    static boolean isKafkaTestEnabled(context) {
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        return enabled != null && enabled.equalsIgnoreCase("true")
+    }
+
+    static String getKafkaBroker(context) {
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        return "${externalEnvIp}:${kafka_port}"
+    }
+
+    static KafkaProducer createKafkaProducer(String kafkaBroker) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker)
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+        def producer = new KafkaProducer<>(props)
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
+        try {
+            logger.info("Kafka connecting: ${kafkaBroker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
+        return producer
+    }
+
+    static void sendTestDataToKafka(KafkaProducer producer, List<String> topics, List<String> testData = null) {
+        if (testData == null) {
+            testData = [
+                "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+                "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+            ]
+        }
+        for (String topic in topics) {
+            testData.each { line ->
+                logger.info("Sending data to kafka: ${line}")
+                def record = new ProducerRecord<>(topic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    static void checkTaskTimeout(Closure sqlRunner, String jobName, String expectedTimeout, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'")
+            if (res.size() > 0) {
+                logger.info("res: ${res[0].toString()}")
+                logger.info("timeout: ${res[0][6].toString()}")
+                Assert.assertEquals(res[0][6].toString(), expectedTimeout)
+                break;
+            }
+            if (count > maxAttempts) {
+                Assert.fail("timed out waiting for task of job ${jobName}")
+                break;
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+    }
+
+    static int waitForTaskFinish(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            def res = sqlRunner.call("show routine load for ${job}")
+            def routineLoadState = res[0][8].toString()
+            def statistic = res[0][14].toString()
+            logger.info("Routine load state: ${routineLoadState}")
+            logger.info("Routine load statistic: ${statistic}")
+            def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            if (routineLoadState == "RUNNING" && rowCount[0][0] > expectedMinRows) {
+                break
+            }
+            if (count > maxAttempts) {
+                Assert.fail("timed out waiting for job ${job} to load data")
+                break;
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+        return count
+    }
+
+    static void waitForTaskAbort(Closure sqlRunner, String job, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            def res = sqlRunner.call("show routine load for ${job}")
+            def statistic = res[0][14].toString()
+            logger.info("Routine load statistic: ${statistic}")
+            def jsonSlurper = new JsonSlurper()
+            def json = jsonSlurper.parseText(res[0][14])
+            if (json.abortedTaskNum > 1) {
+                break
+            }
+            if (count > maxAttempts) {
+                Assert.fail("timed out waiting for job ${job} tasks to abort")
+                break;
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+    }
+}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
new file mode 100644
index 00000000000..b2e8dbef85b
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for
+// the specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_delay_schedule","nonConcurrent") {
+    def kafkaCsvTopics = [
+                  "test_routine_load_delay_schedule",
+                ]
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_delay_schedule"
+        def job = "test_delay_schedule"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def injection = "RowsetBuilder.check_tablet_version_count.too_many_version"
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+                RoutineLoadTestUtils.waitForTaskAbort(runSql, job)
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
+            def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+            logger.info("wait count: " + count)
+            assertTrue(count > 5, "task should be delayed for scheduling")
+        } finally {
+            sql "stop routine load for ${job}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
