This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 385589c92b1 [fix](regression)Fix unstable compaction related cases
(#46920) (#47003)
385589c92b1 is described below
commit 385589c92b1920c41394a1e0b6f35913d107c9d3
Author: qiye <[email protected]>
AuthorDate: Wed Jan 15 13:42:04 2025 +0800
[fix](regression)Fix unstable compaction related cases (#46920) (#47003)
bp #46920
---
regression-test/plugins/plugin_compaction.groovy | 10 ++-
...st_skip_index_compaction_fault_injection.groovy | 86 ++--------------------
..._index_change_with_cumulative_compaction.groovy | 33 +++++----
3 files changed, 33 insertions(+), 96 deletions(-)
diff --git a/regression-test/plugins/plugin_compaction.groovy
b/regression-test/plugins/plugin_compaction.groovy
index eaefa9a10d3..45dd99a97a3 100644
--- a/regression-test/plugins/plugin_compaction.groovy
+++ b/regression-test/plugins/plugin_compaction.groovy
@@ -57,8 +57,7 @@ Suite.metaClass.be_run_full_compaction_by_table_id = { String
ip, String port, S
}
logger.info("Added 'be_run_full_compaction' function to Suite")
-
-Suite.metaClass.trigger_and_wait_compaction = { String table_name, String
compaction_type, int timeout_seconds=300 ->
+Suite.metaClass.trigger_and_wait_compaction = { String table_name, String
compaction_type, int timeout_seconds=300, String[] ignored_errors=[] ->
if (!(compaction_type in ["cumulative", "base", "full"])) {
throw new IllegalArgumentException("invalid compaction type:
${compaction_type}, supported types: cumulative, base, full")
}
@@ -102,12 +101,15 @@ Suite.metaClass.trigger_and_wait_compaction = { String
table_name, String compac
assert exit_code == 0: "trigger compaction failed, exit code:
${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def trigger_status = parseJson(stdout.trim())
if (trigger_status.status.toLowerCase() != "success") {
- if (trigger_status.status.toLowerCase() == "already_exist") {
+ def status_lower = trigger_status.status.toLowerCase()
+ if (status_lower == "already_exist") {
triggered_tablets.add(tablet) // compaction already in queue,
treat it as successfully triggered
} else if (!auto_compaction_disabled) {
// ignore the error if auto compaction enabled
- } else if (trigger_status.status.contains("E-2000")) {
+ } else if (status_lower.contains("e-2000")) {
// ignore this tablet compaction.
+ } else if (ignored_errors.any { error ->
status_lower.contains(error.toLowerCase()) }) {
+ // ignore this tablet compaction if the error is in the
ignored_errors list
} else {
throw new Exception("trigger compaction failed, be host:
${be_host}, tablet id: ${tablet.TabletId}, status: ${trigger_status.status}")
}
diff --git
a/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
index 52d8d481c8d..ee18ffb7304 100644
---
a/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
@@ -19,8 +19,8 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_skip_index_compaction_fault_injection", "nonConcurrent") {
def isCloudMode = isCloudMode()
+ // branch-2.1 only supports index compaction with index_format_v1
def tableName1 = "test_skip_index_compaction_fault_injection_1"
- def tableName2 = "test_skip_index_compaction_fault_injection_2"
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
@@ -46,29 +46,8 @@ suite("test_skip_index_compaction_fault_injection",
"nonConcurrent") {
);
"""
- sql "DROP TABLE IF EXISTS ${tableName2}"
- sql """
- CREATE TABLE ${tableName2} (
- `@timestamp` int(11) NULL COMMENT "",
- `clientip` varchar(20) NULL COMMENT "",
- `request` text NULL COMMENT "",
- `status` int(11) NULL COMMENT "",
- `size` int(11) NULL COMMENT "",
- INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
- INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT ''
- ) ENGINE=OLAP
- DUPLICATE KEY(`@timestamp`)
- COMMENT "OLAP"
- DISTRIBUTED BY RANDOM BUCKETS 1
- PROPERTIES (
- "replication_allocation" = "tag.location.default: 1",
- "disable_auto_compaction" = "true",
- "inverted_index_storage_format" = "V2"
- );
- """
-
boolean disableAutoCompaction = false
-
+
def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
@@ -76,49 +55,6 @@ suite("test_skip_index_compaction_fault_injection",
"nonConcurrent") {
}
}
- def trigger_full_compaction_on_tablets = { tablets ->
- for (def tablet : tablets) {
- String tablet_id = tablet.TabletId
- String backend_id = tablet.BackendId
- int times = 1
-
- String compactionStatus;
- do{
- def (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
- logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
- ++times
- sleep(2000)
- compactionStatus = parseJson(out.trim()).status.toLowerCase();
- } while (compactionStatus!="success" && times<=10 &&
compactionStatus!="e-6010")
-
-
- if (compactionStatus == "fail") {
- assertEquals(disableAutoCompaction, false)
- logger.info("Compaction was done automatically!")
- }
- if (disableAutoCompaction && compactionStatus!="e-6010") {
- assertEquals("success", compactionStatus)
- }
- }
- }
-
- def wait_full_compaction_done = { tablets ->
- for (def tablet in tablets) {
- boolean running = true
- do {
- Thread.sleep(1000)
- String tablet_id = tablet.TabletId
- String backend_id = tablet.BackendId
- def (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
- logger.info("Get compaction status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def compactionStatus = parseJson(out.trim())
- assertEquals("success", compactionStatus.status.toLowerCase())
- running = compactionStatus.run_status
- } while (running)
- }
- }
-
def get_rowset_count = { tablets ->
int rowsetCount = 0
for (def tablet in tablets) {
@@ -148,7 +84,7 @@ suite("test_skip_index_compaction_fault_injection",
"nonConcurrent") {
}
}
- def run_test = { tableName ->
+ def run_test = { tableName ->
sql """ INSERT INTO ${tableName} VALUES (1, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
sql """ INSERT INTO ${tableName} VALUES (2, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
sql """ INSERT INTO ${tableName} VALUES (3, "40.135.0.0", "GET
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
@@ -178,15 +114,13 @@ suite("test_skip_index_compaction_fault_injection",
"nonConcurrent") {
assert (rowsetCount == 11 * replicaNum)
// first
- trigger_full_compaction_on_tablets.call(tablets)
- wait_full_compaction_done.call(tablets)
+ trigger_and_wait_compaction(tableName, "full", 300, new String[]{"e-6010"})
rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 11 * replicaNum)
// second
- trigger_full_compaction_on_tablets.call(tablets)
- wait_full_compaction_done.call(tablets)
+ trigger_and_wait_compaction(tableName, "full", 300, new String[]{"e-6010"})
rowsetCount = get_rowset_count.call(tablets);
if (isCloudMode) {
@@ -202,7 +136,7 @@ suite("test_skip_index_compaction_fault_injection",
"nonConcurrent") {
String backend_id;
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
-
+
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@@ -226,15 +160,9 @@ suite("test_skip_index_compaction_fault_injection",
"nonConcurrent") {
GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_reader")
}
- // try {
- //
GetDebugPoint().enableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer")
- // run_test.call(tableName2)
- // } finally {
- //
GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer")
- // }
} finally {
if (has_update_be_config) {
set_be_config.call("inverted_index_compaction_enable",
invertedIndexCompactionEnable.toString())
}
}
-}
\ No newline at end of file
+}
diff --git
a/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy
b/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy
index ce12d1ede0c..4276edfdedf 100644
---
a/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy
+++
b/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy
@@ -40,21 +40,25 @@ suite("test_index_change_with_cumulative_compaction",
"nonConcurrent") {
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish
timeout")
}
- def trigger_compaction_with_retry = {table_name, compaction_type =
"cumulative", max_retries = 10, delay_ms = 2000 ->
- def retry_count = 0
- while (true) {
- try {
- trigger_and_wait_compaction(table_name, compaction_type)
- return // Success
- } catch (Exception e) {
- retry_count++
- if (retry_count >= max_retries) {
- throw new Exception("Failed to complete ${compaction_type}
compaction after ${max_retries} attempts", e)
+ def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
+ for(int t = delta_time; t <= OpTimeout; t += delta_time){
+ alter_res = sql """SHOW BUILD INDEX WHERE TableName =
"${table_name}";"""
+ def expected_finished_num = alter_res.size();
+ def finished_num = 0;
+ for (int i = 0; i < expected_finished_num; i++) {
+ logger.info(table_name + " build index job state: " +
alter_res[i][7] + i)
+ if (alter_res[i][7] == "FINISHED") {
+ ++finished_num;
}
- logger.warn("Compaction attempt ${retry_count} failed:
${e.getMessage()}")
- Thread.sleep(delay_ms)
}
+ if (finished_num == expected_finished_num) {
+ logger.info(table_name + " all build index jobs finished,
detail: " + alter_res)
+ break
+ }
+ useTime = t
+ sleep(delta_time)
}
+ assertTrue(useTime <= OpTimeout,
"wait_for_latest_build_index_on_partition_finish timeout")
}
try {
@@ -162,12 +166,15 @@ suite("test_index_change_with_cumulative_compaction",
"nonConcurrent") {
// build index
if (!isCloudMode()) {
sql "build index idx_user_id on ${tableName}"
+ wait_for_build_index_on_partition_finish(tableName, timeout)
sql "build index idx_date on ${tableName}"
+ wait_for_build_index_on_partition_finish(tableName, timeout)
sql "build index idx_city on ${tableName}"
+ wait_for_build_index_on_partition_finish(tableName, timeout)
}
// trigger compactions for all tablets in ${tableName}
- trigger_compaction_with_retry(tableName, "cumulative")
+ trigger_and_wait_compaction(tableName, "cumulative")
int rowCount = 0
for (def tablet in tablets) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]