yujun777 commented on code in PR #28079:
URL: https://github.com/apache/doris/pull/28079#discussion_r1582063717
##########
fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java:
##########
@@ -118,59 +123,87 @@ private void publishVersion() {
}
}
- Set<Long> publishBackends =
transactionState.getPublishVersionTasks().keySet();
- // public version tasks are not persisted in catalog, so
publishBackends may be empty.
- // so we have to try publish to all backends;
- if (publishBackends.isEmpty()) {
- // could not just add to it, should new a new object, or the
back map will destroyed
- publishBackends = Sets.newHashSet();
- publishBackends.addAll(allBackends);
- }
-
- for (long backendId : publishBackends) {
- PublishVersionTask task = new PublishVersionTask(backendId,
- transactionState.getTransactionId(),
- transactionState.getDbId(),
- partitionVersionInfos,
- createPublishVersionTaskTime);
-
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId,
Collections.emptySet()));
- // add to AgentTaskQueue for handling finish report.
- // not check return value, because the add will success
- AgentTaskQueue.addTask(task);
- batchTask.addTask(task);
- transactionState.addPublishVersionTask(backendId, task);
- }
- transactionState.setSendedTask();
- LOG.info("send publish tasks for transaction: {}, db: {}",
transactionState.getTransactionId(),
- transactionState.getDbId());
+ genPublishTask(allBackends, transactionState,
partitionVersionInfos,
+ createPublishVersionTaskTime, beIdToBaseTabletIds,
batchTask);
}
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
}
+ }
+ private static void genPublishTask(List<Long> allBackends,
TransactionState transactionState,
+ List<TPartitionVersionInfo>
partitionVersionInfos,
+ long createPublishVersionTaskTime,
+ Map<Long, Set<Long>>
beIdToBaseTabletIds, AgentBatchTask batchTask) {
+ Set<Long> publishBackends =
transactionState.getPublishVersionTasks().keySet();
+ // public version tasks are not persisted in catalog, so
publishBackends may be empty.
+ // so we have to try publish to all backends;
+ if (publishBackends.isEmpty()) {
+ // could not just add to it, should new a new object, or the back
map will destroyed
+ publishBackends = Sets.newHashSet();
+ publishBackends.addAll(allBackends);
+ }
+
+ for (long backendId : publishBackends) {
+ PublishVersionTask task = new PublishVersionTask(backendId,
+ transactionState.getTransactionId(),
+ transactionState.getDbId(),
+ partitionVersionInfos,
+ createPublishVersionTaskTime);
+ task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId,
Collections.emptySet()));
+ // add to AgentTaskQueue for handling finish report.
+ // not check return value, because the add will success
+ AgentTaskQueue.addTask(task);
+ batchTask.addTask(task);
+ transactionState.addPublishVersionTask(backendId, task);
+ }
+ transactionState.setSendedTask();
+ LOG.info("send publish tasks for transaction: {}, db: {}",
transactionState.getTransactionId(),
+ transactionState.getDbId());
+ }
+
+ private static void tryFinishTxn(List<TransactionState>
readyTransactionStates,
+ SystemInfoService infoService,
GlobalTransactionMgrIface globalTransactionMgr) {
Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
- Stream<PublishVersionTask> publishVersionTaskStream =
transactionState
- .getPublishVersionTasks()
- .values()
- .stream()
- .peek(task -> {
- if (task.isFinished() &&
CollectionUtils.isEmpty(task.getErrorTablets())) {
- Map<Long, Long> tableIdToDeltaNumRows =
- task.getTableIdToDeltaNumRows();
- tableIdToDeltaNumRows.forEach((tableId, numRows)
-> {
- tableIdToTotalDeltaNumRows
- .computeIfPresent(tableId, (id,
orgNumRows) -> orgNumRows + numRows);
-
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
- });
- }
- });
- boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
- .anyMatch(task -> !task.isFinished() &&
infoService.checkBackendAlive(task.getBackendId()));
+ AtomicBoolean hasBackendAliveAndUnfinishedTask = new
AtomicBoolean(false);
+ Set<Long> notFinishTaskBe = Sets.newHashSet();
+ transactionState.getPublishVersionTasks().forEach((beId, task) -> {
+ if (task.isFinished()) {
+ if (CollectionUtils.isEmpty(task.getErrorTablets())) {
+ Map<Long, Long> tableIdToDeltaNumRows =
task.getTableIdToDeltaNumRows();
+ tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
+ tableIdToTotalDeltaNumRows
+ .computeIfPresent(tableId, (id, orgNumRows) ->
orgNumRows + numRows);
+ tableIdToTotalDeltaNumRows.putIfAbsent(tableId,
numRows);
+ });
+ }
+ } else {
+ if (infoService.checkBackendAlive(task.getBackendId())) {
+ hasBackendAliveAndUnfinishedTask.set(true);
+ }
+ notFinishTaskBe.add(beId);
+ }
+ });
+
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+ LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe,
transactionState);
Review Comment:
Delete this log, or guard it with a LOG.isDebugEnabled() check, because
transactionState.toString() does a lot of work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]