This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e45cf763965 branch-4.0: [fix](job) fix routine load task transaction timeout error #59999 (#60057)
e45cf763965 is described below

commit e45cf763965482f0072a7b4b262ddde4d6271c34
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jan 20 23:47:41 2026 +0800

    branch-4.0: [fix](job) fix routine load task transaction timeout error #59999 (#60057)
    
    Cherry-picked from #59999
    
    Co-authored-by: hui lai <[email protected]>
---
 .../doris/load/routineload/KafkaTaskInfo.java      | 14 ++++++++---
 .../load/routineload/RoutineLoadTaskInfo.java      |  3 +++
 .../load/routineload/RoutineLoadTaskScheduler.java |  4 ++++
 .../regression/util/RoutineLoadTestUtils.groovy    | 28 ++++++++++++++++++++++
 .../test_routine_load_adaptive_param.groovy        |  1 +
 5 files changed, 47 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index fafb34d960d..149c1eacf0a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -120,6 +120,17 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         return tRoutineLoadTask;
     }
 
+    @Override
+    public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
+        if (!isEof) {
+            long maxBatchIntervalS = 
Math.max(routineLoadJob.getMaxBatchIntervalS(),
+                    Config.routine_load_adaptive_min_batch_interval_sec);
+            this.timeoutMs = maxBatchIntervalS * 
Config.routine_load_task_timeout_multiplier * 1000;
+        } else {
+            this.timeoutMs = routineLoadJob.getTimeout() * 1000;
+        }
+    }
+
     private void adaptiveBatchParam(TRoutineLoadTask tRoutineLoadTask, 
RoutineLoadJob routineLoadJob) {
         long maxBatchIntervalS = routineLoadJob.getMaxBatchIntervalS();
         long maxBatchRows = routineLoadJob.getMaxBatchRows();
@@ -128,9 +139,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
             maxBatchIntervalS = Math.max(maxBatchIntervalS, 
Config.routine_load_adaptive_min_batch_interval_sec);
             maxBatchRows = Math.max(maxBatchRows, 
RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS);
             maxBatchSize = Math.max(maxBatchSize, 
RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE);
-            this.timeoutMs = maxBatchIntervalS * 
Config.routine_load_task_timeout_multiplier * 1000;
-        } else {
-            this.timeoutMs = routineLoadJob.getTimeout() * 1000;
         }
         tRoutineLoadTask.setMaxIntervalS(maxBatchIntervalS);
         tRoutineLoadTask.setMaxBatchRows(maxBatchRows);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 9ed0ddf4c38..efdfb4586db 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -197,6 +197,9 @@ public abstract class RoutineLoadTaskInfo {
 
     abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
 
+    public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
+    }
+
     // begin the txn of this task
     // return true if begin successfully, return false if begin failed.
     // throw exception if unrecoverable errors happen.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 7d576ce594b..8be721fba58 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -165,6 +165,10 @@ public class RoutineLoadTaskScheduler extends MasterDaemon 
{
             throw e;
         }
 
+        // update adaptive timeout before beginTxn to ensure transaction 
timeout matches task timeout
+        RoutineLoadJob routineLoadJob = 
routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
+        routineLoadTaskInfo.updateAdaptiveTimeout(routineLoadJob);
+
         // begin txn
         try {
             if (!routineLoadTaskInfo.beginTxn()) {
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index a6120241a5e..f3f6641d3f6 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -149,4 +149,32 @@ class RoutineLoadTestUtils {
             }
         }
     }
+
+    static void checkTxnTimeoutMatchesTaskTimeout(Closure sqlRunner, String 
jobName, String expectedTimeoutMs, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            def taskRes = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName 
= '${jobName}'")
+            if (taskRes.size() > 0) {
+                def txnId = taskRes[0][1].toString()
+                logger.info("Task txnId: ${txnId}, task timeout: 
${taskRes[0][6].toString()}")
+                if (txnId != null && txnId != "null" && txnId != "-1") {
+                    // Get transaction timeout from SHOW TRANSACTION
+                    def txnRes = sqlRunner.call("SHOW TRANSACTION WHERE id = 
${txnId}")
+                    if (txnRes.size() > 0) {
+                        def txnTimeoutMs = txnRes[0][13].toString()
+                        logger.info("Transaction timeout (ms): 
${txnTimeoutMs}, expected: ${expectedTimeoutMs}")
+                        Assert.assertEquals(expectedTimeoutMs, txnTimeoutMs)
+                        break
+                    }
+                }
+            }
+            if (count > maxAttempts) {
+                Assert.fail("Timeout waiting for task and transaction to be 
created")
+                break
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+    }
 }
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
index 31ab6278df1..318aa94a7be 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
@@ -77,6 +77,7 @@ suite("test_routine_load_adaptive_param","nonConcurrent") {
             logger.info("---test adaptively increase---")
             RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
             RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "3600")
+            RoutineLoadTestUtils.checkTxnTimeoutMatchesTaskTimeout(runSql, 
job, "3600000")
             RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 2)
 
             // test restore adaptively


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to