dataroaring commented on code in PR #32980:
URL: https://github.com/apache/doris/pull/32980#discussion_r1546035141
##########
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java:
##########
@@ -2290,4 +2480,177 @@ protected void
updateMultiTableRunningTransactionTableIds(long transactionId, Li
}
idToRunningTransactionState.get(transactionId).setTableIdList(tableIds);
}
+
+ private PublishResult finishCheckQuorumReplicas(TransactionState
transactionState, Set<Long> errorReplicaIds) {
+ long now = System.currentTimeMillis();
+ long firstPublishVersionTime =
transactionState.getFirstPublishVersionTime();
+ boolean allowPublishOneSucc = false;
+ if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
+ && now >= firstPublishVersionTime +
Config.publish_wait_time_second * 1000L) {
+ allowPublishOneSucc = true;
+ }
+ List<String> logs = Lists.newArrayList();
+
+ Map<Long, List<PublishVersionTask>> publishTasks =
transactionState.getPublishVersionTasks();
+ PublishResult publishResult = PublishResult.QUORUM_SUCC;
+ for (SubTransactionState subTransactionState :
transactionState.getSubTransactionStates()) {
+ long subTxnId = subTransactionState.getSubTransactionId();
+ TableCommitInfo tableCommitInfo =
transactionState.getSubTxnIdToTableCommitInfo().get(subTxnId);
+ if (tableCommitInfo == null) {
+ continue;
+ }
+ OlapTable table = (OlapTable) subTransactionState.getTable();
+ long tableId = table.getId();
+ for (Entry<Long, PartitionCommitInfo> entry :
tableCommitInfo.getIdToPartitionCommitInfo().entrySet()) {
+ long partitionId = entry.getValue().getPartitionId();
+ Partition partition = table.getPartition(partitionId);
+ if (partition == null) {
+ continue;
+ }
+ int loadRequiredReplicaNum =
table.getLoadRequiredReplicaNum(partitionId);
+
+ // TODO should use sub transaction load indexes
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ boolean alterReplicaLoadedTxn =
isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table);
+
+ // check success replica number for each tablet.
+ // a success replica means:
+ // 1. Not in errorReplicaIds: succeed in both commit and
publish phase
+ // 2. last failed version < 0: is a health replica before
+ // 3. version catch up: not with a stale version
+ // Here we only check number, the replica version will be
updated in updateCatalogAfterVisible()
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet :
partition.getIndex(index.getId()).getTablets()) {
+ List<Replica> tabletSuccReplicas =
Lists.newArrayList();
+ List<Replica> tabletWriteFailedReplicas =
Lists.newArrayList();
+ List<Replica> tabletVersionFailedReplicas =
Lists.newArrayList();
+ // TODO always use the visible version because the
replica version is not changed
+ long newVersion = partition.getVisibleVersion() + 1;
+ for (Replica replica : tablet.getReplicas()) {
+ for (PublishVersionTask publishVersionTask :
publishTasks.get(replica.getBackendId())) {
+ boolean needCheck =
publishVersionTask.getTransactionId()
+ ==
subTransactionState.getSubTransactionId()
+ &&
publishVersionTask.getPartitionVersionInfos().stream()
+ .anyMatch(s -> s.getPartitionId() ==
partitionId);
+ if (needCheck) {
+
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
alterReplicaLoadedTxn,
+ newVersion, publishVersionTask,
+ errorReplicaIds,
tabletSuccReplicas, tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
+ LOG.debug("after
checkReplicaContinuousVersion for txn_id={}, sub_txn_id={}, "
+ + "tablet_id={},
new_version={}, success_replicas={}, "
+ +
"write_failed_replicas={}, version_failed_replicas={}",
+
transactionState.getTransactionId(),
+
subTransactionState.getSubTransactionId(), tablet.getId(), newVersion,
+ tabletSuccReplicas,
tabletWriteFailedReplicas, tabletVersionFailedReplicas);
+ }
+ }
+ }
+
+ publishResult = checkQuorumReplicas(transactionState,
tableId, partition, tablet,
+ loadRequiredReplicaNum, allowPublishOneSucc,
newVersion, tabletSuccReplicas,
+ tabletWriteFailedReplicas,
tabletVersionFailedReplicas, publishResult, logs);
+ }
+ }
+ }
+ }
+
Review Comment:
Here, we need to ensure that there is no hole for at least one replica.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]