This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d40bd62864 [fix](publish) fix publish failed when balance (#35462)
5d40bd62864 is described below

commit 5d40bd62864232fcb775c6b2b8c622564434b112
Author: meiyi <[email protected]>
AuthorDate: Wed May 29 09:40:28 2024 +0800

    [fix](publish) fix publish failed when balance (#35462)
    
    ## Proposed changes
    
    When a load runs while tablet balance is in progress, the publish fails:
    ```
    2024-05-27 11:40:16,082 WARN (PUBLISH_VERSION|32) [PublishVersionDaemon.tryFinishTxn():204] error happens when finish transaction 4358
    java.lang.NullPointerException: Cannot invoke "java.util.List.iterator()" because the return value of "java.util.Map.get(Object)" is null
            at org.apache.doris.transaction.DatabaseTransactionMgr.finishCheckQuorumReplicas(DatabaseTransactionMgr.java:1343) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.transaction.DatabaseTransactionMgr.finishTransaction(DatabaseTransactionMgr.java:1106) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.transaction.GlobalTransactionMgr.finishTransaction(GlobalTransactionMgr.java:455) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.transaction.PublishVersionDaemon.tryFinishTxn(PublishVersionDaemon.java:201) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.transaction.PublishVersionDaemon.publishVersion(PublishVersionDaemon.java:95) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.transaction.PublishVersionDaemon.runAfterCatalogReady(PublishVersionDaemon.java:69) ~[doris-fe.jar:1.2-SNAPSHOT]
            at org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58) ~[doris-fe.jar:1.2-SNAPSHOT]
    ```
---
 .../apache/doris/catalog/TabletInvertedIndex.java  |  3 ++
 .../doris/transaction/DatabaseTransactionMgr.java  | 56 +++++++++++++---------
 2 files changed, 37 insertions(+), 22 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index e7c1b0c2875..d5999b17ca3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -335,6 +335,9 @@ public class TabletInvertedIndex {
                 // make transaction VISIBLE when last publish failed.
                 Map<Long, List<PublishVersionTask>> publishVersionTask = 
transactionState.getPublishVersionTasks();
                 List<PublishVersionTask> tasks = 
publishVersionTask.get(backendId);
+                if (tasks == null) {
+                    continue;
+                }
                 for (PublishVersionTask task : tasks) {
                     if (task != null && task.isFinished()) {
                         List<Long> errorTablets = task.getErrorTablets();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index f72a91c4edd..e271146b1e7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1340,12 +1340,18 @@ public class DatabaseTransactionMgr {
                     tabletWriteFailedReplicas.clear();
                     tabletVersionFailedReplicas.clear();
                     for (Replica replica : tablet.getReplicas()) {
-                        for (PublishVersionTask publishVersionTask : 
publishTasks.get(replica.getBackendId())) {
-                            checkReplicaContinuousVersionSucc(tablet.getId(), 
replica, alterReplicaLoadedTxn,
-                                    newVersion, publishVersionTask,
-                                    errorReplicaIds, tabletSuccReplicas, 
tabletWriteFailedReplicas,
-                                    tabletVersionFailedReplicas);
+                        List<PublishVersionTask> publishVersionTasks = 
publishTasks.get(replica.getBackendId());
+                        Preconditions.checkState(publishVersionTasks == null 
|| publishVersionTasks.size() == 1,
+                                "publish tasks: " + publishVersionTasks);
+                        PublishVersionTask publishVersionTask = null;
+                        if (publishVersionTasks != null) {
+                            publishVersionTask = publishVersionTasks.get(0);
                         }
+                        checkReplicaContinuousVersionSucc(tablet.getId(), 
replica, alterReplicaLoadedTxn,
+                                newVersion, publishVersionTask,
+                                errorReplicaIds, tabletSuccReplicas, 
tabletWriteFailedReplicas,
+                                tabletVersionFailedReplicas);
+
                     }
 
                     publishResult = checkQuorumReplicas(transactionState, 
tableId, partition, tablet,
@@ -2627,25 +2633,31 @@ public class DatabaseTransactionMgr {
                         // TODO always use the visible version because the 
replica version is not changed
                         long newVersion = partition.getVisibleVersion() + 1;
                         for (Replica replica : tablet.getReplicas()) {
-                            for (PublishVersionTask publishVersionTask : 
publishTasks.get(replica.getBackendId())) {
-                                boolean needCheck = 
publishVersionTask.getTransactionId()
-                                        == 
subTransactionState.getSubTransactionId()
-                                        && 
publishVersionTask.getPartitionVersionInfos().stream()
-                                        .anyMatch(s -> s.getPartitionId() == 
partitionId);
-                                if (needCheck) {
-                                    
checkReplicaContinuousVersionSucc(tablet.getId(), replica, 
alterReplicaLoadedTxn,
-                                            newVersion, publishVersionTask,
-                                            errorReplicaIds, 
tabletSuccReplicas, tabletWriteFailedReplicas,
-                                            tabletVersionFailedReplicas);
-                                    LOG.debug("after 
checkReplicaContinuousVersion for txn_id={}, sub_txn_id={}, "
-                                                    + "tablet_id={}, 
new_version={}, success_replicas={}, "
-                                                    + "error_replicas={}, 
write_failed_replicas={}, "
-                                                    + 
"version_failed_replicas={}", transactionState.getTransactionId(),
-                                            
subTransactionState.getSubTransactionId(), tablet.getId(), newVersion,
-                                            tabletSuccReplicas, 
errorReplicaIds, tabletWriteFailedReplicas,
-                                            tabletVersionFailedReplicas);
+                            List<PublishVersionTask> publishVersionTasks = 
publishTasks.get(replica.getBackendId());
+                            PublishVersionTask publishVersionTask = null;
+                            if (publishVersionTasks != null) {
+                                List<PublishVersionTask> matchedTasks = 
publishVersionTasks.stream()
+                                        .filter(t -> t.getTransactionId() == 
subTransactionState.getSubTransactionId()
+                                                && 
t.getPartitionVersionInfos().stream()
+                                                .anyMatch(s -> 
s.getPartitionId() == partitionId))
+                                        .collect(Collectors.toList());
+                                Preconditions.checkState(matchedTasks.size() 
<= 1,
+                                        "matched publish tasks: " + 
matchedTasks);
+                                if (matchedTasks.size() == 1) {
+                                    publishVersionTask = matchedTasks.get(0);
                                 }
                             }
+                            checkReplicaContinuousVersionSucc(tablet.getId(), 
replica, alterReplicaLoadedTxn,
+                                    newVersion, publishVersionTask,
+                                    errorReplicaIds, tabletSuccReplicas, 
tabletWriteFailedReplicas,
+                                    tabletVersionFailedReplicas);
+                            LOG.debug("after checkReplicaContinuousVersion for 
txn_id={}, sub_txn_id={}, "
+                                            + "tablet_id={}, new_version={}, 
success_replicas={}, "
+                                            + "error_replicas={}, 
write_failed_replicas={}, "
+                                            + "version_failed_replicas={}", 
transactionState.getTransactionId(),
+                                    subTransactionState.getSubTransactionId(), 
tablet.getId(), newVersion,
+                                    tabletSuccReplicas, errorReplicaIds, 
tabletWriteFailedReplicas,
+                                    tabletVersionFailedReplicas);
                         }
 
                         publishResult = checkQuorumReplicas(transactionState, 
tableId, partition, tablet,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to