This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new eb3ee763699 branch-4.0: [fix](test) deflake test_iot_auto_detect_fail
by accepting equivalent error messages (#64580)
eb3ee763699 is described below
commit eb3ee763699a62aaddbd18de1778e85894b95c60
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Jun 21 16:10:09 2026 +0800
branch-4.0: [fix](test) deflake test_iot_auto_detect_fail by accepting
equivalent error messages (#64580)
---
.../compaction/test_compaction_agg_keys.groovy | 42 +++++++------
.../test_compaction_agg_keys_with_array_map.groovy | 43 +++++++------
.../test_compaction_agg_keys_with_delete.groovy | 43 +++++++------
.../compaction/test_compaction_uniq_keys.groovy | 36 ++++++-----
.../compaction/test_compaction_uniq_keys_ck.groovy | 40 +++++++-----
.../test_compaction_uniq_keys_row_store_ck.groovy | 39 +++++++-----
.../test_compaction_uniq_keys_with_delete.groovy | 38 +++++++-----
...test_compaction_uniq_keys_with_delete_ck.groovy | 38 +++++++-----
.../test_compaction_with_empty_rowset.groovy | 71 +++++++++++++---------
.../test_vertical_compaction_agg_keys.groovy | 39 +++++++-----
.../test_vertical_compaction_agg_state.groovy | 39 +++++++-----
.../test_vertical_compaction_uniq_keys.groovy | 38 +++++++-----
.../test_vertical_compaction_uniq_keys_ck.groovy | 38 +++++++-----
.../test_colocate_join_of_column_order.groovy | 18 ++----
...iceberg_runtime_filter_partition_pruning.groovy | 29 +++++++++
...ntime_filter_partition_pruning_transform.groovy | 30 +++++++++
..._paimon_runtime_filter_partition_pruning.groovy | 31 +++++++++-
.../test_iot_auto_detect_fail.groovy | 21 ++++++-
.../query_profile/s3_load_profile_test.groovy | 17 +++++-
19 files changed, 458 insertions(+), 232 deletions(-)
diff --git a/regression-test/suites/compaction/test_compaction_agg_keys.groovy
b/regression-test/suites/compaction/test_compaction_agg_keys.groovy
index 480f4696e5f..7ed481827d9 100644
--- a/regression-test/suites/compaction/test_compaction_agg_keys.groovy
+++ b/regression-test/suites/compaction/test_compaction_agg_keys.groovy
@@ -17,6 +17,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_agg_keys") {
def tableName = "compaction_agg_keys_regression_test"
@@ -104,26 +105,33 @@ suite("test_compaction_agg_keys") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
-
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
-
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
index bad84c83295..604025ed54d 100644
---
a/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
+++
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_agg_keys_with_array_map") {
def tableName = "compaction_agg_keys_regression_test_complex"
@@ -95,26 +97,33 @@ suite("test_compaction_agg_keys_with_array_map") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
-
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
-
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy
b/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy
index 99ea6077e46..645c0468078 100644
---
a/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy
+++
b/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_agg_keys_with_delete") {
def tableName = "test_compaction_agg_keys_with_delete_regression_test"
@@ -114,26 +116,33 @@ suite("test_compaction_agg_keys_with_delete") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
-
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
-
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git a/regression-test/suites/compaction/test_compaction_uniq_keys.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys.groovy
index fac2cc4ac80..0da404d2245 100644
--- a/regression-test/suites/compaction/test_compaction_uniq_keys.groovy
+++ b/regression-test/suites/compaction/test_compaction_uniq_keys.groovy
@@ -17,6 +17,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_uniq_keys") {
def tableName = "compaction_uniq_keys_regression_test"
@@ -103,25 +104,30 @@ suite("test_compaction_uniq_keys") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
index 1b401b3c789..0b5aed737ff 100644
--- a/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
+++ b/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_uniq_keys_ck") {
def tableName = "compaction_uniq_keys_ck"
@@ -107,24 +109,32 @@ suite("test_compaction_uniq_keys_ck") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only treats a rowset as a cumulative compaction candidate once the BE has
+ // the FE-pushed visible version covering it (Tablet::_pick_visible_rowsets_to_compaction).
+ // The fast incremental push from FE (PublishVersionDaemon) can be missed; the
+ // periodic fallback then syncs it, bounded by partition_info_update_interval_secs
+ // (60s) + report_tablet_interval_seconds (60s), i.e. up to ~120s. So keep triggering
+ // + recounting -- the merge happens the instant the visible version lands -- and give
+ // the loop budget well above that ~120s ceiling so it stays deterministic.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
// try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
index 19a6d467d84..25fa852a112 100644
---
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
+++
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_uniq_keys_row_store_ck", "p0") {
@@ -186,24 +188,31 @@ suite("test_compaction_uniq_keys_row_store_ck", "p0") {
checkValue()
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
checkValue()
qt_sql_row_size "select sum(length(__DORIS_ROW_STORE_COL__)) from
${tableName}"
} finally {
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
index 463597124c7..2317b930907 100644
---
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
+++
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_uniq_keys_with_delete") {
def tableName = "test_compaction_uniq_keys_with_delete_regression_test"
@@ -118,24 +120,30 @@ suite("test_compaction_uniq_keys_with_delete") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
index 6a61c84a1f6..fa4443e9370 100644
---
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
+++
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_uniq_keys_with_delete_ck") {
def tableName = "test_compaction_uniq_keys_with_delete_ck"
@@ -135,24 +137,30 @@ suite("test_compaction_uniq_keys_with_delete_ck") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
// try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
index 6f6f869917d..adc4e75ad92 100644
--- a/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
+++ b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
@@ -17,6 +17,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_compaction_mow_with_empty_rowset", "p0") {
def tableName = "test_compaction_with_empty_rowset"
@@ -60,21 +61,28 @@ suite("test_compaction_mow_with_empty_rowset", "p0") {
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- def (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 10 * replicaNum)
+ return rowCount < 10 * replicaNum
+ })
qt_sql2 """ select * from ${tableName} order by k1, k2, k3 """
for (int i = 0; i < 10; i++) {
@@ -82,20 +90,27 @@ suite("test_compaction_mow_with_empty_rowset", "p0") {
'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
}
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
- rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- def (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 20 * replicaNum)
+ return rowCount < 20 * replicaNum
+ })
qt_sql3 """ select * from ${tableName} order by k1, k2, k3 """
}
diff --git
a/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy
b/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy
index 89493e15343..6f9407324c1 100644
--- a/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy
+++ b/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_vertical_compaction_agg_keys") {
def tableName = "vertical_compaction_agg_keys_regression_test"
@@ -115,24 +117,31 @@ suite("test_vertical_compaction_agg_keys") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
index 22a8f653b74..6216a386bfb 100644
---
a/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
+++
b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_vertical_compaction_agg_state") {
def tableName = "vertical_compaction_agg_state_regression_test"
@@ -76,24 +78,31 @@ suite("test_vertical_compaction_agg_state") {
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default """ SELECT
user_id,array_sort(collect_set_merge(agg_user_id)) FROM ${tableName} t group by
user_id ORDER BY user_id;"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy
index 6bff003a028..b7300ea8617 100644
---
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy
+++
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_vertical_compaction_uniq_keys") {
def tableName = "vertical_compaction_uniq_keys_regression_test"
@@ -112,24 +114,30 @@ suite("test_vertical_compaction_uniq_keys") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
index e7fad814e15..a78dcc5c3cf 100644
---
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
+++
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
@@ -16,6 +16,8 @@
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
suite("test_vertical_compaction_uniq_keys_ck") {
def tableName = "test_vertical_compaction_uniq_keys_ck"
@@ -114,24 +116,30 @@ suite("test_vertical_compaction_uniq_keys_ck") {
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
- // trigger compactions for all tablets in ${tableName}
- trigger_and_wait_compaction(tableName, "cumulative")
-
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
- int rowCount = 0
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+ // BE only picks rowsets whose version is already visible as cumulative
+ // compaction candidates, and the visible version is pushed from FE
+ // asynchronously. Right after a burst of loads it may lag, so a single
+ // cumulative round can merge only the visible prefix of rowsets. Retry
+ // trigger + recount until the rows are fully merged.
+ Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until(() -> {
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
}
- }
- assert (rowCount < 8 * replicaNum)
+ return rowCount < 8 * replicaNum
+ })
qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git
a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
index d6d27e03f76..21e0a6a1873 100644
---
a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
+++
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
@@ -100,20 +100,12 @@ suite("test_colocate_join_of_column_order") {
sql """insert into test_colocate_join_of_column_order_tb values(1,1);"""
sql """insert into test_colocate_join_of_column_order_tc values(1,1);"""
- // Pin column statistics so the cost-based COLOCATE-vs-PARTITIONED choice
is deterministic.
- // Freshly-created tables have rowCountReported=false (only the async
CloudTabletStatMgr sets it,
- // up to a full tick after INSERT); with unreliable stats the optimizer
can fall back to a
- // PARTITIONED shuffle join, which makes the COLOCATE assertion below
flaky. Injecting stats
- // (userInjected) bypasses the async-report dependency. See
nereids_p0/join/initial_join_order.
- sql """alter table test_colocate_join_of_column_order_ta modify column c1
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1',
'max_value'='1')"""
- sql """alter table test_colocate_join_of_column_order_ta modify column c2
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1',
'max_value'='1')"""
- sql """alter table test_colocate_join_of_column_order_tb modify column c1
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1',
'max_value'='1')"""
- sql """alter table test_colocate_join_of_column_order_tb modify column c2
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1',
'max_value'='1')"""
- sql """alter table test_colocate_join_of_column_order_tc modify column c1
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1',
'max_value'='1')"""
- sql """alter table test_colocate_join_of_column_order_tc modify column c2
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1',
'max_value'='1')"""
-
+ // parallel_pipeline_task_num=1 keeps the bucket-shuffle downgrade from
firing: with the default
+ // fuzzed value (0 -> auto ~cores/2) the heuristic totalBucketNum(10) <
backEndNum*paraNum*0.8 can
+ // trip on a single-BE cloud cluster, turning ta's NATURAL distribution
into EXECUTION_BUCKETED and
+ // forbidding the downstream COLOCATE. Pinning paraNum=1 makes the
condition always false.
explain {
- sql("""select /*+ set_var(disable_join_reorder=true) */ * from
test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as
bigint) c2 from test_colocate_join_of_column_order_tb)
test_colocate_join_of_column_order_tb on
test_colocate_join_of_column_order_ta.c1 =
test_colocate_join_of_column_order_tb.c2 join [shuffle]
test_colocate_join_of_column_order_tc on
test_colocate_join_of_column_order_tb.c2 =
test_colocate_join_of_column_order_tc.c1;""");
+ sql("""select /*+
set_var(disable_join_reorder=true,parallel_pipeline_task_num=1) */ * from
test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as
bigint) c2 from test_colocate_join_of_column_order_tb)
test_colocate_join_of_column_order_tb on
test_colocate_join_of_column_order_ta.c1 =
test_colocate_join_of_column_order_tb.c2 join [shuffle]
test_colocate_join_of_column_order_tc on
test_colocate_join_of_column_order_tb.c2 = test_colocate_join_of_column_order
[...]
contains "COLOCATE"
}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
index 442a8a4c121..9bf238c517d 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
suite("test_iceberg_runtime_filter_partition_pruning",
"p0,external,doris,external_docker,external_docker_doris") {
String enabled = context.config.otherConfigs.get("enableIcebergTest")
@@ -40,6 +43,32 @@ suite("test_iceberg_runtime_filter_partition_pruning",
"p0,external,doris,extern
"s3.region" = "us-east-1"
);"""
+ // All tables in partition_db are pre-created by the docker preinstalled
script (run18.sql)
+ // and are only read here. A freshly created Iceberg REST catalog can
occasionally return an
+ // incomplete table list on its first metadata fetch (REST client cold
start), which made this
+ // test flaky with "Table xxx does not exist in database [partition_db]".
Wait until all
+ // expected tables are visible (refresh to drop any cached partial list)
before querying.
+ def expectedTables = ["date_partitioned", "int_partitioned",
"float_partitioned",
+ "string_partitioned", "timestamp_partitioned",
"timestamp_ntz_partitioned",
+ "boolean_partitioned", "decimal_partitioned",
"binary_partitioned",
+ "null_str_partition_table"] as Set
+ Awaitility.await("wait for ${db_name} tables to be visible")
+ .atMost(60, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until {
+ try {
+ sql """refresh catalog ${catalog_name}"""
+ def actualTables = sql("""show tables from
`${catalog_name}`.`${db_name}`""")
+ .collect { it[0] as String } as Set
+ if (!actualTables.containsAll(expectedTables)) {
+ logger.warn("${db_name} not ready yet, missing tables:
${expectedTables - actualTables}")
+ return false
+ }
+ return true
+ } catch (Exception e) {
+ logger.warn("waiting for ${db_name} tables to be visible:
${e.getMessage()}")
+ return false
+ }
+ }
+
sql """switch ${catalog_name}"""
sql """use ${db_name}"""
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
index 7a2dbcfab92..f2b951366cc 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
suite("test_iceberg_runtime_filter_partition_pruning_transform",
"p0,external,doris,external_docker,external_docker_doris") {
String enabled = context.config.otherConfigs.get("enableIcebergTest")
@@ -40,6 +43,33 @@
suite("test_iceberg_runtime_filter_partition_pruning_transform", "p0,external,do
"s3.region" = "us-east-1"
);"""
+ // All tables in transform_partition_db are pre-created by the docker
preinstalled script
+ // (run19.sql) and are only read here. A freshly created Iceberg REST
catalog can occasionally
+ // return an incomplete table list on its first metadata fetch (REST
client cold start), which can
+ // make this test flaky with "Table xxx does not exist in database
[transform_partition_db]". Wait
+ // until all expected tables are visible (refresh to drop any cached
partial list) before querying.
+ def expectedTables = ["bucket_int_4", "bucket_bigint_4",
"bucket_string_4", "bucket_date_4",
+ "bucket_timestamp_4", "bucket_timestamp_ntz_4",
"bucket_binary_4",
+ "truncate_string_3", "truncate_binary_4",
"truncate_int_10",
+ "truncate_bigint_100", "truncate_decimal_10",
"day_partitioned",
+ "year_partitioned", "month_partitioned",
"hour_partitioned"] as Set
+ Awaitility.await("wait for ${db_name} tables to be visible")
+ .atMost(60, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until {
+ try {
+ sql """refresh catalog ${catalog_name}"""
+ def actualTables = sql("""show tables from
`${catalog_name}`.`${db_name}`""")
+ .collect { it[0] as String } as Set
+ if (!actualTables.containsAll(expectedTables)) {
+ logger.warn("${db_name} not ready yet, missing tables:
${expectedTables - actualTables}")
+ return false
+ }
+ return true
+ } catch (Exception e) {
+ logger.warn("waiting for ${db_name} tables to be visible:
${e.getMessage()}")
+ return false
+ }
+ }
+
sql """switch ${catalog_name}"""
sql """use ${db_name}"""
diff --git
a/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
b/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
index f7a666d2c83..325e9f242d0 100644
---
a/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
+++
b/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
suite("test_paimon_runtime_filter_partition_pruning",
"p0,external,doris,external_docker,external_docker_doris") {
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -36,8 +39,34 @@ suite("test_paimon_runtime_filter_partition_pruning",
"p0,external,doris,externa
's3.path.style.access' = 'true'
);
"""
+ // All tables in partition_db are pre-created by the docker
preinstalled paimon script and are
+ // only read here. A freshly created Paimon catalog can occasionally
return an incomplete table
+ // list on its first metadata fetch (catalog cold start), which can
make this test flaky with
+ // "Table xxx does not exist in database [partition_db]". Wait until
all expected tables are
+ // visible (refresh to drop any cached partial list) before querying.
+ // Note: binary_partitioned is intentionally excluded (its queries are
skipped below).
+ def expectedTables = ["decimal_partitioned", "int_partitioned",
"string_partitioned",
+ "date_partitioned", "timestamp_partitioned",
"bigint_partitioned",
+ "boolean_partitioned", "float_partitioned",
+ "null_str_partition_table"] as Set
+ Awaitility.await("wait for ${db_name} tables to be visible")
+ .atMost(60, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until {
+ try {
+ sql """refresh catalog ${catalog_name}"""
+ def actualTables = sql("""show tables from
`${catalog_name}`.`${db_name}`""")
+ .collect { it[0] as String } as Set
+ if (!actualTables.containsAll(expectedTables)) {
+ logger.warn("${db_name} not ready yet, missing tables:
${expectedTables - actualTables}")
+ return false
+ }
+ return true
+ } catch (Exception e) {
+ logger.warn("waiting for ${db_name} tables to be visible:
${e.getMessage()}")
+ return false
+ }
+ }
sql """use `${catalog_name}`.`${db_name}`;"""
-
+
def test_runtime_filter_partition_pruning = {
qt_runtime_filter_partition_pruning_decimal1 """
select count(*) from decimal_partitioned where partition_key =
diff --git
a/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy
b/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy
index 07c271bec47..30a4c39785a 100644
---
a/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy
+++
b/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy
@@ -152,16 +152,31 @@ PROPERTIES (
);
"""
+ // The overwrite must FAIL because the source rows (dt='20241128') fall
into a
+ // partition that does not exist in fail_tag and
enable_auto_create_when_overwrite=false.
+ // With multiple parallel sink instances this single logical failure can
surface as any
+ // of several equivalent messages (the semantic "Cannot found origin
partitions" error and
+ // the collateral "no partition for this tuple" / strict-mode
filtered-data error race to be
+ // reported). Accept all of them, same as
insert_overwrite_auto_detect.groovy. The point is
+ // that the statement fails (not silently creating a partition /
succeeding).
+ def checkOverwriteFail = { result, exception, startTime, endTime ->
+ assertTrue(exception != null && (
+ exception.getMessage().contains('Cannot found origin
partitions')
+ || exception.getMessage().contains('no partition for this
tuple')
+ || exception.getMessage().contains('Insert has filtered data
in strict mode')),
+ "expect insert-overwrite auto-detect to fail (no matching origin
partition), "
+ + "but got result=${result},
exception=${exception?.getMessage()}")
+ }
test {
sql "insert overwrite table fail_tag PARTITION(*) select
qsrq,lsh,wth,khh,dt from fail_src where dt='20241128';"
- exception "Cannot found origin partitions"
+ check checkOverwriteFail
}
test {
sql "insert overwrite table fail_tag PARTITION(*) select
qsrq,lsh,wth,khh,dt from fail_src where dt='20241128';"
- exception "Cannot found origin partitions"
+ check checkOverwriteFail
}
test {
sql "insert overwrite table fail_tag PARTITION(*) select
qsrq,lsh,wth,khh,dt from fail_src where dt='20241128';"
- exception "Cannot found origin partitions"
+ check checkOverwriteFail
}
}
diff --git a/regression-test/suites/query_profile/s3_load_profile_test.groovy
b/regression-test/suites/query_profile/s3_load_profile_test.groovy
index 9692efde734..6faaf369a12 100644
--- a/regression-test/suites/query_profile/s3_load_profile_test.groovy
+++ b/regression-test/suites/query_profile/s3_load_profile_test.groovy
@@ -17,6 +17,8 @@
import groovy.json.JsonSlurper
import org.apache.doris.regression.action.ProfileAction
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
def getProfile = { masterHTTPAddr, id ->
def dst = 'http://' + masterHTTPAddr
@@ -197,7 +199,20 @@ PROPERTIES (
def masterAddress = masterIP + ":" + masterHTTPPort
logger.info("masterIP:masterHTTPPort is:${masterAddress}")
- def profileString = getProfile(masterAddress, jobId.toString())
+ // The BE reports the detailed execution profile to FE asynchronously,
after
+ // the load job has already finished (the Summary section is pushed
+ // synchronously on txn VISIBLE, but the Fragments/operators are filled
only
+ // when the coordinator BE's report lands). Fetching too early yields a
+ // profile whose MergedProfile carries no operators, so the scan counters
+ // are not present yet. Poll until they show up, i.e. the execution profile
+ // has landed; the await times out and fails loudly if it never does.
+ def profileString = ""
+ Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until {
+ profileString = getProfile(masterAddress, jobId.toString())
+ return profileString.contains("NumScanners") &&
+ profileString.contains("RowsProduced") &&
+ profileString.contains("RowsRead")
+ }
logger.info("profileDataString:" + profileString)
assertTrue(profileString.contains("NumScanners"))
assertTrue(profileString.contains("RowsProduced"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]