This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d7e172b84eb [fix](publish) Catch exception in genPublishTask to make
one failed txn does not block the other txns (#37724)
d7e172b84eb is described below
commit d7e172b84eb5436942421b7561b1bf4d7fc35fb9
Author: meiyi <[email protected]>
AuthorDate: Wed Jul 17 17:21:40 2024 +0800
[fix](publish) Catch exception in genPublishTask to make one failed txn
does not block the other txns (#37724)
## Proposed changes
If any exception (such as a NullPointerException) is thrown in
`genPublishTask` during publish, publishing fails for all txns.
This PR catches the exception so that one failed txn does not block the
other txns.
---
.../doris/transaction/PublishVersionDaemon.java | 11 ++++++-
.../data/insert_p0/txn_insert_inject_case.out | 14 +++++++++
.../suites/insert_p0/txn_insert_inject_case.groovy | 36 +++++++++++++++++++++-
3 files changed, 59 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 82e954b6a1b..3d852e8efbb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -109,7 +109,12 @@ public class PublishVersionDaemon extends MasterDaemon {
if (transactionState.hasSendTask()) {
continue;
}
- genPublishTask(allBackends, transactionState,
createPublishVersionTaskTime, beIdToBaseTabletIds, batchTask);
+ try {
+ genPublishTask(allBackends, transactionState,
createPublishVersionTaskTime, beIdToBaseTabletIds,
+ batchTask);
+ } catch (Throwable t) {
+ LOG.error("errors while generate publish task for transaction:
{}", transactionState, t);
+ }
}
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
@@ -127,6 +132,10 @@ public class PublishVersionDaemon extends MasterDaemon {
publishBackends = Sets.newHashSet();
publishBackends.addAll(allBackends);
}
+ if (transactionState.getTransactionId() ==
DebugPointUtil.getDebugParamOrDefault(
+ "PublishVersionDaemon.genPublishTask.failed", "txnId", -1L)) {
+ throw new NullPointerException("genPublishTask failed for txnId: "
+ transactionState.getTransactionId());
+ }
if (transactionState.getSubTxnIds() != null) {
for (Entry<Long, TableCommitInfo> entry :
transactionState.getSubTxnIdToTableCommitInfo().entrySet()) {
diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out
b/regression-test/data/insert_p0/txn_insert_inject_case.out
index 799229be54a..b5b736b0b91 100644
--- a/regression-test/data/insert_p0/txn_insert_inject_case.out
+++ b/regression-test/data/insert_p0/txn_insert_inject_case.out
@@ -7,3 +7,17 @@
2 3.3 xyz [1] [1, 0]
2 3.3 xyz [1] [1, 0]
+-- !select2 --
+
+-- !select3 --
+\N \N \N [null] [null, 0]
+1 2.2 abc [] []
+101 2.2 abc [] []
+2 3.3 xyz [1] [1, 0]
+
+-- !select4 --
+\N \N \N [null] [null, 0]
+102 2.2 abc [] []
+3 2.2 abc [] []
+4 3.3 xyz [1] [1, 0]
+
diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
index 083f01b4a8a..50a7729656a 100644
--- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
@@ -151,6 +151,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
}
// 2. commit failed
+ sql """ truncate table ${table}_0 """
def dbName = "regression_test_insert_p0"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
@@ -174,7 +175,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
statement.execute("commit")
assertTrue(false, "commit should fail")
} catch (Exception e) {
- logger.error("commit failed", e);
+ logger.info("commit failed " + e.getMessage())
}
} finally {
GetDebugPoint().disableDebugPointForAllFEs('DatabaseTransactionMgr.commitTransaction.failed')
@@ -185,4 +186,37 @@ suite("txn_insert_inject_case", "nonConcurrent") {
assertEquals(1, txn_info.size())
assertEquals("ABORTED", txn_info[0].get("TransactionStatus"))
assertTrue(txn_info[0].get("Reason").contains("DebugPoint:
DatabaseTransactionMgr.commitTransaction.failed"))
+
+ // 3. one txn publish failed
+ sql """ truncate table ${table}_0 """
+ txn_id = 0
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement statement = conn.createStatement()) {
+ statement.execute("begin")
+ statement.execute("insert into ${table}_0 select * from ${table}_1;")
+ txn_id = get_txn_id_from_server_info((((StatementImpl)
statement).results).getServerInfo())
+
GetDebugPoint().enableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed',
[txnId:txn_id])
+ statement.execute("insert into ${table}_0 select * from ${table}_2;")
+ statement.execute("commit")
+
+ sql """insert into ${table}_0 values(100, 2.2, "abc", [], [])"""
+ sql """insert into ${table}_1 values(101, 2.2, "abc", [], [])"""
+ sql """insert into ${table}_2 values(102, 2.2, "abc", [], [])"""
+ order_qt_select2 """select * from ${table}_0"""
+ order_qt_select3 """select * from ${table}_1"""
+ order_qt_select4 """select * from ${table}_2"""
+ } finally {
+
GetDebugPoint().disableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed')
+ def rowCount = 0
+ for (int i = 0; i < 20; i++) {
+ def result = sql "select count(*) from ${table}_0"
+ logger.info("rowCount: " + result + ", retry: " + i)
+ rowCount = result[0][0]
+ if (rowCount >= 7) {
+ break
+ }
+ sleep(1000)
+ }
+ assertEquals(7, rowCount)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]