This is an automated email from the ASF dual-hosted git repository.

hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54266107479 [fix](test) stabilize routine load adaptive timeout check 
(#65092)
54266107479 is described below

commit 54266107479f7bc1dd2d3cd1daf8212fbd753bd1
Author: shuke <[email protected]>
AuthorDate: Wed Jul 1 20:09:46 2026 +0800

    [fix](test) stabilize routine load adaptive timeout check (#65092)
    
    ## Summary
    - stabilize test_routine_load_adaptive_param by driving small Kafka
    batches while polling adaptive timeout convergence
    - avoid racing the short live routine-load transaction window by
    checking committed transaction timeout via the task UUID label
    - keep the change limited to the routine-load test and
    RoutineLoadTestUtils helper used by this case
    
    ## Testing
    - [x] git diff --check origin/master...HEAD
    - [ ] Not run locally; this case requires a Kafka-enabled Doris
    regression environment. Buildall will be requested after PR creation.
---
 .../regression/util/RoutineLoadTestUtils.groovy    | 101 ++++++++++++++++++---
 .../test_routine_load_adaptive_param.groovy        |  12 ++-
 2 files changed, 95 insertions(+), 18 deletions(-)

diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index c1bd321607b..7dce3402613 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -91,13 +91,55 @@ class RoutineLoadTestUtils {
         while (true) {
             def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName = 
'${jobName}'")
             if (res.size() > 0) {
+                def txnId = res[0][1].toString()
+                def timeout = res[0][6].toString()
                 logger.info("res: ${res[0].toString()}")
-                logger.info("timeout: ${res[0][6].toString()}")
-                Assert.assertEquals(res[0][6].toString(), expectedTimeout)
+                logger.info("txnId: ${txnId}, timeout: ${timeout}, expected: 
${expectedTimeout}")
+                // A task whose txn has not begun yet (txnId == -1) may still 
carry the timeout
+                // computed in a previous schedule round; the adaptive timeout 
only converges
+                // after a subsequent task is scheduled. Poll until a stable 
task carries the
+                // expected timeout instead of asserting on a transient task.
+                if (txnId != "-1" && timeout == expectedTimeout) {
+                    Assert.assertEquals(expectedTimeout, timeout)
+                    break;
+                }
+            }
+            if (count > maxAttempts) {
+                Assert.fail("Timeout waiting for task timeout to converge to 
${expectedTimeout} for job ${jobName}")
                 break;
+            } else {
+                sleep(1000)
+                count++
+            }
+        }
+    }
+
+    // Verify that the adaptive task timeout converges to expectedTimeout when 
the job is caught up
+    // (EOF). The adaptive timeout is only (re)computed when a task is 
actually scheduled WITH data
+    // to consume; once a job drains its data the renewed task stays idle 
(txnId == -1) and keeps the
+    // timeout from the previous schedule round, so the EOF timeout is never 
observed on its own.
+    // Drive a fresh small batch each round to force an isEof task to be 
scheduled and recompute the
+    // timeout, then read whatever task is visible (the running task, or the 
renewed idle one that
+    // inherits the just-converged value). Unlike checkTaskTimeout we do NOT 
skip txnId == -1 here,
+    // because after EOF the converged value naturally settles on an idle task.
+    static void checkTaskTimeoutWithData(Closure sqlRunner, KafkaProducer 
producer, List<String> topics,
+                                         String jobName, String 
expectedTimeout, int maxAttempts = 60) {
+        def count = 0
+        while (true) {
+            sendTestDataToKafka(producer, topics)
+            def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName = 
'${jobName}'")
+            if (res.size() > 0) {
+                def txnId = res[0][1].toString()
+                def timeout = res[0][6].toString()
+                logger.info("res: ${res[0].toString()}")
+                logger.info("txnId: ${txnId}, timeout: ${timeout}, expected: 
${expectedTimeout}")
+                if (timeout == expectedTimeout) {
+                    Assert.assertEquals(expectedTimeout, timeout)
+                    break;
+                }
             }
             if (count > maxAttempts) {
-                Assert.assertEquals(1, 2)
+                Assert.fail("Timeout waiting for task timeout to converge to 
${expectedTimeout} for job ${jobName}")
                 break;
             } else {
                 sleep(1000)
@@ -173,27 +215,56 @@ class RoutineLoadTestUtils {
         }
     }
 
-    static void checkTxnTimeoutMatchesTaskTimeout(Closure sqlRunner, String 
jobName, String expectedTimeoutMs, int maxAttempts = 60) {
+    // Verify that the transaction a routine-load task begins carries the 
(adaptive) task timeout.
+    //
+    // Reading the timeout from a LIVE task txn (poll SHOW ROUTINE LOAD TASK 
until txnId != -1, then
+    // SHOW TRANSACTION WHERE id = txnId) is inherently racy: a small-batch 
routine-load txn begins and
+    // commits in well under the 1s poll interval, so the txnId != -1 window 
is sub-second and the poll
+    // almost never samples it. The converged adaptive timeout is correct the 
whole time; only the
+    // live-txn observation flakes.
+    //
+    // Instead join on the task UUID: SHOW ROUTINE LOAD TASK col[0] (TaskId) 
is exactly the FE
+    // transaction label (RoutineLoadTaskInfo.beginTxn sets label = 
printId(taskId)). Capture those
+    // UUIDs and read the timeout from the COMMITTED/VISIBLE transaction with 
that label. The txn
+    // timeout (SHOW TRANSACTION col[13]) is a persisted field, frozen at 
begin time and retained long
+    // after commit, so it is read without racing a live txn.
+    static void checkTxnTimeoutMatchesTaskTimeout(Closure sqlRunner, 
KafkaProducer producer, List<String> topics,
+                                                  String jobName, String 
expectedTimeoutMs, int maxAttempts = 60) {
         def count = 0
+        def seenTaskIds = new LinkedHashSet<String>()
         while (true) {
+            // Keep a task scheduled so a txn keeps being begun and committed 
for this job.
+            sendTestDataToKafka(producer, topics)
             def taskRes = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName 
= '${jobName}'")
             if (taskRes.size() > 0) {
-                def txnId = taskRes[0][1].toString()
-                logger.info("Task txnId: ${txnId}, task timeout: 
${taskRes[0][6].toString()}")
-                if (txnId != null && txnId != "null" && txnId != "-1") {
-                    // Get transaction timeout from SHOW TRANSACTION
-                    def txnRes = sqlRunner.call("SHOW TRANSACTION WHERE id = 
${txnId}")
-                    if (txnRes.size() > 0) {
-                        def txnTimeoutMs = txnRes[0][13].toString()
-                        logger.info("Transaction timeout (ms): 
${txnTimeoutMs}, expected: ${expectedTimeoutMs}")
+                def taskId = taskRes[0][0].toString()
+                logger.info("Task id: ${taskId}, txnId: 
${taskRes[0][1].toString()}, task timeout: ${taskRes[0][6].toString()}")
+                if (taskId != null && taskId != "null" && taskId != "") {
+                    seenTaskIds.add(taskId)
+                }
+            }
+            // The committed txn for a captured task is queryable by its label 
(the bare task UUID)
+            // whether or not it is currently running.
+            for (String label : seenTaskIds) {
+                def txnRes = null
+                try {
+                    txnRes = sqlRunner.call("SHOW TRANSACTION WHERE label = 
'${label}'")
+                } catch (Exception e) {
+                    // The task has not begun its txn yet, so the label does 
not exist; keep polling.
+                    continue
+                }
+                if (txnRes != null && txnRes.size() > 0) {
+                    def txnTimeoutMs = txnRes[0][13].toString()
+                    logger.info("Transaction label: ${label}, timeout (ms): 
${txnTimeoutMs}, expected: ${expectedTimeoutMs}")
+                    if (txnTimeoutMs == expectedTimeoutMs) {
                         Assert.assertEquals(expectedTimeoutMs, txnTimeoutMs)
-                        break
+                        return
                     }
                 }
             }
             if (count > maxAttempts) {
-                Assert.fail("Timeout waiting for task and transaction to be 
created")
-                break
+                Assert.fail("Timeout waiting for a committed transaction of 
job ${jobName} to carry timeout ${expectedTimeoutMs}")
+                return
             } else {
                 sleep(1000)
                 count++
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
index 8993962a104..49d901c31c9 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
@@ -74,8 +74,12 @@ suite("test_routine_load_adaptive_param","nonConcurrent") {
                 
                 logger.info("---test adaptively increase---")
                 RoutineLoadTestUtils.sendTestDataToKafka(producer, 
kafkaCsvTpoics)
-                RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "3600")
-                RoutineLoadTestUtils.checkTxnTimeoutMatchesTaskTimeout(runSql, 
job, "3600000")
+                // Drive data each round so an isEof=false task keeps being 
scheduled. The converged
+                // adaptive timeout (3600) lives on the renewed idle task 
(txnId == -1), so both checks
+                // poll by value (task timeout col, and the committed txn's 
persisted timeout looked up
+                // by task-UUID label) instead of racing a sub-second running 
task.
+                RoutineLoadTestUtils.checkTaskTimeoutWithData(runSql, 
producer, kafkaCsvTpoics, job, "3600")
+                RoutineLoadTestUtils.checkTxnTimeoutMatchesTaskTimeout(runSql, 
producer, kafkaCsvTpoics, job, "3600000")
                 RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 
2)
             } finally {
                 GetDebugPoint().disableDebugPointForAllFEs(injection)
@@ -84,7 +88,9 @@ suite("test_routine_load_adaptive_param","nonConcurrent") {
             logger.info("---test restore adaptively---")
             RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
             RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 4)
-            RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "100")
+            // After EOF the adaptive timeout only converges when an isEof 
task is scheduled with
+            // data, so keep feeding small batches until the task timeout 
restores to the job timeout.
+            RoutineLoadTestUtils.checkTaskTimeoutWithData(runSql, producer, 
kafkaCsvTpoics, job, "100")
         } finally {
             sql "stop routine load for ${job}"
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to