This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c0248ced8adaba5fccb5f75ce49859d05908ccea
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 27 19:08:33 2026 +0800

    [test](local shuffle) fix flaky profile fetch in FE/BE consistency test
    
    Replace fixed Thread.sleep(1500) with a stability-based polling loop:
    poll until two consecutive reads of the profile are identical, which
    indicates the asynchronous operator-metric writing is complete.
    
    The old fixed sleep occasionally fetched an incomplete profile (query ID
    appears in the header early, but pipeline operator metrics are written
    asynchronously), causing false MISMATCH reports.
---
 .../test_local_shuffle_fe_be_consistency.groovy    | 27 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
index 98ca9fff86d..1070694a18b 100644
--- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
@@ -80,6 +80,28 @@ suite("test_local_shuffle_fe_be_consistency") {
     //  enableFePlanner=true  → enable_local_shuffle_planner=true  (FE plans exchanges)
     //  enableFePlanner=false → enable_local_shuffle_planner=false (BE plans natively)
     // ============================================================
+    // Poll until the profile for queryId is stable (two consecutive reads match).
+    // The query ID appears in the header early, but operator metrics are written asynchronously
+    // after the query finishes. A stable profile means writing is complete.
+    def waitForProfile = { String queryId ->
+        def maxAttempts = 30  // with sleepMs below: at most 30 * 300 ms = 9 s spent sleeping
+        def sleepMs = 300  // pause taken before every read, including the first one
+        String prev = ""  // last successful read; "" means there is nothing to compare against yet
+        for (int i = 0; i < maxAttempts; i++) {
+            Thread.sleep(sleepMs)
+            try {
+                def text = getProfile(queryId)
+                if (text.contains(queryId) && text == prev) {  // Groovy == compares Strings by value
+                    return text  // stable across two consecutive reads → complete
+                }
+                prev = text
+            } catch (Exception ignored) {
+                prev = ""  // best-effort polling: a failed read resets the comparison baseline
+            }
+        }
+        return getProfile(queryId)  // never stabilized: fall back to one final, unchecked read
+    }
+
     def runAndGetSinkCounts = { String testSql, boolean enableFePlanner ->
         sql "set enable_profile=true"
         sql "set enable_local_shuffle_planner=${enableFePlanner}"
@@ -90,10 +112,7 @@ suite("test_local_shuffle_fe_be_consistency") {
         def queryIdResult = sql "select last_query_id()"
         def queryId = queryIdResult[0][0].toString()
 
-        // Wait for profile to be fully collected
-        Thread.sleep(1500)
-
-        def profileText = getProfile(queryId)
+        def profileText = waitForProfile(queryId)
         def counts = extractSinkExchangeCounts(profileText)
         logger.info("enable_local_shuffle_planner=${enableFePlanner}, query_id=${queryId}, LE sink counts=${counts}")
         return [queryId: queryId, counts: counts, profile: profileText]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to