This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ffebf67e0 IGNITE-23063 ItReplicaLifecycleTest.testStableAreWrittenAfterRestart(TestInfo) is flaky (#4291)
2ffebf67e0 is described below

commit 2ffebf67e0204c797d44292ec6f61a3c4aa48b20
Author: Mikhail Efremov <[email protected]>
AuthorDate: Fri Aug 30 12:24:13 2024 +0600

    IGNITE-23063 ItReplicaLifecycleTest.testStableAreWrittenAfterRestart(TestInfo) is flaky (#4291)
---
 .../replicator/ItReplicaLifecycleTest.java         |  2 -
 .../PartitionReplicaLifecycleManager.java          |  3 +-
 .../ignite/internal/replicator/ReplicaManager.java | 77 +++++++++++++++++++---
 3 files changed, 71 insertions(+), 11 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index f2eadd5afe..fea2d42cd3 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -209,7 +209,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 @Timeout(60)
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-23063";)
 // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test 
after the switching to zone-based replication
 public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest {
     private static final IgniteLogger LOG = 
Loggers.forClass(ItReplicaLifecycleTest.class);
@@ -613,7 +612,6 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-23063";)
     void testStableAreWrittenAfterRestart(TestInfo testInfo) throws Exception {
         startNodes(testInfo, 1);
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f24140bc46..6faae35159 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -468,6 +468,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             if (ex != null) {
                 LOG.warn("Unable to update raft groups on the node [zoneId={}, 
partitionId={}]", ex, zoneId, partId);
             }
+
             return null;
         });
     }
@@ -1244,7 +1245,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                 int i = 0;
 
                 for (ReplicationGroupId partitionId : partitionIds) {
-                    stopReplicaFutures[i++] = weakStopPartition(partitionId);
+                    stopReplicaFutures[i++] = stopPartition(partitionId);
                 }
 
                 allOf(stopReplicaFutures).get(10, TimeUnit.SECONDS);
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index e96854b17e..78de8234a7 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -38,6 +38,8 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequests
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -58,6 +60,7 @@ import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.failure.FailureContext;
@@ -115,6 +118,7 @@ import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -179,6 +183,9 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     /** Replicas. */
     private final ConcurrentHashMap<ReplicationGroupId, 
CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
 
+    /** Futures for stopping raft nodes if the corresponding replicas weren't 
started. */
+    private final Map<RaftNodeId, 
CompletableFuture<TopologyAwareRaftGroupService>> raftClientsFutures = new 
ConcurrentHashMap<>();
+
     private final ClockService clockService;
 
     /** Scheduled executor for idle safe time sync. */
@@ -692,9 +699,35 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             RaftGroupEventsListener raftGroupEventsListener,
             IgniteSpinBusyLock busyLock
     ) throws NodeStoppingException {
-        RaftGroupOptions groupOptions = groupOptionsForPartition(
-                false,
-                snapshotStorageFactory);
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return internalStartZoneReplica(
+                    replicaGrpId,
+                    listener,
+                    snapshotStorageFactory,
+                    newConfiguration,
+                    raftGroupListener,
+                    raftGroupEventsListener,
+                    busyLock
+            );
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Replica> internalStartZoneReplica(
+            ReplicationGroupId replicaGrpId,
+            Function<RaftGroupService, ReplicaListener> listener,
+            SnapshotStorageFactory snapshotStorageFactory,
+            PeersAndLearners newConfiguration,
+            RaftGroupListener raftGroupListener,
+            RaftGroupEventsListener raftGroupEventsListener,
+            IgniteSpinBusyLock busyLock
+    ) throws NodeStoppingException {
+        RaftGroupOptions groupOptions = groupOptionsForPartition(false, 
snapshotStorageFactory);
 
         RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new 
Peer(localNodeConsistentId));
 
@@ -707,12 +740,16 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 raftGroupServiceFactory
         );
 
+        raftClientsFutures.put(raftNodeId, newRaftClientFut);
+
         return newRaftClientFut.thenComposeAsync(raftClient -> {
             if (!busyLock.enterBusy()) {
                 return failedFuture(new NodeStoppingException());
             }
 
             try {
+                raftClientsFutures.remove(raftNodeId);
+
                 LOG.info("Replica is about to start [replicationGroupId={}].", 
replicaGrpId);
 
                 Replica newReplica = new ZonePartitionReplicaImpl(
@@ -926,12 +963,36 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds, 
TimeUnit.SECONDS);
         shutdownAndAwaitTermination(replicasCreationExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS);
 
-        assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
-                : "There are replicas alive [replicas="
-                + replicas.entrySet().stream().filter(e -> 
e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
+        // A collection of lambdas with raft entities closing and replicas 
completion with NodeStoppingException.
+        Collection<ManuallyCloseable> closeables = new 
ArrayList<>(raftClientsFutures.size() + 1);
 
-        for (CompletableFuture<Replica> replicaFuture : replicas.values()) {
-            replicaFuture.completeExceptionally(new NodeStoppingException());
+        // Sequence of raft-entities stopping processes: if waiting 
raft-client future completion finishes with an exception, then we
+        // don't trying to stop raft-node.
+        raftClientsFutures.forEach((raftNodeId, raftClientFuture) -> 
closeables.add(() -> {
+            raftClientFuture.get(shutdownTimeoutSeconds, TimeUnit.SECONDS);
+
+            try {
+                raftManager.stopRaftNode(raftNodeId);
+            } catch (NodeStoppingException e) {
+                throw new AssertionError("Raft node is stopping [raftNodeId=" 
+ raftNodeId
+                        + "], but it's abnormal, because Raft Manager must 
stop strictly after Replica Manager.", e);
+            }
+        }));
+
+        // The last is completion of replica futures with mandatory check that 
all futures are complete before adding NodeStoppingException.
+        // We couldn't do it in finally block because thus we're loosing 
AssertionError and mandatory assert doesn't matter then.
+        closeables.add(() -> {
+            assert 
replicas.values().stream().noneMatch(CompletableFuture::isDone)
+                    : "There are replicas alive [replicas="
+                    + replicas.entrySet().stream().filter(e -> 
e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
+
+            replicas.values().forEach(replicaFuture -> 
replicaFuture.completeExceptionally(new NodeStoppingException()));
+        });
+
+        try {
+            IgniteUtils.closeAllManually(closeables);
+        } catch (Exception e) {
+            return failedFuture(e);
         }
 
         return nullCompletedFuture();

Reply via email to