This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 4c9ebbb3b92 [fix](cloud) cloud group commit should skip replay wal if
label is already used and the txn state is committed or visible (#41262)
(#41461)
4c9ebbb3b92 is described below
commit 4c9ebbb3b92c080491a9be78886601f534d85b1c
Author: meiyi <[email protected]>
AuthorDate: Thu Oct 10 22:27:04 2024 +0800
[fix](cloud) cloud group commit should skip replay wal if label is already
used and the txn state is committed or visible (#41262) (#41461)
pick https://github.com/apache/doris/pull/41262
---
be/src/olap/wal/wal_table.cpp | 4 +-
.../test_group_commit_replay_wal.groovy | 46 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index e45157626a0..d16daf26375 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -92,8 +92,8 @@ Status WalTable::_relay_wal_one_by_one() {
auto msg = st.msg();
if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
st.is<ErrorCode::NOT_FOUND>() ||
st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
- (msg.find("LabelAlreadyUsedException") != msg.npos &&
- (msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") !=
msg.npos))) {
+ (msg.find("has already been used") != msg.npos &&
+ (msg.find("COMMITTED") != msg.npos || msg.find("VISIBLE") !=
msg.npos))) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
// delete wal
diff --git
a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
index da7bcadc11a..9def3306f7c 100644
---
a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
+++
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
@@ -93,4 +93,50 @@ suite("test_group_commit_replay_wal", "nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()
}
+
+ // check wal count is 0
+ String[][] backends = sql """ show backends """
+ assertTrue(backends.size() > 0)
+ String backendId;
+ def backendIdToBackendIP = [:]
+ def backendIdToBackendBrpcPort = [:]
+ for (String[] backend in backends) {
+ if (backend[9].equals("true")) {
+ backendIdToBackendIP.put(backend[0], backend[1])
+ backendIdToBackendBrpcPort.put(backend[0], backend[5])
+ }
+ }
+
+ backendId = backendIdToBackendIP.keySet()[0]
+ def getMetricsMethod = { check_func ->
+ httpTest {
+ endpoint backendIdToBackendIP.get(backendId) + ":" +
backendIdToBackendBrpcPort.get(backendId)
+ uri "/brpc_metrics"
+ op "get"
+ check check_func
+ }
+ }
+
+ int wal_count = -1
+ for (int i = 0; i < 50; i++) {
+ getMetricsMethod.call() {
+ respCode, body ->
+ logger.info("test wal count resp Code {}",
"${respCode}".toString())
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def strs = out.split('\n')
+ for (String line in strs) {
+ if (line.startsWith("wal_total_count")) {
+ logger.info("find: {}", line)
+ wal_count = line.replaceAll("wal_total_count ",
"").toInteger()
+ break
+ }
+ }
+ }
+ if (wal_count == 0) {
+ break
+ }
+ sleep(2000)
+ }
+ assertEquals(0, wal_count)
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]