This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d7e172b84eb [fix](publish) Catch exception in genPublishTask to make 
one failed txn does not block the other txns (#37724)
d7e172b84eb is described below

commit d7e172b84eb5436942421b7561b1bf4d7fc35fb9
Author: meiyi <[email protected]>
AuthorDate: Wed Jul 17 17:21:40 2024 +0800

    [fix](publish) Catch exception in genPublishTask to make one failed txn 
does not block the other txns (#37724)
    
    ## Proposed changes
    
    If any exception (such as a NullPointerException) is thrown in
    `genPublishTask` during publishing, publishing fails for all txns.
    This PR catches the exception so that the failed txn does not block
    the other txns.
---
 .../doris/transaction/PublishVersionDaemon.java    | 11 ++++++-
 .../data/insert_p0/txn_insert_inject_case.out      | 14 +++++++++
 .../suites/insert_p0/txn_insert_inject_case.groovy | 36 +++++++++++++++++++++-
 3 files changed, 59 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 82e954b6a1b..3d852e8efbb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -109,7 +109,12 @@ public class PublishVersionDaemon extends MasterDaemon {
             if (transactionState.hasSendTask()) {
                 continue;
             }
-            genPublishTask(allBackends, transactionState, 
createPublishVersionTaskTime, beIdToBaseTabletIds, batchTask);
+            try {
+                genPublishTask(allBackends, transactionState, 
createPublishVersionTaskTime, beIdToBaseTabletIds,
+                        batchTask);
+            } catch (Throwable t) {
+                LOG.error("errors while generate publish task for transaction: 
{}", transactionState, t);
+            }
         }
         if (!batchTask.getAllTasks().isEmpty()) {
             AgentTaskExecutor.submit(batchTask);
@@ -127,6 +132,10 @@ public class PublishVersionDaemon extends MasterDaemon {
             publishBackends = Sets.newHashSet();
             publishBackends.addAll(allBackends);
         }
+        if (transactionState.getTransactionId() == 
DebugPointUtil.getDebugParamOrDefault(
+                "PublishVersionDaemon.genPublishTask.failed", "txnId", -1L)) {
+            throw new NullPointerException("genPublishTask failed for txnId: " 
+ transactionState.getTransactionId());
+        }
 
         if (transactionState.getSubTxnIds() != null) {
             for (Entry<Long, TableCommitInfo> entry : 
transactionState.getSubTxnIdToTableCommitInfo().entrySet()) {
diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out 
b/regression-test/data/insert_p0/txn_insert_inject_case.out
index 799229be54a..b5b736b0b91 100644
--- a/regression-test/data/insert_p0/txn_insert_inject_case.out
+++ b/regression-test/data/insert_p0/txn_insert_inject_case.out
@@ -7,3 +7,17 @@
 2      3.3     xyz     [1]     [1, 0]
 2      3.3     xyz     [1]     [1, 0]
 
+-- !select2 --
+
+-- !select3 --
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+101    2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+
+-- !select4 --
+\N     \N      \N      [null]  [null, 0]
+102    2.2     abc     []      []
+3      2.2     abc     []      []
+4      3.3     xyz     [1]     [1, 0]
+
diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy 
b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
index 083f01b4a8a..50a7729656a 100644
--- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
@@ -151,6 +151,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
     }
 
     // 2. commit failed
+    sql """ truncate table ${table}_0 """
     def dbName = "regression_test_insert_p0"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
@@ -174,7 +175,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
             statement.execute("commit")
             assertTrue(false, "commit should fail")
         } catch (Exception e) {
-            logger.error("commit failed", e);
+            logger.info("commit failed " + e.getMessage())
         }
     } finally {
         
GetDebugPoint().disableDebugPointForAllFEs('DatabaseTransactionMgr.commitTransaction.failed')
@@ -185,4 +186,37 @@ suite("txn_insert_inject_case", "nonConcurrent") {
     assertEquals(1, txn_info.size())
     assertEquals("ABORTED", txn_info[0].get("TransactionStatus"))
     assertTrue(txn_info[0].get("Reason").contains("DebugPoint: 
DatabaseTransactionMgr.commitTransaction.failed"))
+
+    // 3. one txn publish failed
+    sql """ truncate table ${table}_0 """
+    txn_id = 0
+    try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+         Statement statement = conn.createStatement()) {
+        statement.execute("begin")
+        statement.execute("insert into ${table}_0 select * from ${table}_1;")
+        txn_id = get_txn_id_from_server_info((((StatementImpl) 
statement).results).getServerInfo())
+        
GetDebugPoint().enableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed',
 [txnId:txn_id])
+        statement.execute("insert into ${table}_0 select * from ${table}_2;")
+        statement.execute("commit")
+
+        sql """insert into ${table}_0 values(100, 2.2, "abc", [], [])"""
+        sql """insert into ${table}_1 values(101, 2.2, "abc", [], [])"""
+        sql """insert into ${table}_2 values(102, 2.2, "abc", [], [])"""
+        order_qt_select2 """select * from ${table}_0"""
+        order_qt_select3 """select * from ${table}_1"""
+        order_qt_select4 """select * from ${table}_2"""
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed')
+        def rowCount = 0
+        for (int i = 0; i < 20; i++) {
+            def result = sql "select count(*) from ${table}_0"
+            logger.info("rowCount: " + result + ", retry: " + i)
+            rowCount =  result[0][0]
+            if (rowCount >= 7) {
+                break
+            }
+            sleep(1000)
+        }
+        assertEquals(7, rowCount)
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to