This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f53cf052d84 [fix](group commit) Fix
test_group_commit_async_wal_msg_fault_injection case (#35313)
f53cf052d84 is described below
commit f53cf052d84335c09b2680259b1366343fc90c9a
Author: meiyi <[email protected]>
AuthorDate: Fri May 24 09:57:24 2024 +0800
[fix](group commit) Fix test_group_commit_async_wal_msg_fault_injection
case (#35313)
---
.../exec/group_commit_block_sink_operator.cpp | 27 ++++++++++++----------
.../exec/group_commit_block_sink_operator.h | 1 +
be/src/runtime/group_commit_mgr.cpp | 10 ++++----
be/src/vec/sink/group_commit_block_sink.cpp | 26 +++++++++++----------
...oup_commit_async_wal_msg_fault_injection.groovy | 12 ++++++----
5 files changed, 43 insertions(+), 33 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 4e9969d9570..a4d5270de2f 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -36,6 +36,7 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState*
state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
+ _table_id = p._table_id;
_group_commit_mode = p._group_commit_mode;
_vpartition = std::make_unique<doris::VOlapTablePartitionParam>(p._schema,
p._partition);
RETURN_IF_ERROR(_vpartition->init());
@@ -211,18 +212,20 @@ Status
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
_is_block_appended = true;
_blocks.clear();
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
{
- if (_load_block_queue) {
- _remove_estimated_wal_bytes();
- _load_block_queue->remove_load_id(p._load_id);
- }
- if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
- std ::chrono ::seconds(60)) == std ::future_status
::ready) {
- auto st =
ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
- ExecEnv::GetInstance()->group_commit_mgr()->debug_promise =
std::promise<Status>();
- ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
-
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
- LOG(INFO) << "debug future output: " << st.to_string();
- RETURN_IF_ERROR(st);
+ if (dp->param<int64_t>("table_id", -1) == _table_id) {
+ if (_load_block_queue) {
+ _remove_estimated_wal_bytes();
+ _load_block_queue->remove_load_id(p._load_id);
+ }
+ if
(ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
+ std ::chrono ::seconds(60)) == std ::future_status
::ready) {
+ auto st =
ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
+ ExecEnv::GetInstance()->group_commit_mgr()->debug_promise =
std::promise<Status>();
+ ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
+
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
+ LOG(INFO) << "debug future output: " << st.to_string();
+ RETURN_IF_ERROR(st);
+ }
}
});
return Status::OK();
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index be0426d1ebf..1b9103c0e8e 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -60,6 +60,7 @@ private:
size_t _estimated_wal_bytes = 0;
TGroupCommitMode::type _group_commit_mode;
Bitmap _filter_bitmap;
+ int64_t _table_id;
};
class GroupCommitBlockSinkOperatorX final
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 5b369f3ece0..d740b7fd92f 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -447,10 +447,12 @@ Status
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
}
LOG(INFO) << ss.str();
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
{
- std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string();
- LOG(INFO) << "debug promise set: " << msg;
- ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
- Status ::InternalError(msg));
+ if (dp->param<int64_t>("table_id", -1) == table_id) {
+ std ::string msg =
_exec_env->wal_mgr()->get_wal_dirs_info_string();
+ LOG(INFO) << "table_id" << std::to_string(table_id) << " set debug
promise: " << msg;
+
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
+ Status ::InternalError(msg));
+ }
};);
return st;
}
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp
b/be/src/vec/sink/group_commit_block_sink.cpp
index 8aa60bb3f22..87f1c9cb2c4 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -276,18 +276,20 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState*
state,
_is_block_appended = true;
_blocks.clear();
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
{
- if (_load_block_queue) {
- _remove_estimated_wal_bytes();
- _load_block_queue->remove_load_id(_load_id);
- }
- if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
- std ::chrono ::seconds(60)) == std ::future_status
::ready) {
- auto st =
ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
- ExecEnv::GetInstance()->group_commit_mgr()->debug_promise =
std::promise<Status>();
- ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
-
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
- LOG(INFO) << "debug future output: " << st.to_string();
- RETURN_IF_ERROR(st);
+ if (dp->param<int64_t>("table_id", -1) == _table_id) {
+ if (_load_block_queue) {
+ _remove_estimated_wal_bytes();
+ _load_block_queue->remove_load_id(_load_id);
+ }
+ if
(ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
+ std ::chrono ::seconds(60)) == std ::future_status
::ready) {
+ auto st =
ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
+ ExecEnv::GetInstance()->group_commit_mgr()->debug_promise =
std::promise<Status>();
+ ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
+
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
+ LOG(INFO) << "debug future output: " << st.to_string();
+ RETURN_IF_ERROR(st);
+ }
}
});
return Status::OK();
diff --git
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
index c2523c49092..97e36c41a65 100644
---
a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy
@@ -16,8 +16,7 @@
// under the License.
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
-
-
+ def dbName = "regression_test_fault_injection_p0"
def tableName = "wal_test"
// test successful group commit async load
@@ -34,10 +33,11 @@
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
"""
GetDebugPoint().clearDebugPointsForAllBEs()
+ def tableId = getTableId(dbName, tableName)
def exception = false;
try {
-
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
[table_id:"${tableId}"])
streamLoad {
table "${tableName}"
set 'column_separator', ','
@@ -70,10 +70,11 @@
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
"""
GetDebugPoint().clearDebugPointsForAllBEs()
+ tableId = getTableId(dbName, tableName)
exception = false;
try {
-
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
[table_id:"${tableId}"])
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_st")
streamLoad {
table "${tableName}"
@@ -108,10 +109,11 @@
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
"""
GetDebugPoint().clearDebugPointsForAllBEs()
+ tableId = getTableId(dbName, tableName)
exception = false;
try {
-
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg",
[table_id:"${tableId}"])
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
streamLoad {
table "${tableName}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]