This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 75b57f30d78 [fix](group commit) fix repaly wal check label status
(#38883)
75b57f30d78 is described below
commit 75b57f30d78c2bfe83dbbb1e3475250e69d10819
Author: meiyi <[email protected]>
AuthorDate: Wed Aug 7 09:56:15 2024 +0800
[fix](group commit) fix repaly wal check label status (#38883)
## Proposed changes
When replaying a WAL, the BE first aborts the txn with the label, but it
does not check the result of the abort.
Then, when beginning the replay txn, if FE returns
`LabelAlreadyUsedException`, it considers the load to have already
succeeded in a previous group commit load or WAL replay, and deletes this
WAL directly.
But `LabelAlreadyUsedException` only means that a txn with this label
exists; that txn may be in `PREPARE / RUNNING / COMMITTED / VISIBLE`
status (the abort in the first step may have failed). So WAL replay should
treat the load as successful only when FE returns
`LabelAlreadyUsedException` and the txn status is `COMMITTED` or `VISIBLE`.
This PR also adds a test case for WAL replay with a schema change.
---
be/src/olap/wal/wal_table.cpp | 10 ++-
be/src/runtime/group_commit_mgr.cpp | 4 +
.../doris/common/LabelAlreadyUsedException.java | 3 +-
.../apache/doris/service/FrontendServiceImpl.java | 3 +
.../group_commit/group_commit_wal_msg.csv | 5 ++
.../group_commit/replay_wal_restart_fe.groovy | 80 ++++++++++++++++++
.../test_group_commit_replay_wal.groovy | 96 ++++++++++++++++++++++
7 files changed, 197 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 5f1ade097d2..0de5ee42488 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -91,7 +91,8 @@ Status WalTable::_relay_wal_one_by_one() {
auto msg = st.msg();
if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
st.is<ErrorCode::NOT_FOUND>() ||
st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
- msg.find("LabelAlreadyUsedException") != msg.npos) {
+ (msg.find("LabelAlreadyUsedException") != msg.npos &&
+ (msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") !=
msg.npos))) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
// delete wal
@@ -163,8 +164,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string&
label) {
request.__set_auth_code(0); // this is a fake, fe not check it now
request.__set_db_id(db_id);
request.__set_label(label);
- std::string reason = "relay wal with label " + label;
- request.__set_reason(reason);
+ request.__set_reason("relay wal with label " + label);
TLoadTxnRollbackResult result;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -192,6 +192,10 @@ Status WalTable::_replay_wal_internal(const std::string&
wal) {
[[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
}
#endif
+ DBUG_EXECUTE_IF("WalTable.replay_wals.stop", {
+ // LOG(INFO) << "WalTable.replay_wals.stop";
+ return Status::InternalError("WalTable.replay_wals.stop");
+ });
return _replay_one_wal_with_streamload(wal_id, wal, label);
}
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 6a6061d42cf..e60c3804246 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -435,6 +435,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
Status result_status;
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{ status = Status::InternalError(""); });
+ DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error",
+ { status = Status::InternalError("load_error"); });
if (status.ok()) {
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error",
{ status = Status::InternalError(""); });
@@ -476,6 +478,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
.error(result_status);
retry_times++;
}
+
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error",
+ { result_status =
Status::InternalError("commit_success_and_rpc_error"); });
} else {
// abort txn
TLoadTxnRollbackRequest request;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
index d739f2032f3..8c508809d59 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
@@ -38,7 +38,8 @@ public class LabelAlreadyUsedException extends DdlException {
}
public LabelAlreadyUsedException(TransactionState txn) {
- super("Label [" + txn.getLabel() + "] has already been used, relate to
txn [" + txn.getTransactionId() + "]");
+ super("Label [" + txn.getLabel() + "] has already been used, relate to
txn [" + txn.getTransactionId()
+ + "], status [" + txn.getTransactionStatus() + "].");
switch (txn.getTransactionStatus()) {
case UNKNOWN:
case PREPARE:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 56d2cb01e6a..88925770640 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1842,6 +1842,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
try {
+ if
(DebugPointUtil.isEnable("FrontendServiceImpl.loadTxnRollback.error")) {
+ throw new
UserException("FrontendServiceImpl.loadTxnRollback.error");
+ }
loadTxnRollbackImpl(request);
} catch (MetaNotFoundException e) {
LOG.warn("failed to rollback txn, id: {}, label: {}",
request.getTxnId(), request.getLabel(), e);
diff --git
a/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv
b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv
new file mode 100644
index 00000000000..6ab7bd6bcdf
--- /dev/null
+++ b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv
@@ -0,0 +1,5 @@
+1,1
+2,2
+3,3
+4,4
+5,5
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy
b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy
new file mode 100644
index 00000000000..d39bdd9d4a9
--- /dev/null
+++ b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases is copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("replay_wal_restart_fe") {
+ def check_schema_change = { state ->
+ for (int i = 0; i < 30; i++) {
+ def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName = 'tbl_2' order by CreateTime desc;"
+ assertTrue(jobs.size() >= 1)
+ logger.info("alter job: ${jobs[0]}")
+ if (jobs[0].State == state) {
+ break
+ }
+ sleep(1000)
+ }
+ }
+
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+ options.feConfigs.add('sys_log_verbose_modules=org.apache.doris')
+ options.beConfigs.add('sys_log_verbose_modules=*')
+ options.beConfigs.add('enable_java_support=false')
+ docker(options) {
+ def result = sql 'SELECT DATABASE()'
+
+ // group commit load error and stop replay
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error")
+ GetDebugPoint().enableDebugPointForAllBEs("WalTable.replay_wals.stop")
+
+ // 1 wal need to replay
+ sql 'CREATE TABLE tbl_2 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1)
BUCKETS 1 PROPERTIES ( "replication_num" = "1",
"group_commit_interval_ms"="1000")'
+ sql 'SET GROUP_COMMIT = ASYNC_MODE'
+ sql 'INSERT INTO tbl_2 VALUES (1, 2)'
+
+ // do schema change
+ sql 'ALTER TABLE tbl_2 ORDER BY (k2, k1)'
+ check_schema_change('RUNNING')
+
+ // stop be, restart fe, start be
+ cluster.stopBackends()
+ cluster.restartFrontends()
+ sleep(30000)
+ context.reconnectFe()
+ check_schema_change('RUNNING')
+ cluster.startBackends()
+
+ // check schema change status and row count
+ check_schema_change('FINISHED')
+ for (int i = 0; i < 30; i++) {
+ result = sql "select count(*) from tbl_2"
+ logger.info("rowCount: ${result}")
+ if (result[0][0] >= 1) {
+ break
+ }
+ sleep(1000)
+ }
+ order_qt_select_1 'SELECT * FROM tbl_2'
+ }
+}
diff --git
a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
new file mode 100644
index 00000000000..da7bcadc11a
--- /dev/null
+++
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_group_commit_replay_wal", "nonConcurrent") {
+ def tableName = "test_group_commit_replay_wal"
+
+ def getRowCount = { expectedRowCount ->
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+ {
+ def result = sql "select count(*) from ${tableName}"
+ logger.info("table: ${tableName}, rowCount: ${result}")
+ return result[0][0] == expectedRowCount
+ }
+ )
+ }
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k`)
+ BUCKETS 5
+ properties("replication_num" = "1", "group_commit_interval_ms"="2000")
+ """
+
+ // 1. load success but commit rpc timeout
+ // 2. should skip replay because of fe throw LabelAlreadyUsedException and
txn status is VISIBLE
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error")
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'group_commit', 'async_mode'
+ unset 'label'
+ file 'group_commit_wal_msg.csv'
+ time 10000
+ }
+ getRowCount(5)
+ // check wal count is 0
+ sleep(5000)
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(false)
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ // load fail and abort fail, wal should not be deleted and retry
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error")
+
GetDebugPoint().enableDebugPointForAllFEs("FrontendServiceImpl.loadTxnRollback.error")
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'group_commit', 'async_mode'
+ unset 'label'
+ file 'group_commit_wal_msg.csv'
+ time 10000
+ }
+ getRowCount(5)
+ sleep(10000) // wal replay but all failed
+ getRowCount(5)
+ // check wal count is 1
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ getRowCount(10)
+ // check wal count is 0
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(false)
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]