This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 1ae23a32063 [Fix](group commit) Fix wal mem back pressure fault injection case (#29493)
1ae23a32063 is described below
commit 1ae23a32063818212cd59fe05cfc62b2ea36c898
Author: abmdocrt <[email protected]>
AuthorDate: Sat Jan 6 18:39:26 2024 +0800
[Fix](group commit) Fix wal mem back pressure fault injection case (#29493)
---
be/src/runtime/group_commit_mgr.cpp | 2 +-
...st_wal_mem_back_pressure_fault_injection.groovy | 112 +++++++++------------
2 files changed, 49 insertions(+), 65 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 1bf189c384d..2838ebbed51 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -34,7 +34,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
RETURN_IF_ERROR(status);
auto start = std::chrono::steady_clock::now();
while (!runtime_state->is_cancelled() && status.ok() &&
- _all_block_queues_bytes->load(std::memory_order_relaxed) >
+ _all_block_queues_bytes->load(std::memory_order_relaxed) >=
config::group_commit_queue_mem_limit) {
_put_cond.wait_for(l,
std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
index 473abb6bb2a..c3cfe5fa0c7 100644
--- a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
@@ -20,53 +20,17 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
def tableName = "wal_test"
sql """ DROP TABLE IF EXISTS ${tableName} """
- sql """
- CREATE TABLE IF NOT EXISTS `wal_baseall` (
- `k0` boolean null comment "",
- `k1` tinyint(4) null comment "",
- `k2` smallint(6) null comment "",
- `k3` int(11) null comment "",
- `k4` bigint(20) null comment "",
- `k5` decimal(9, 3) null comment "",
- `k6` char(5) null comment "",
- `k10` date null comment "",
- `k11` datetime null comment "",
- `k7` varchar(20) null comment "",
- `k8` double max null comment "",
- `k9` float sum null comment "",
- `k12` string replace null comment "",
- `k13` largeint(40) replace null comment ""
- ) engine=olap
- DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
- """
sql """
- CREATE TABLE IF NOT EXISTS `wal_test` (
- `k0` boolean null comment "",
- `k1` tinyint(4) null comment "",
- `k2` smallint(6) null comment "",
- `k3` int(11) null comment "",
- `k4` bigint(20) null comment "",
- `k5` decimal(9, 3) null comment "",
- `k6` char(5) null comment "",
- `k10` date null comment "",
- `k11` datetime null comment "",
- `k7` varchar(20) null comment "",
- `k8` double max null comment "",
- `k9` float sum null comment "",
- `k12` string replace_if_not_null null comment "",
- `k13` largeint(40) replace null comment ""
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
) engine=olap
- DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ DISTRIBUTED BY HASH(`k`)
+ BUCKETS 5
+ properties("replication_num" = "1")
"""
- streamLoad {
- table "wal_baseall"
- db "regression_test_fault_injection_p0"
- set 'column_separator', ','
- file "baseall.txt"
- }
-
def enable_back_pressure = {
try {
def fes = sql_return_maparray "show frontends"
@@ -76,14 +40,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
def be = bes[0]
def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
logger.info("observer url: " + url)
- StringBuilder sb = new StringBuilder();
- sb.append("curl -X POST
http://${fe.Host}:${fe.HttpPort}")
- sb.append("/rest/v2/manager/node/set_config/be")
- sb.append(" -H \"Content-Type: application/json\" -H
\"Authorization: Basic cm9vdDo= \"")
- sb.append(""" -d
\"{\\"group_commit_queue_mem_limit\\": {\\"node\\":
[\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\":
\\"false\\"}}\"""")
- String command = sb.toString()
- logger.info(command)
- def process = command.execute()
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+ sb.append("/rest/v2/manager/node/set_config/be")
+ sb.append(" -H \"Content-Type: application/json\" -H
\"Authorization: Basic cm9vdDo= \"")
+ sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\":
{\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\":
\\"0\\",\\"persist\\": \\"false\\"}}\"""")
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+
+ sb = new StringBuilder();
+ sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+ sb.append("/rest/v2/manager/node/set_config/be")
+ sb.append(" -H \"Content-Type: application/json\" -H
\"Authorization: Basic cm9vdDo= \"")
+ sb.append(""" -d
\"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\":
[\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\":
\\"false\\"}}\"""")
+ command = sb.toString()
+ logger.info(command)
+ process = command.execute()
} finally {
}
}
@@ -97,14 +70,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
def be = bes[0]
def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
logger.info("observer url: " + url)
- StringBuilder sb = new StringBuilder();
- sb.append("curl -X POST
http://${fe.Host}:${fe.HttpPort}")
- sb.append("/rest/v2/manager/node/set_config/be")
- sb.append(" -H \"Content-Type: application/json\" -H
\"Authorization: Basic cm9vdDo= \"")
- sb.append(""" -d
\"{\\"group_commit_queue_mem_limit\\": {\\"node\\":
[\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\":
\\"false\\"}}\"""")
- String command = sb.toString()
- logger.info(command)
- def process = command.execute()
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+ sb.append("/rest/v2/manager/node/set_config/be")
+ sb.append(" -H \"Content-Type: application/json\" -H
\"Authorization: Basic cm9vdDo= \"")
+ sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\":
{\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\":
\\"67108864\\",\\"persist\\": \\"false\\"}}\"""")
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+
+ sb = new StringBuilder();
+ sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+ sb.append("/rest/v2/manager/node/set_config/be")
+ sb.append(" -H \"Content-Type: application/json\" -H
\"Authorization: Basic cm9vdDo= \"")
+ sb.append(""" -d
\"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\":
[\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"10000\\",\\"persist\\":
\\"false\\"}}\"""")
+ command = sb.toString()
+ logger.info(command)
+ process = command.execute()
} finally {
}
}
@@ -114,28 +96,30 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
def thread1 = new Thread({
sql """ set group_commit = async_mode; """
try {
- sql """insert into ${tableName} select * from wal_baseall where k1
<= 3"""
+ sql """insert into ${tableName} values(1,1)"""
} catch (Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains('Communications link failure'))
- }
- disable_back_pressure()
- finish = true
+ } finally {
+ finish = true
+ }
})
thread1.start()
- for(int i = 0;i<10;i++){
+ while(!finish){
def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
-            if (item[11].toString() == "insert into ${tableName} select * from wal_baseall where k1 <= 3".toString()){
+ if (item[11].toString() == "".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
+ sleep(1000)
}
+ disable_back_pressure()
thread1.join()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]