This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 47b37d4b772 branch-3.1: [fix](editlog) add two inject case for batch
editlog #53943 (#54489)
47b37d4b772 is described below
commit 47b37d4b7726848bfbcda963289a42f8111f4e8c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Aug 9 14:02:55 2025 +0800
branch-3.1: [fix](editlog) add two inject case for batch editlog #53943
(#54489)
Cherry-picked from #53943
Co-authored-by: Yongqiang YANG <[email protected]>
---
.../java/org/apache/doris/persist/EditLog.java | 5 +-
.../doris/transaction/PublishVersionDaemon.java | 7 ++-
.../test_binlog_config_change.groovy | 2 +
.../test_flush_editlog_exception.groovy | 70 ++++++++++++++++++++++
.../transaction/test_publish_exception.groovy | 64 ++++++++++++++++++++
5 files changed, 146 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index e22aac264fe..d6daade3d4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -193,6 +193,10 @@ public class EditLog {
int itemNum = Math.max(1,
Math.min(Config.batch_edit_log_max_item_num, batch.size()));
JournalBatch journalBatch = new JournalBatch(itemNum);
+ if (DebugPointUtil.isEnable("EditLog.flushEditLog.exception")) {
+ // For debug purpose, throw an exception to test the edit log
flush
+ throw new RuntimeException("EditLog.flushEditLog.exception");
+ }
// Array to record pairs of logId and num
List<long[]> logIdNumPairs = new ArrayList<>();
for (EditLogItem req : batch) {
@@ -230,7 +234,6 @@ public class EditLog {
}
}
}
-
} catch (Throwable t) {
// Throwable contains all Exception and Error, such as IOException
and
// OutOfMemoryError
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index f22bfe4a60d..01d0197ac37 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -248,10 +248,11 @@ public class PublishVersionDaemon extends MasterDaemon {
dbExecutors.get((int) (transactionState.getDbId() %
Config.publish_thread_pool_num)).execute(() -> {
try {
tryFinishTxnSync(transactionState, globalTransactionMgr);
-
publishingTxnIds.remove(transactionState.getTransactionId());
} catch (Throwable e) {
LOG.warn("failed to finish dbId: {}, txnId: {}",
transactionState.getDbId(),
transactionState.getTransactionId(), e);
+ } finally {
+
publishingTxnIds.remove(transactionState.getTransactionId());
}
});
} catch (Throwable e) {
@@ -263,6 +264,10 @@ public class PublishVersionDaemon extends MasterDaemon {
}
private void tryFinishTxnSync(TransactionState transactionState,
GlobalTransactionMgrIface globalTransactionMgr) {
+ if
(DebugPointUtil.isEnable("PublishVersionDaemon.tryFinishTxnSync.fail")) {
+ throw new RuntimeException("finishTransaction failed for txnId: "
+ transactionState.getTransactionId());
+ }
+
try {
partitionVisibleVersions = Maps.newHashMap();
backendPartitions = Maps.newHashMap();
diff --git
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
index bde2f357005..787075d9aa6 100644
---
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
+++
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+import org.apache.doris.regression.suite.ClusterOptions
+
suite("test_binlog_config_change_index") {
def syncer = getSyncer()
diff --git
a/regression-test/suites/fault_injection_p0/test_flush_editlog_exception.groovy
b/regression-test/suites/fault_injection_p0/test_flush_editlog_exception.groovy
new file mode 100644
index 00000000000..43196693890
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_flush_editlog_exception.groovy
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_flush_editlog_exception", "docker") {
+ def options = new ClusterOptions()
+ options.cloudMode = false
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+
+ docker(options) {
+
+ def tableName = "test_flush_editlog_exception"
+ // Start from a clean table, then enable the debug point that makes EditLog's batch flush throw a RuntimeException
+ sql "DROP TABLE IF EXISTS ${tableName}"
+
+
GetDebugPoint().enableDebugPointForAllFEs('EditLog.flushEditLog.exception')
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `user_id` bigint default 999,
+ `group_id` bigint,
+ `id` bigint,
+ `vv` variant,
+ INDEX idx_col1 (user_id) USING INVERTED
+ ) ENGINE=OLAP
+ UNIQUE KEY(user_id, group_id)
+ DISTRIBUTED BY HASH (user_id) BUCKETS 1
+ PROPERTIES(
+ "store_row_column" = "true",
+ "replication_num" = "1"
+ );
+ """
+
+ // Wait until SELECT returns error (i.e., Doris FE process exits due
to flushEditLog exception)
+ int maxWaitMs = 10000
+ int intervalMs = 200
+ long startTime = System.currentTimeMillis()
+ boolean gotError = false
+ while (true) {
+ try {
+ sql "SELECT * from ${tableName}"
+ } catch (Exception e) {
+ gotError = true
+ break
+ }
+ if (System.currentTimeMillis() - startTime > maxWaitMs) {
+ throw new IllegalStateException("Timeout waiting for SELECT to
return error")
+ }
+ Thread.sleep(intervalMs)
+ }
+ assert gotError
+ }
+}
diff --git
a/regression-test/suites/insert_p0/transaction/test_publish_exception.groovy
b/regression-test/suites/insert_p0/transaction/test_publish_exception.groovy
new file mode 100644
index 00000000000..873c22bceb1
--- /dev/null
+++ b/regression-test/suites/insert_p0/transaction/test_publish_exception.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_publish_exception", "nonConcurrent") {
+ def tableName = "test_publish_exception"
+ // Make tryFinishTxnSync fail so a committed txn stays invisible, then check it becomes visible once the debug point is off. NOTE(review): consider try/finally so the FE debug point is always disabled even if an assert below fails.
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `user_id` bigint default 999,
+ `group_id` bigint,
+ `id` bigint,
+ `vv` variant,
+ INDEX idx_col1 (user_id) USING INVERTED
+ ) ENGINE=OLAP
+ UNIQUE KEY(user_id, group_id)
+ DISTRIBUTED BY HASH (user_id) BUCKETS 1
+ PROPERTIES(
+ "store_row_column" = "true",
+ "replication_num" = "1"
+ );
+ """
+
DebugPoint.enableDebugPointForAllFEs('PublishVersionDaemon.tryFinishTxnSync.fail')
+ sql "begin"
+ sql """insert into ${tableName}
values(1,1,5,'{"b":"b"}'),(1,1,4,'{"b":"b"}'),(1,1,3,'{"b":"b"}')"""
+
+ test {
+ sql "commit"
+ exception "transaction commit successfully, BUT data will be visible
later"
+ }
+
+ def result = sql "SELECT * from ${tableName}"
+ assert result.size() == 0
+
+
DebugPoint.disableDebugPointForAllFEs('PublishVersionDaemon.tryFinishTxnSync.fail')
+ // Wait (up to 10s) for the txn to become visible; all 3 inserted rows share UNIQUE KEY (1,1), so exactly 1 row is expected
+ int maxWaitMs = 10000
+ int intervalMs = 200
+ long startTime = System.currentTimeMillis()
+ while (true) {
+ def result2 = sql "SELECT * from ${tableName}"
+ if (result2.size() == 1) {
+ break
+ }
+ if (System.currentTimeMillis() - startTime > maxWaitMs) {
+ throw new IllegalStateException("Timeout waiting for result.size()
== 1")
+ }
+ Thread.sleep(intervalMs)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]