This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2ffebf67e0 IGNITE-23063
ItReplicaLifecycleTest.testStableAreWrittenAfterRestart(TestInfo) is flaky
(#4291)
2ffebf67e0 is described below
commit 2ffebf67e0204c797d44292ec6f61a3c4aa48b20
Author: Mikhail Efremov <[email protected]>
AuthorDate: Fri Aug 30 12:24:13 2024 +0600
IGNITE-23063
ItReplicaLifecycleTest.testStableAreWrittenAfterRestart(TestInfo) is flaky
(#4291)
---
.../replicator/ItReplicaLifecycleTest.java | 2 -
.../PartitionReplicaLifecycleManager.java | 3 +-
.../ignite/internal/replicator/ReplicaManager.java | 77 +++++++++++++++++++---
3 files changed, 71 insertions(+), 11 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index f2eadd5afe..fea2d42cd3 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -209,7 +209,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
@Timeout(60)
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-23063")
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test
after the switching to zone-based replication
public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest {
private static final IgniteLogger LOG =
Loggers.forClass(ItReplicaLifecycleTest.class);
@@ -613,7 +612,6 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-23063")
void testStableAreWrittenAfterRestart(TestInfo testInfo) throws Exception {
startNodes(testInfo, 1);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f24140bc46..6faae35159 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -468,6 +468,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
if (ex != null) {
LOG.warn("Unable to update raft groups on the node [zoneId={},
partitionId={}]", ex, zoneId, partId);
}
+
return null;
});
}
@@ -1244,7 +1245,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
int i = 0;
for (ReplicationGroupId partitionId : partitionIds) {
- stopReplicaFutures[i++] = weakStopPartition(partitionId);
+ stopReplicaFutures[i++] = stopPartition(partitionId);
}
allOf(stopReplicaFutures).get(10, TimeUnit.SECONDS);
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index e96854b17e..78de8234a7 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -38,6 +38,8 @@ import static
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequests
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -58,6 +60,7 @@ import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureContext;
@@ -115,6 +118,7 @@ import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -179,6 +183,9 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Replicas. */
private final ConcurrentHashMap<ReplicationGroupId,
CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
+ /** Futures for stopping raft nodes if the corresponding replicas weren't
started. */
+ private final Map<RaftNodeId,
CompletableFuture<TopologyAwareRaftGroupService>> raftClientsFutures = new
ConcurrentHashMap<>();
+
private final ClockService clockService;
/** Scheduled executor for idle safe time sync. */
@@ -692,9 +699,35 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
RaftGroupEventsListener raftGroupEventsListener,
IgniteSpinBusyLock busyLock
) throws NodeStoppingException {
- RaftGroupOptions groupOptions = groupOptionsForPartition(
- false,
- snapshotStorageFactory);
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return internalStartZoneReplica(
+ replicaGrpId,
+ listener,
+ snapshotStorageFactory,
+ newConfiguration,
+ raftGroupListener,
+ raftGroupEventsListener,
+ busyLock
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private CompletableFuture<Replica> internalStartZoneReplica(
+ ReplicationGroupId replicaGrpId,
+ Function<RaftGroupService, ReplicaListener> listener,
+ SnapshotStorageFactory snapshotStorageFactory,
+ PeersAndLearners newConfiguration,
+ RaftGroupListener raftGroupListener,
+ RaftGroupEventsListener raftGroupEventsListener,
+ IgniteSpinBusyLock busyLock
+ ) throws NodeStoppingException {
+ RaftGroupOptions groupOptions = groupOptionsForPartition(false,
snapshotStorageFactory);
RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
@@ -707,12 +740,16 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
raftGroupServiceFactory
);
+ raftClientsFutures.put(raftNodeId, newRaftClientFut);
+
return newRaftClientFut.thenComposeAsync(raftClient -> {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
+ raftClientsFutures.remove(raftNodeId);
+
LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
Replica newReplica = new ZonePartitionReplicaImpl(
@@ -926,12 +963,36 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds,
TimeUnit.SECONDS);
shutdownAndAwaitTermination(replicasCreationExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS);
- assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
- : "There are replicas alive [replicas="
- + replicas.entrySet().stream().filter(e ->
e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
+ // A collection of lambdas that close raft entities and complete replica
futures with NodeStoppingException.
+ Collection<ManuallyCloseable> closeables = new
ArrayList<>(raftClientsFutures.size() + 1);
- for (CompletableFuture<Replica> replicaFuture : replicas.values()) {
- replicaFuture.completeExceptionally(new NodeStoppingException());
+ // Sequence of raft-entity stopping steps: if waiting for the
raft-client future's completion finishes with an exception, then we
+ // don't try to stop the raft node.
+ raftClientsFutures.forEach((raftNodeId, raftClientFuture) ->
closeables.add(() -> {
+ raftClientFuture.get(shutdownTimeoutSeconds, TimeUnit.SECONDS);
+
+ try {
+ raftManager.stopRaftNode(raftNodeId);
+ } catch (NodeStoppingException e) {
+ throw new AssertionError("Raft node is stopping [raftNodeId="
+ raftNodeId
+ + "], but it's abnormal, because Raft Manager must
stop strictly after Replica Manager.", e);
+ }
+ }));
+
+ // The last step is completion of the replica futures, with a mandatory
check that no future is already complete before adding NodeStoppingException.
+ // We couldn't do it in a finally block because then we would lose the
AssertionError and the mandatory assert wouldn't matter.
+ closeables.add(() -> {
+ assert
replicas.values().stream().noneMatch(CompletableFuture::isDone)
+ : "There are replicas alive [replicas="
+ + replicas.entrySet().stream().filter(e ->
e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
+
+ replicas.values().forEach(replicaFuture ->
replicaFuture.completeExceptionally(new NodeStoppingException()));
+ });
+
+ try {
+ IgniteUtils.closeAllManually(closeables);
+ } catch (Exception e) {
+ return failedFuture(e);
}
return nullCompletedFuture();