This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 47b37d4b772 branch-3.1: [fix](editlog) add two inject case for batch 
editlog #53943 (#54489)
47b37d4b772 is described below

commit 47b37d4b7726848bfbcda963289a42f8111f4e8c
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Aug 9 14:02:55 2025 +0800

    branch-3.1: [fix](editlog) add two inject case for batch editlog #53943 
(#54489)
    
    Cherry-picked from #53943
    
    Co-authored-by: Yongqiang YANG <[email protected]>
---
 .../java/org/apache/doris/persist/EditLog.java     |  5 +-
 .../doris/transaction/PublishVersionDaemon.java    |  7 ++-
 .../test_binlog_config_change.groovy               |  2 +
 .../test_flush_editlog_exception.groovy            | 70 ++++++++++++++++++++++
 .../transaction/test_publish_exception.groovy      | 64 ++++++++++++++++++++
 5 files changed, 146 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index e22aac264fe..d6daade3d4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -193,6 +193,10 @@ public class EditLog {
             int itemNum = Math.max(1, 
Math.min(Config.batch_edit_log_max_item_num, batch.size()));
             JournalBatch journalBatch = new JournalBatch(itemNum);
 
+            if (DebugPointUtil.isEnable("EditLog.flushEditLog.exception")) {
+                // For debug purpose, throw an exception to test the edit log 
flush
+                throw new RuntimeException("EditLog.flushEditLog.exception");
+            }
             // Array to record pairs of logId and num
             List<long[]> logIdNumPairs = new ArrayList<>();
             for (EditLogItem req : batch) {
@@ -230,7 +234,6 @@ public class EditLog {
                     }
                 }
             }
-
         } catch (Throwable t) {
             // Throwable contains all Exception and Error, such as IOException 
and
             // OutOfMemoryError
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index f22bfe4a60d..01d0197ac37 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -248,10 +248,11 @@ public class PublishVersionDaemon extends MasterDaemon {
             dbExecutors.get((int) (transactionState.getDbId() % 
Config.publish_thread_pool_num)).execute(() -> {
                 try {
                     tryFinishTxnSync(transactionState, globalTransactionMgr);
-                    
publishingTxnIds.remove(transactionState.getTransactionId());
                 } catch (Throwable e) {
                     LOG.warn("failed to finish dbId: {}, txnId: {}", 
transactionState.getDbId(),
                             transactionState.getTransactionId(), e);
+                } finally {
+                    
publishingTxnIds.remove(transactionState.getTransactionId());
                 }
             });
         } catch (Throwable e) {
@@ -263,6 +264,10 @@ public class PublishVersionDaemon extends MasterDaemon {
     }
 
     private void tryFinishTxnSync(TransactionState transactionState, 
GlobalTransactionMgrIface globalTransactionMgr) {
+        if 
(DebugPointUtil.isEnable("PublishVersionDaemon.tryFinishTxnSync.fail")) {
+            throw new RuntimeException("finishTransaction failed for txnId: " 
+ transactionState.getTransactionId());
+        }
+
         try {
             partitionVisibleVersions = Maps.newHashMap();
             backendPartitions = Maps.newHashMap();
diff --git 
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
 
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
index bde2f357005..787075d9aa6 100644
--- 
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
+++ 
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.apache.doris.regression.suite.ClusterOptions
+
 suite("test_binlog_config_change_index") {
 
     def syncer = getSyncer()
diff --git 
a/regression-test/suites/fault_injection_p0/test_flush_editlog_exception.groovy 
b/regression-test/suites/fault_injection_p0/test_flush_editlog_exception.groovy
new file mode 100644
index 00000000000..43196693890
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_flush_editlog_exception.groovy
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_flush_editlog_exception", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+
+    docker(options) {
+
+        def tableName = "test_flush_editlog_exception"
+        // test txn X inverted index
+        sql "DROP TABLE IF EXISTS ${tableName}"
+
+        
GetDebugPoint().enableDebugPointForAllFEs('EditLog.flushEditLog.exception')
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `user_id` bigint default 999,
+                `group_id` bigint,
+                `id` bigint,
+                `vv` variant,
+                INDEX idx_col1 (user_id) USING INVERTED
+                ) ENGINE=OLAP
+            UNIQUE KEY(user_id, group_id)
+            DISTRIBUTED BY HASH (user_id) BUCKETS 1
+            PROPERTIES(
+                    "store_row_column" = "true",
+                    "replication_num" = "1"
+                    );
+        """
+
+        // Wait until SELECT returns error (i.e., Doris FE process exits due 
to flushEditLog exception)
+        int maxWaitMs = 10000
+        int intervalMs = 200
+        long startTime = System.currentTimeMillis()
+        boolean gotError = false
+        while (true) {
+            try {
+                sql "SELECT * from ${tableName}"
+            } catch (Exception e) {
+                gotError = true
+                break
+            }
+            if (System.currentTimeMillis() - startTime > maxWaitMs) {
+                throw new IllegalStateException("Timeout waiting for SELECT to 
return error")
+            }
+            Thread.sleep(intervalMs)
+        }
+        assert gotError
+    }
+}
diff --git 
a/regression-test/suites/insert_p0/transaction/test_publish_exception.groovy 
b/regression-test/suites/insert_p0/transaction/test_publish_exception.groovy
new file mode 100644
index 00000000000..873c22bceb1
--- /dev/null
+++ b/regression-test/suites/insert_p0/transaction/test_publish_exception.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_publish_exception", "nonConcurrent") {
+    def tableName = "test_publish_exception"
+    // test txn X inverted index
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `user_id` bigint default 999,
+            `group_id` bigint,
+            `id` bigint,
+            `vv` variant,
+            INDEX idx_col1 (user_id) USING INVERTED
+            ) ENGINE=OLAP
+        UNIQUE KEY(user_id, group_id)
+        DISTRIBUTED BY HASH (user_id) BUCKETS 1
+        PROPERTIES(
+                "store_row_column" = "true",
+                "replication_num" = "1"
+                );
+    """
+    
DebugPoint.enableDebugPointForAllFEs('PublishVersionDaemon.tryFinishTxnSync.fail')
+    sql "begin"
+    sql """insert into ${tableName} 
values(1,1,5,'{"b":"b"}'),(1,1,4,'{"b":"b"}'),(1,1,3,'{"b":"b"}')"""
+
+    test {
+        sql "commit"
+        exception "transaction commit successfully, BUT data will be visible 
later"
+    }
+
+    def result = sql "SELECT * from ${tableName}"
+    assert result.size() == 0
+
+    
DebugPoint.disableDebugPointForAllFEs('PublishVersionDaemon.tryFinishTxnSync.fail')
+    // Wait until the result size is 1 or timeout (10s)
+    int maxWaitMs = 10000
+    int intervalMs = 200
+    long startTime = System.currentTimeMillis()
+    while (true) {
+        def result2 = sql "SELECT * from ${tableName}"
+        if (result2.size() == 1) {
+            break
+        }
+        if (System.currentTimeMillis() - startTime > maxWaitMs) {
+            throw new IllegalStateException("Timeout waiting for result.size() 
== 1")
+        }
+        Thread.sleep(intervalMs)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to