This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e45cf763965 branch-4.0: [fix](job) fix routine load task transaction
timeout error #59999 (#60057)
e45cf763965 is described below
commit e45cf763965482f0072a7b4b262ddde4d6271c34
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jan 20 23:47:41 2026 +0800
branch-4.0: [fix](job) fix routine load task transaction timeout error
#59999 (#60057)
Cherry-picked from #59999
Co-authored-by: hui lai <[email protected]>
---
.../doris/load/routineload/KafkaTaskInfo.java | 14 ++++++++---
.../load/routineload/RoutineLoadTaskInfo.java | 3 +++
.../load/routineload/RoutineLoadTaskScheduler.java | 4 ++++
.../regression/util/RoutineLoadTestUtils.groovy | 28 ++++++++++++++++++++++
.../test_routine_load_adaptive_param.groovy | 1 +
5 files changed, 47 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index fafb34d960d..149c1eacf0a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -120,6 +120,17 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
return tRoutineLoadTask;
}
+    /**
+     * Refreshes {@code timeoutMs} from the owning job's current settings so that the transaction
+     * begun for this task is created with the same timeout the task itself will run with.
+     *
+     * <p>When {@code isEof} is false, the timeout is adaptive: the job's max batch interval (raised to
+     * at least {@code Config.routine_load_adaptive_min_batch_interval_sec}) multiplied by
+     * {@code Config.routine_load_task_timeout_multiplier}, converted to milliseconds. Otherwise the
+     * job's own {@code getTimeout()} value is converted to milliseconds and used.
+     *
+     * @param routineLoadJob the job that owns this task. The scheduler looks it up by id right before
+     *     calling this method; presumably that lookup yields {@code null} if the job was dropped
+     *     concurrently (confirm). In that case the current timeout is left unchanged.
+     */
+    @Override
+    public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
+        if (routineLoadJob == null) {
+            // Do not fail the scheduler thread with an NPE; keep whatever timeout the task already has.
+            return;
+        }
+        if (isEof) {
+            this.timeoutMs = routineLoadJob.getTimeout() * 1000;
+            return;
+        }
+        long maxBatchIntervalS = Math.max(routineLoadJob.getMaxBatchIntervalS(),
+                Config.routine_load_adaptive_min_batch_interval_sec);
+        this.timeoutMs = maxBatchIntervalS * Config.routine_load_task_timeout_multiplier * 1000;
+    }
+
private void adaptiveBatchParam(TRoutineLoadTask tRoutineLoadTask,
RoutineLoadJob routineLoadJob) {
long maxBatchIntervalS = routineLoadJob.getMaxBatchIntervalS();
long maxBatchRows = routineLoadJob.getMaxBatchRows();
@@ -128,9 +139,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
maxBatchIntervalS = Math.max(maxBatchIntervalS,
Config.routine_load_adaptive_min_batch_interval_sec);
maxBatchRows = Math.max(maxBatchRows,
RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS);
maxBatchSize = Math.max(maxBatchSize,
RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE);
- this.timeoutMs = maxBatchIntervalS *
Config.routine_load_task_timeout_multiplier * 1000;
- } else {
- this.timeoutMs = routineLoadJob.getTimeout() * 1000;
}
tRoutineLoadTask.setMaxIntervalS(maxBatchIntervalS);
tRoutineLoadTask.setMaxBatchRows(maxBatchRows);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 9ed0ddf4c38..efdfb4586db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -197,6 +197,9 @@ public abstract class RoutineLoadTaskInfo {
abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
+    /**
+     * Hook the scheduler invokes right before beginning this task's transaction, giving a subclass
+     * the chance to recompute its timeout from the job's current settings. {@code KafkaTaskInfo}
+     * overrides it; the default does nothing.
+     *
+     * @param routineLoadJob the job that owns this task
+     */
+    public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
+        // Intentionally empty: task types without adaptive batching keep their existing timeout.
+    }
+
// begin the txn of this task
// return true if begin successfully, return false if begin failed.
// throw exception if unrecoverable errors happen.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 7d576ce594b..8be721fba58 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -165,6 +165,10 @@ public class RoutineLoadTaskScheduler extends MasterDaemon
{
throw e;
}
+ // update adaptive timeout before beginTxn to ensure transaction
timeout matches task timeout
+ RoutineLoadJob routineLoadJob =
routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
+ routineLoadTaskInfo.updateAdaptiveTimeout(routineLoadJob);
+
// begin txn
try {
if (!routineLoadTaskInfo.beginTxn()) {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index a6120241a5e..f3f6641d3f6 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -149,4 +149,32 @@ class RoutineLoadTestUtils {
}
}
}
+
+    /**
+     * Waits until the first task of the given routine load job has a transaction, then asserts that
+     * the transaction's timeout equals {@code expectedTimeoutMs}.
+     *
+     * @param sqlRunner closure that executes a SQL statement and returns its result rows
+     * @param jobName routine load job name
+     * @param expectedTimeoutMs expected transaction timeout in milliseconds, compared as a string
+     * @param maxAttempts number of one-second retries after the initial poll before failing
+     */
+    static void checkTxnTimeoutMatchesTaskTimeout(Closure sqlRunner, String jobName, String expectedTimeoutMs, int maxAttempts = 60) {
+        for (int attempt = 0; attempt <= maxAttempts; attempt++) {
+            if (attempt > 0) {
+                sleep(1000)
+            }
+            def taskRes = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'")
+            if (taskRes.size() == 0) {
+                continue
+            }
+            // Groovy's null.toString() yields "null", so the value itself is never null here.
+            def txnId = taskRes[0][1].toString()
+            logger.info("Task txnId: ${txnId}, task timeout: ${taskRes[0][6].toString()}")
+            // Presumably "null" / "-1" mean no transaction has been begun for the task yet; keep polling.
+            if (txnId == "null" || txnId == "-1") {
+                continue
+            }
+            // Get transaction timeout from SHOW TRANSACTION
+            def txnRes = sqlRunner.call("SHOW TRANSACTION WHERE id = ${txnId}")
+            if (txnRes.size() > 0) {
+                // Column 13 holds the transaction timeout in milliseconds.
+                def txnTimeoutMs = txnRes[0][13].toString()
+                logger.info("Transaction timeout (ms): ${txnTimeoutMs}, expected: ${expectedTimeoutMs}")
+                Assert.assertEquals(expectedTimeoutMs, txnTimeoutMs)
+                return
+            }
+        }
+        // Assert.fail throws, so nothing may follow it.
+        Assert.fail("Timeout waiting for task and transaction to be created")
+    }
}
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
index 31ab6278df1..318aa94a7be 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
@@ -77,6 +77,7 @@ suite("test_routine_load_adaptive_param","nonConcurrent") {
logger.info("---test adaptively increase---")
RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "3600")
+ RoutineLoadTestUtils.checkTxnTimeoutMatchesTaskTimeout(runSql,
job, "3600000")
RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 2)
// test restore adaptively
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org