This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7ee271b98fa [fix](publish) Pick Catch exception in genPublishTask to
make one failed txn does not block the other txns (#37724) (#38044)
7ee271b98fa is described below
commit 7ee271b98fa569389aff4da6d824bb1f094b9ca5
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 18 14:47:32 2024 +0800
[fix](publish) Pick Catch exception in genPublishTask to make one failed
txn does not block the other txns (#37724) (#38044)
Pick https://github.com/apache/doris/pull/37724
---
.../doris/transaction/PublishVersionDaemon.java | 196 ++++++++++++---------
1 file changed, 112 insertions(+), 84 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index d5d72869117..b83a0974301 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -73,6 +73,12 @@ public class PublishVersionDaemon extends MasterDaemon {
LOG.warn("some transaction state need to publish, but no backend
exists");
return;
}
+ traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates,
allBackends);
+ tryFinishTxn(readyTransactionStates, infoService,
globalTransactionMgr);
+ }
+
+ private void
traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState>
readyTransactionStates,
+ List<Long> allBackends) {
long createPublishVersionTaskTime = System.currentTimeMillis();
// every backend-transaction identified a single task
AgentBatchTask batchTask = new AgentBatchTask();
@@ -81,99 +87,121 @@ public class PublishVersionDaemon extends MasterDaemon {
if (transactionState.hasSendTask()) {
continue;
}
- List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
- for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
-
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
- }
- List<TPartitionVersionInfo> partitionVersionInfos = new
ArrayList<>(partitionCommitInfos.size());
- for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
- TPartitionVersionInfo versionInfo = new
TPartitionVersionInfo(commitInfo.getPartitionId(),
- commitInfo.getVersion(), 0);
- partitionVersionInfos.add(versionInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("try to publish version info partitionid [{}],
version [{}]",
- commitInfo.getPartitionId(),
- commitInfo.getVersion());
- }
- }
- Set<Long> publishBackends =
transactionState.getPublishVersionTasks().keySet();
- // public version tasks are not persisted in catalog, so
publishBackends may be empty.
- // so we have to try publish to all backends;
- if (publishBackends.isEmpty()) {
- // could not just add to it, should new a new object, or the
back map will destroyed
- publishBackends = Sets.newHashSet();
- publishBackends.addAll(allBackends);
+ try {
+ genPublishTask(allBackends, transactionState,
createPublishVersionTaskTime, batchTask);
+ } catch (Throwable t) {
+ LOG.error("errors while generate publish task for transaction:
{}", transactionState, t);
}
-
- for (long backendId : publishBackends) {
- PublishVersionTask task = new PublishVersionTask(backendId,
- transactionState.getTransactionId(),
- transactionState.getDbId(),
- partitionVersionInfos,
- createPublishVersionTaskTime);
- // add to AgentTaskQueue for handling finish report.
- // not check return value, because the add will success
- AgentTaskQueue.addTask(task);
- batchTask.addTask(task);
- transactionState.addPublishVersionTask(backendId, task);
- }
- transactionState.setSendedTask();
- LOG.info("send publish tasks for transaction: {}, db: {}",
transactionState.getTransactionId(),
- transactionState.getDbId());
}
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
}
+ }
- Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
- // try to finish the transaction, if failed just retry in next loop
- for (TransactionState transactionState : readyTransactionStates) {
- Stream<PublishVersionTask> publishVersionTaskStream =
transactionState
- .getPublishVersionTasks()
- .values()
- .stream()
- .peek(task -> {
- if (task.isFinished() &&
CollectionUtils.isEmpty(task.getErrorTablets())) {
- Map<Long, Long> tableIdToDeltaNumRows =
- task.getTableIdToDeltaNumRows();
- tableIdToDeltaNumRows.forEach((tableId, numRows)
-> {
- tableIdToTotalDeltaNumRows
- .computeIfPresent(tableId, (id,
orgNumRows) -> orgNumRows + numRows);
-
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
- });
- }
- });
- boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
- .anyMatch(task -> !task.isFinished() &&
infoService.checkBackendAlive(task.getBackendId()));
-
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
-
- boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask ||
transactionState.isPublishTimeout();
- if (shouldFinishTxn) {
- try {
- // one transaction exception should not affect other
transaction
-
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
- transactionState.getTransactionId());
- } catch (Exception e) {
- LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
- }
- if (transactionState.getTransactionStatus() !=
TransactionStatus.VISIBLE) {
- // if finish transaction state failed, then update publish
version time, should check
- // to finish after some interval
- transactionState.updateSendTaskTime();
- LOG.debug("publish version for transaction {} failed",
transactionState);
- }
+ private void genPublishTask(List<Long> allBackends, TransactionState
transactionState,
+ long createPublishVersionTaskTime, AgentBatchTask batchTask) {
+ List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
+ for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
+
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
+ }
+ List<TPartitionVersionInfo> partitionVersionInfos = new
ArrayList<>(partitionCommitInfos.size());
+ for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
+ TPartitionVersionInfo versionInfo = new
TPartitionVersionInfo(commitInfo.getPartitionId(),
+ commitInfo.getVersion(), 0);
+ partitionVersionInfos.add(versionInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try to publish version info partitionid [{}],
version [{}]",
+ commitInfo.getPartitionId(),
+ commitInfo.getVersion());
}
+ }
+ Set<Long> publishBackends =
transactionState.getPublishVersionTasks().keySet();
+ // public version tasks are not persisted in catalog, so
publishBackends may be empty.
+ // so we have to try publish to all backends;
+ if (publishBackends.isEmpty()) {
+ // could not just add to it, should new a new object, or the back
map will destroyed
+ publishBackends = Sets.newHashSet();
+ publishBackends.addAll(allBackends);
+ }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
- AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
- }
- transactionState.pruneAfterVisible();
- if (MetricRepo.isInit) {
- long publishTime =
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
- MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
- }
+ for (long backendId : publishBackends) {
+ PublishVersionTask task = new PublishVersionTask(backendId,
+ transactionState.getTransactionId(),
+ transactionState.getDbId(),
+ partitionVersionInfos,
+ createPublishVersionTaskTime);
+ // add to AgentTaskQueue for handling finish report.
+ // not check return value, because the add will success
+ AgentTaskQueue.addTask(task);
+ batchTask.addTask(task);
+ transactionState.addPublishVersionTask(backendId, task);
+ }
+ transactionState.setSendedTask();
+ LOG.info("send publish tasks for transaction: {}, db: {}",
transactionState.getTransactionId(),
+ transactionState.getDbId());
+ }
+
+ private void tryFinishTxn(List<TransactionState> readyTransactionStates,
+ SystemInfoService infoService, GlobalTransactionMgr
globalTransactionMgr) {
+ for (TransactionState transactionState : readyTransactionStates) {
+ try {
+ // try to finish the transaction, if failed just retry in next
loop
+ tryFinishOneTxn(transactionState, infoService,
globalTransactionMgr);
+ } catch (Throwable t) {
+ LOG.error("errors while finish transaction: {}, publish tasks:
{}", transactionState,
+ transactionState.getPublishVersionTasks(), t);
}
} // end for readyTransactionStates
}
+
+ private void tryFinishOneTxn(TransactionState transactionState,
SystemInfoService infoService,
+ GlobalTransactionMgr globalTransactionMgr) {
+ Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+ Stream<PublishVersionTask> publishVersionTaskStream = transactionState
+ .getPublishVersionTasks()
+ .values()
+ .stream()
+ .peek(task -> {
+ if (task.isFinished() &&
CollectionUtils.isEmpty(task.getErrorTablets())) {
+ Map<Long, Long> tableIdToDeltaNumRows =
+ task.getTableIdToDeltaNumRows();
+ tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
+ tableIdToTotalDeltaNumRows
+ .computeIfPresent(tableId, (id,
orgNumRows) -> orgNumRows + numRows);
+ tableIdToTotalDeltaNumRows.putIfAbsent(tableId,
numRows);
+ });
+ }
+ });
+ boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
+ .anyMatch(task -> !task.isFinished() &&
infoService.checkBackendAlive(task.getBackendId()));
+
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+
+ boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask ||
transactionState.isPublishTimeout();
+ if (shouldFinishTxn) {
+ try {
+ // one transaction exception should not affect other
transaction
+
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
+ transactionState.getTransactionId());
+ } catch (Exception e) {
+ LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
+ }
+ if (transactionState.getTransactionStatus() !=
TransactionStatus.VISIBLE) {
+ // if finish transaction state failed, then update publish
version time, should check
+ // to finish after some interval
+ transactionState.updateSendTaskTime();
+ LOG.debug("publish version for transaction {} failed",
transactionState);
+ }
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
+ AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
+ }
+ transactionState.pruneAfterVisible();
+ if (MetricRepo.isInit) {
+ long publishTime =
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
+ MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org