This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 198496bcc57 [Enhancement](group commit) Add fault injection case for
group commit
198496bcc57 is described below
commit 198496bcc5764e5d6620f216d847ab7d445650ac
Author: abmdocrt <[email protected]>
AuthorDate: Fri Apr 19 13:05:19 2024 +0800
[Enhancement](group commit) Add fault injection case for group commit
---
be/src/runtime/group_commit_mgr.cpp | 11 +++++-
...oup_commit_async_wal_msg_fault_injection.groovy | 40 +++++++++++++++++++++-
2 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index ed69a1be550..5ec20c10aef 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -333,6 +333,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
RuntimeState* state) {
Status st;
Status result_status;
+ DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
+ { status = Status::InternalError(""); });
if (status.ok()) {
// commit txn
TLoadTxnCommitRequest request;
@@ -368,6 +370,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
},
10000L);
result_status = Status::create<false>(result.status);
+ DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{
+ std ::string msg = "abort txn";
+ LOG(INFO) << "debug promise set: " << msg;
+
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
+ Status ::InternalError(msg));
+ return status;
+ });
}
std::shared_ptr<LoadBlockQueue> load_block_queue;
{
@@ -392,7 +401,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
// status: exec_plan_fragment result
// st: commit txn rpc status
// result_status: commit txn result
- DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
+ DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st",
{ st = Status::InternalError(""); });
if (status.ok() && st.ok() &&
(result_status.ok() ||
result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
diff --git
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
index cdf537749cc..c2523c49092 100644
---
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
@@ -74,7 +74,7 @@
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
exception = false;
try {
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
-
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
streamLoad {
table "${tableName}"
set 'column_separator', ','
@@ -88,6 +88,44 @@
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains('estimated wal bytes 0 Bytes'))
exception = true;
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
+ assertTrue(exception)
+ }
+
+ // test group commit abort txn
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k`)
+ BUCKETS 5
+ properties("replication_num" = "1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ exception = false;
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'group_commit', 'async_mode'
+ unset 'label'
+ file 'group_commit_wal_msg.csv'
+ time 10000
+ }
+ assertFalse(true);
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains('abort txn'))
+ exception = true;
} finally {
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]