This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 60e195964bf branch-3.1: [enhance](job) delay load task schedule when transaction fail #57092 (#57130)
60e195964bf is described below

commit 60e195964bf8788432fb62a212cd8966531eaba2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 14:55:28 2025 +0800

    branch-3.1: [enhance](job) delay load task schedule when transaction fail #57092 (#57130)
    
    Cherry-picked from #57092
    
    Co-authored-by: hui lai <[email protected]>
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   3 +-
 .../doris/load/routineload/RoutineLoadJob.java     |   8 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |  10 ++
 .../load/routineload/RoutineLoadTaskScheduler.java |   4 +-
 .../regression/util/RoutineLoadTestUtils.groovy    | 152 +++++++++++++++++++++
 .../test_routine_load_delay_schedule.groovy        |  83 +++++++++++
 6 files changed, 253 insertions(+), 7 deletions(-)

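The patch threads a new delaySchedule flag from the transaction-abort callback through task renewal into the task scheduler, which then defers the renewed task the same way it already defers EOF tasks. A minimal sketch of the scheduler-side check, with field and method names following the diff below but the surrounding class and queue handling simplified (illustrative only, not the actual Doris implementation):

    // Minimal sketch of the delay check this patch introduces. Names mirror
    // the diff; everything else (queue, job lookup) is simplified away.
    public class DelayScheduleSketch {
        static boolean shouldRequeue(boolean delaySchedule, boolean isEof,
                                     long lastScheduledMs, long maxBatchIntervalS) {
            // Tasks renewed after an aborted txn (delaySchedule) and EOF tasks
            // are both deferred until the job's max batch interval has elapsed.
            boolean needDelay = delaySchedule || isEof;
            return needDelay
                    && System.currentTimeMillis() - lastScheduledMs < maxBatchIntervalS * 1000L;
        }
    }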
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 896f1a1bcd9..ac92536f57f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -352,11 +352,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
+    protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule) {
         KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
         // add new task
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
                 ((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), isMultiTable());
+        kafkaTaskInfo.setDelaySchedule(delaySchedule);
         // remove old task
         routineLoadTaskInfoList.remove(routineLoadTaskInfo);
         // add new task
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 197c2c536fa..e050c6204df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -804,7 +804,7 @@ public abstract class RoutineLoadJob
                     // and after renew, the previous task is removed from routineLoadTaskInfoList,
                     // so task can no longer be committed successfully.
                     // the already committed task will not be handled here.
-                    RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo);
+                    RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo, false);
                     Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
                 }
             }
@@ -984,7 +984,7 @@ public abstract class RoutineLoadJob
         return 0L;
     }
 
-    abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo);
+    abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule);
 
     // call before first scheduling
     // derived class can override this.
@@ -1204,7 +1204,7 @@ public abstract class RoutineLoadJob
             }
 
             // create new task
-            RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+            RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, false);
             Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
         } finally {
             writeUnlock();
@@ -1356,7 +1356,7 @@ public abstract class RoutineLoadJob
 
         if (state == JobState.RUNNING) {
             if (txnStatus == TransactionStatus.ABORTED) {
-                RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
+                RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo, true);
                 Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo);
             } else if (txnStatus == TransactionStatus.COMMITTED) {
                 // this txn is just COMMITTED, create new task when this txn is VISIBLE
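Of the unprotectRenewTask call sites above, only the ABORTED branch passes true; the timeout-renew and error-reschedule paths keep immediate scheduling. A hedged sketch of that mapping (the enum mirrors org.apache.doris.transaction.TransactionStatus; the helper is illustrative and not part of the patch, which inlines the boolean at each call site):

    class RenewPolicySketch {
        // Illustrative stand-in for org.apache.doris.transaction.TransactionStatus.
        enum TxnOutcome { COMMITTED, VISIBLE, ABORTED, UNKNOWN }

        // Only an aborted transaction triggers delayed rescheduling; timeout
        // renewals and scheduler-error renewals run again immediately.
        static boolean delayScheduleFor(TxnOutcome outcome) {
            return outcome == TxnOutcome.ABORTED;
        }
    }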
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 0c662ce765d..2708bbb6ebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -38,6 +38,8 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.collect.Lists;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -76,6 +78,10 @@ public abstract class RoutineLoadTaskInfo {
 
     protected boolean isEof = false;
 
+    @Getter
+    @Setter
+    protected boolean delaySchedule = false;
+
     // this status will be set when corresponding transaction's status is changed.
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
@@ -153,6 +159,10 @@ public abstract class RoutineLoadTaskInfo {
         return isEof;
     }
 
+    public boolean needDelaySchedule() {
+        return delaySchedule || isEof;
+    }
+
     public boolean isTimeout() {
         if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
             // the corresponding txn is already finished, this task can not be treated as timeout.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d1b5a6f73e8..7d576ce594b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -104,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take();
             // try to delay scheduling tasks that are perceived as Eof to MaxBatchInterval
             // to avoid too many small transactions
-            if (routineLoadTaskInfo.getIsEof()) {
+            if (routineLoadTaskInfo.needDelaySchedule()) {
                 RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
                 if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
                         < routineLoadJob.getMaxBatchIntervalS() * 1000) {
@@ -258,7 +258,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         }
 
         // for other errors (network issues, BE restart, etc.), reschedule immediately
-        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo);
+        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
         addTaskInQueue(newTask);
     }
 
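With the regression test's "max_batch_interval" = "10", an aborted task renewed at t = 0 keeps being requeued until ten seconds have passed. A quick, self-contained check of that arithmetic (assumes a single renewal; the class is illustrative only):

    // Quick arithmetic check for the delay window; the interval value comes
    // from the regression test below.
    public class DelayWindowDemo {
        public static void main(String[] args) {
            long maxBatchIntervalS = 10;   // "max_batch_interval" = "10"
            long lastScheduledMs = 0;      // task renewed at t = 0
            for (long nowMs : new long[] {1_000L, 9_999L, 10_000L}) {
                boolean deferred = nowMs - lastScheduledMs < maxBatchIntervalS * 1000L;
                System.out.printf("t=%dms deferred=%b%n", nowMs, deferred);
            }
            // prints: deferred=true at 1000 ms and 9999 ms, false at 10000 ms
        }
    }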
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
new file mode 100644
index 00000000000..9a5e27d2680
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.regression.util
+
+import groovy.json.JsonSlurper
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.junit.Assert
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+class RoutineLoadTestUtils {
+    private static final Logger logger = LoggerFactory.getLogger(RoutineLoadTestUtils.class)
+
+    static boolean isKafkaTestEnabled(context) {
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        return enabled != null && enabled.equalsIgnoreCase("true")
+    }
+
+    static String getKafkaBroker(context) {
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        return "${externalEnvIp}:${kafka_port}"
+    }
+
+    static KafkaProducer createKafkaProducer(String kafkaBroker) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker)
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+        def producer = new KafkaProducer<>(props)
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
+        try {
+            logger.info("Kafka connecting: ${kafkaBroker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
+        return producer
+    }
+
+    static void sendTestDataToKafka(KafkaProducer producer, List<String> topics, List<String> testData = null) {
+        if (testData == null) {
+            testData = [
+                "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+                "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+            ]
+        }
+        for (String topic in topics) {
+            testData.each { line ->
+                logger.info("Sending data to kafka: ${line}")
+                def record = new ProducerRecord<>(topic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    static void checkTaskTimeout(Closure sqlRunner, String jobName, String expectedTimeout, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'")
+            if (res.size() > 0) {
+                logger.info("res: ${res[0].toString()}")
+                logger.info("timeout: ${res[0][6].toString()}")
+                Assert.assertEquals(res[0][6].toString(), expectedTimeout)
+                break;
+            }
+            if (count > maxAttempts) {
+                Assert.fail("routine load task for ${jobName} did not appear within ${maxAttempts} attempts")
+                break;
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+    }
+
+    static int waitForTaskFinish(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            def res = sqlRunner.call("show routine load for ${job}")
+            def routineLoadState = res[0][8].toString()
+            def statistic = res[0][14].toString()
+            logger.info("Routine load state: ${routineLoadState}")
+            logger.info("Routine load statistic: ${statistic}")
+            def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            if (routineLoadState == "RUNNING" && rowCount[0][0] > expectedMinRows) {
+                break
+            }
+            if (count > maxAttempts) {
+                Assert.fail("routine load ${job} did not reach RUNNING with data within ${maxAttempts} attempts")
+                break;
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+        return count
+    }
+
+    static void waitForTaskAbort(Closure sqlRunner, String job, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            def res = sqlRunner.call("show routine load for ${job}")
+            def statistic = res[0][14].toString()
+            logger.info("Routine load statistic: ${statistic}")
+            def jsonSlurper = new JsonSlurper()
+            def json = jsonSlurper.parseText(res[0][14])
+            if (json.abortedTaskNum > 1) {
+                break
+            }
+            if (count > maxAttempts) {
+                Assert.fail("abortedTaskNum did not exceed 1 for ${job} within ${maxAttempts} attempts")
+                break;
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+    }
+}
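The three wait helpers above share one poll-until-timeout shape: probe once per second and give up after maxAttempts. A compact Java rendering of that control flow (the helpers themselves are Groovy; this sketch only restates the pattern):

    import java.util.function.BooleanSupplier;

    final class PollUntil {
        // Probe once per second until the condition holds or attempts run out,
        // matching the 1s-sleep/maxAttempts loops in the Groovy helpers.
        static void await(BooleanSupplier probe, int maxAttempts) throws InterruptedException {
            for (int attempt = 0; attempt <= maxAttempts; attempt++) {
                if (probe.getAsBoolean()) {
                    return;
                }
                Thread.sleep(1000);
            }
            throw new AssertionError("condition not reached after " + maxAttempts + " attempts");
        }
    }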
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
new file mode 100644
index 00000000000..b2e8dbef85b
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_delay_schedule.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for
+// the specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_delay_schedule","nonConcurrent") {
+    def kafkaCsvTopics = [
+                  "test_routine_load_delay_schedule",
+                ]
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_delay_schedule"
+        def job = "test_delay_schedule"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def injection = "RowsetBuilder.check_tablet_version_count.too_many_version"
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+                RoutineLoadTestUtils.waitForTaskAbort(runSql, job)
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
+            def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+            logger.info("wait count: " + count)
+            assertTrue(count > 5, "task should be delayed for scheduling")
+        } finally {
+            sql "stop routine load for ${job}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
