This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d10c90404d IGNITE-26989 Fix stopping of replication groups on node stop (#6930)
4d10c90404d is described below

commit 4d10c90404d4447388409c8255c5cd72cc519c91
Author: Ivan Bessonov <[email protected]>
AuthorDate: Tue Nov 11 14:52:03 2025 +0300

    IGNITE-26989 Fix stopping of replication groups on node stop (#6930)
---
 .../PartitionReplicaLifecycleManager.java          |  30 +++--
 .../replicator/StartedReplicationGroups.java       | 134 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 8 deletions(-)

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 64f145861f0..8b9ce15fd48 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -69,6 +69,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -206,7 +207,8 @@ public class PartitionReplicaLifecycleManager extends
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(PartitionReplicaLifecycleManager.class);
 
-    private final Set<ZonePartitionId> replicationGroupIds = 
ConcurrentHashMap.newKeySet();
+    /** Tracks starting and started replicas. */
+    private final StartedReplicationGroups startedReplicationGroups = new 
StartedReplicationGroups();
 
     // TODO https://issues.apache.org/jira/browse/IGNITE-25347
     /** (zoneId -> lock) map to provide concurrent access to the zone replicas 
list. */
@@ -699,6 +701,8 @@ public class PartitionReplicaLifecycleManager extends
             return 
fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
                     .thenCompose(v -> {
                         try {
+                            
startedReplicationGroups.beforeStartingGroup(zonePartitionId);
+
                             // TODO 
https://issues.apache.org/jira/browse/IGNITE-24654 Properly close 
storageIndexTracker.
                             //  internalTbl.updatePartitionTrackers is used in 
order to add storageIndexTracker to some context for further
                             //  storage closing.
@@ -738,9 +742,15 @@ public class PartitionReplicaLifecycleManager extends
                             return failedFuture(e);
                         }
                     })
+                    .whenComplete((replica, throwable) -> {
+                        if (throwable != null) {
+                            
startedReplicationGroups.startingFailed(zonePartitionId);
+                        }
+                    })
                     .thenCompose(v -> {
                         Supplier<CompletableFuture<Void>> 
addReplicationGroupIdFuture = () -> {
-                            replicationGroupIds.add(zonePartitionId);
+                            
startedReplicationGroups.startingCompleted(zonePartitionId);
+
                             return nullCompletedFuture();
                         };
 
@@ -828,7 +838,8 @@ public class PartitionReplicaLifecycleManager extends
         metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
         metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
 
-        cleanUpPartitionsResources(replicationGroupIds);
+        startedReplicationGroups.waitForStartingReplicas();
+        
cleanUpPartitionsResources(startedReplicationGroups.streamStartedReplicationGroups());
     }
 
     /**
@@ -988,9 +999,12 @@ public class PartitionReplicaLifecycleManager extends
      */
     // TODO: https://issues.apache.org/jira/browse/IGNITE-25107 replace this 
method by the replicas await process.
     public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
-        assert 
zonePartitionsLocks.get(zonePartitionId.zoneId()).isReadLocked() : 
zonePartitionId;
+        assert 
Optional.ofNullable(zonePartitionsLocks.get(zonePartitionId.zoneId()))
+                .map(NaiveAsyncReadWriteLock::isReadLocked)
+                .orElse(false)
+                : zonePartitionId;
 
-        return replicationGroupIds.contains(zonePartitionId);
+        return 
startedReplicationGroups.hasReplicationGroupStarted(zonePartitionId);
     }
 
     /**
@@ -1629,7 +1643,7 @@ public class PartitionReplicaLifecycleManager extends
                                             return nullCompletedFuture();
                                         }
 
-                                        
replicationGroupIds.remove(zonePartitionId);
+                                        
startedReplicationGroups.afterStoppingGroup(zonePartitionId);
 
                                         return 
fireEvent(afterReplicaStoppedEvent, eventParameters);
                                     }, ioExecutor);
@@ -1645,8 +1659,8 @@ public class PartitionReplicaLifecycleManager extends
      *
      * @param partitionIds Partitions to stop.
      */
-    private void cleanUpPartitionsResources(Set<ZonePartitionId> partitionIds) 
{
-        CompletableFuture<?>[] stopPartitionsFuture = partitionIds.stream()
+    private void cleanUpPartitionsResources(Stream<ZonePartitionId> 
partitionIds) {
+        CompletableFuture<?>[] stopPartitionsFuture = partitionIds
                 .map(zonePartitionId -> stopPartitionInternal(
                         zonePartitionId,
                         BEFORE_REPLICA_STOPPED,
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
new file mode 100644
index 00000000000..cb66641923d
--- /dev/null
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.CompletableFutures;
+
+/**
+ * Class that helps tracking starting and already started replicas. The flow is controlled from the outside, this class only helps
+ * maintaining collections of replicas in different statuses.
+ *
+ * <p>A group is {@code starting} from {@link #beforeStartingGroup(ZonePartitionId)} until {@link #startingFailed(ZonePartitionId)} or
+ * {@link #startingCompleted(ZonePartitionId)} is called, and {@code started} from {@link #startingCompleted(ZonePartitionId)} until
+ * {@link #afterStoppingGroup(ZonePartitionId)} is called.
+ */
+class StartedReplicationGroups {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(StartedReplicationGroups.class);
+
+    /** Upper bound, in seconds, for {@link #waitForStartingReplicas()}. */
+    private static final long STARTING_REPLICAS_WAIT_TIMEOUT_SECONDS = 30;
+
+    /** Groups that are starting but are not yet reported to be fully started. */
+    private final Map<ZonePartitionId, CompletableFuture<Void>> startingReplicationGroupIds = new ConcurrentHashMap<>();
+
+    /** Groups that are reported to be started. */
+    private final Set<ZonePartitionId> startedReplicationGroupIds = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Callback to be called before replication group begins start procedure. It is expected that whoever maintains the replication groups
+     * will call either {@link #startingFailed(ZonePartitionId)} or {@link #startingCompleted(ZonePartitionId)} some time after.
+     *
+     * @param zonePartitionId ID of the replication group that is about to start.
+     */
+    void beforeStartingGroup(ZonePartitionId zonePartitionId) {
+        CompletableFuture<Void> startingFuture = new CompletableFuture<>();
+
+        CompletableFuture<Void> prevFuture = startingReplicationGroupIds.put(zonePartitionId, startingFuture);
+
+        assert prevFuture == null : "Replication group is starting second time. [zonePartitionId=" + zonePartitionId + ']';
+
+        // Only reachable with assertions disabled: the previous future has been replaced in the map, so copy the new future's state to it
+        // to make sure it still gets completed.
+        //noinspection ConstantValue
+        if (prevFuture != null) {
+            LOG.warn("Replication group is starting second time. [zonePartitionId=" + zonePartitionId + ']');
+
+            startingFuture.whenComplete(copyStateTo(prevFuture));
+        }
+    }
+
+    /**
+     * Mark starting replication groups as failed. Such a group won't be considered {@code starting} anymore.
+     *
+     * <p>Unlike {@link #startingCompleted(ZonePartitionId)}, this method accepts groups that were never registered with
+     * {@link #beforeStartingGroup(ZonePartitionId)}: the start procedure may fail before reaching that call, and callers may still
+     * report the failure in that case, so a missing {@code starting} entry is not an error here.
+     *
+     * @param zonePartitionId ID of the replication group whose start has failed.
+     */
+    void startingFailed(ZonePartitionId zonePartitionId) {
+        CompletableFuture<Void> startingFuture = startingReplicationGroupIds.remove(zonePartitionId);
+
+        if (startingFuture != null) {
+            startingFuture.complete(null);
+        }
+    }
+
+    /**
+     * Mark starting replication groups as successfully started. Such a group won't be considered {@code starting} anymore. After this call
+     * the {@link #streamStartedReplicationGroups()} will start returning this ID.
+     *
+     * @param zonePartitionId ID of the replication group that has started.
+     */
+    void startingCompleted(ZonePartitionId zonePartitionId) {
+        // Register the group as started before completing its starting future, so that a thread returning from
+        // waitForStartingReplicas() already sees it in streamStartedReplicationGroups().
+        startedReplicationGroupIds.add(zonePartitionId);
+
+        completeStartingFuture(zonePartitionId);
+    }
+
+    /**
+     * Marks the replication group as stopped. After this call the {@link #streamStartedReplicationGroups()} will stop returning this ID.
+     *
+     * @param zonePartitionId ID of the stopped replication group.
+     */
+    void afterStoppingGroup(ZonePartitionId zonePartitionId) {
+        startedReplicationGroupIds.remove(zonePartitionId);
+    }
+
+    /**
+     * Check if the replication group has started.
+     *
+     * @param zonePartitionId ID of the replication group.
+     * @return {@code true} if the group was reported as started and has not been reported as stopped since.
+     * @see #startingCompleted(ZonePartitionId)
+     * @see #afterStoppingGroup(ZonePartitionId)
+     */
+    boolean hasReplicationGroupStarted(ZonePartitionId zonePartitionId) {
+        return startedReplicationGroupIds.contains(zonePartitionId);
+    }
+
+    /**
+     * Removes the {@code starting} future of the group and completes it. The future is expected to exist.
+     */
+    private void completeStartingFuture(ZonePartitionId zonePartitionId) {
+        CompletableFuture<Void> startingFuture = startingReplicationGroupIds.remove(zonePartitionId);
+
+        assert startingFuture != null : "Starting future is not found. [zonePartitionId=" + zonePartitionId + ']';
+
+        //noinspection ConstantValue
+        if (startingFuture != null) {
+            startingFuture.complete(null);
+        } else {
+            LOG.warn("Starting future is not found. [zonePartitionId=" + zonePartitionId + ']');
+        }
+    }
+
+    /**
+     * Wait for all replication groups in {@code starting} status to call {@link #startingFailed(ZonePartitionId)} or
+     * {@link #startingCompleted(ZonePartitionId)}. The wait is bounded by a timeout; failing to wait is logged, not thrown. If the
+     * current thread is interrupted while waiting, its interrupt status is restored before returning.
+     */
+    void waitForStartingReplicas() {
+        try {
+            CompletableFutures.allOf(startingReplicationGroupIds.values())
+                    .get(STARTING_REPLICAS_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Don't swallow the interrupt: let the caller (and any further blocking calls) observe it.
+            Thread.currentThread().interrupt();
+
+            LOG.error("Failed to wait starting replicas before stop", e);
+        } catch (ExecutionException | TimeoutException e) {
+            LOG.error("Failed to wait starting replicas before stop", e);
+        }
+    }
+
+    /**
+     * Returns a stream of IDs of started replication groups.
+     */
+    Stream<ZonePartitionId> streamStartedReplicationGroups() {
+        return startedReplicationGroupIds.stream();
+    }
+}

Reply via email to