This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8a0d940914e [fix](publish) Pick Fix publish failed because task is
null (#37546)
8a0d940914e is described below
commit 8a0d940914ee8fd41acfe5ce08061fabc77da40a
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 11 15:22:04 2024 +0800
[fix](publish) Pick Fix publish failed because task is null (#37546)
## Proposed changes
Pick https://github.com/apache/doris/pull/37531
This PR catches the exception so that a failed txn does not block the
other txns.
---
.../doris/transaction/PublishVersionDaemon.java | 114 +++++++++++++--------
1 file changed, 69 insertions(+), 45 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 22ca57f2399..a1861fb7f4d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -91,6 +91,13 @@ public class PublishVersionDaemon extends MasterDaemon {
LOG.warn("some transaction state need to publish, but no backend
exists");
return;
}
+ traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates,
allBackends);
+ tryFinishTxn(readyTransactionStates, infoService,
globalTransactionMgr, partitionVisibleVersions,
+ backendPartitions);
+ }
+
+ private void
traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState>
readyTransactionStates,
+ List<Long> allBackends) {
long createPublishVersionTaskTime = System.currentTimeMillis();
// every backend-transaction identified a single task
AgentBatchTask batchTask = new AgentBatchTask();
@@ -153,60 +160,77 @@ public class PublishVersionDaemon extends MasterDaemon {
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
}
+ }
- Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+ private void tryFinishTxn(List<TransactionState> readyTransactionStates,
+ SystemInfoService infoService, GlobalTransactionMgr
globalTransactionMgr,
+ Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>>
backendPartitions) {
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
- Stream<PublishVersionTask> publishVersionTaskStream =
transactionState
- .getPublishVersionTasks()
- .values()
- .stream()
- .peek(task -> {
- if (task.isFinished() &&
CollectionUtils.isEmpty(task.getErrorTablets())) {
- Map<Long, Long> tableIdToDeltaNumRows =
- task.getTableIdToDeltaNumRows();
- tableIdToDeltaNumRows.forEach((tableId, numRows)
-> {
- tableIdToTotalDeltaNumRows
- .computeIfPresent(tableId, (id,
orgNumRows) -> orgNumRows + numRows);
-
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
- });
- }
- });
- boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
- .anyMatch(task -> !task.isFinished() &&
infoService.checkBackendAlive(task.getBackendId()));
-
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+ try {
+ // try to finish the transaction, if failed just retry in next
loop
+ tryFinishOneTxn(transactionState, infoService,
globalTransactionMgr, partitionVisibleVersions,
+ backendPartitions);
+ } catch (Throwable t) {
+ LOG.error("errors while finish transaction: {}, publish tasks:
{}", transactionState,
+ transactionState.getPublishVersionTasks(), t);
+ }
+ } // end for readyTransactionStates
+ }
- boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask ||
transactionState.isPublishTimeout()
- ||
DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
- if (shouldFinishTxn) {
- try {
- // one transaction exception should not affect other
transaction
-
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
- transactionState.getTransactionId(),
partitionVisibleVersions, backendPartitions);
- } catch (Exception e) {
- LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
- }
- if (transactionState.getTransactionStatus() !=
TransactionStatus.VISIBLE) {
- // if finish transaction state failed, then update publish
version time, should check
- // to finish after some interval
- transactionState.updateSendTaskTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug("publish version for transaction {} failed",
transactionState);
+ private void tryFinishOneTxn(TransactionState transactionState,
+ SystemInfoService infoService, GlobalTransactionMgr
globalTransactionMgr,
+ Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>>
backendPartitions) {
+ Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+ Stream<PublishVersionTask> publishVersionTaskStream = transactionState
+ .getPublishVersionTasks()
+ .values()
+ .stream()
+ .peek(task -> {
+ if (task.isFinished() &&
CollectionUtils.isEmpty(task.getErrorTablets())) {
+ Map<Long, Long> tableIdToDeltaNumRows =
+ task.getTableIdToDeltaNumRows();
+ tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
+ tableIdToTotalDeltaNumRows
+ .computeIfPresent(tableId, (id,
orgNumRows) -> orgNumRows + numRows);
+ tableIdToTotalDeltaNumRows.putIfAbsent(tableId,
numRows);
+ });
}
+ });
+ boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
+ .anyMatch(task -> !task.isFinished() &&
infoService.checkBackendAlive(task.getBackendId()));
+
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+
+ boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask ||
transactionState.isPublishTimeout()
+ ||
DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
+ if (shouldFinishTxn) {
+ try {
+ // one transaction exception should not affect other
transaction
+
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
+ transactionState.getTransactionId(),
partitionVisibleVersions, backendPartitions);
+ } catch (Exception e) {
+ LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
+ }
+ if (transactionState.getTransactionStatus() !=
TransactionStatus.VISIBLE) {
+ // if finish transaction state failed, then update publish
version time, should check
+ // to finish after some interval
+ transactionState.updateSendTaskTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("publish version for transaction {} failed",
transactionState);
}
}
+ }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
- AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
- }
- transactionState.pruneAfterVisible();
- if (MetricRepo.isInit) {
- long publishTime =
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
- MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
- }
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
+ AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
}
- } // end for readyTransactionStates
+ transactionState.pruneAfterVisible();
+ if (MetricRepo.isInit) {
+ long publishTime =
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
+ MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
+ }
+ }
}
private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState
transactionState,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]