This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4d10c90404d IGNITE-26989 Fix stopping of replication groups on node stop (#6930)
4d10c90404d is described below
commit 4d10c90404d4447388409c8255c5cd72cc519c91
Author: Ivan Bessonov <[email protected]>
AuthorDate: Tue Nov 11 14:52:03 2025 +0300
IGNITE-26989 Fix stopping of replication groups on node stop (#6930)
---
.../PartitionReplicaLifecycleManager.java | 30 +++--
.../replicator/StartedReplicationGroups.java | 134 +++++++++++++++++++++
2 files changed, 156 insertions(+), 8 deletions(-)
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 64f145861f0..8b9ce15fd48 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -69,6 +69,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -206,7 +207,8 @@ public class PartitionReplicaLifecycleManager extends
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(PartitionReplicaLifecycleManager.class);
- private final Set<ZonePartitionId> replicationGroupIds =
ConcurrentHashMap.newKeySet();
+ /** Tracks starting and started replicas. */
+ private final StartedReplicationGroups startedReplicationGroups = new
StartedReplicationGroups();
// TODO https://issues.apache.org/jira/browse/IGNITE-25347
/** (zoneId -> lock) map to provide concurrent access to the zone replicas
list. */
@@ -699,6 +701,8 @@ public class PartitionReplicaLifecycleManager extends
return
fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
.thenCompose(v -> {
try {
+
startedReplicationGroups.beforeStartingGroup(zonePartitionId);
+
// TODO
https://issues.apache.org/jira/browse/IGNITE-24654 Properly close
storageIndexTracker.
// internalTbl.updatePartitionTrackers is used in
order to add storageIndexTracker to some context for further
// storage closing.
@@ -738,9 +742,15 @@ public class PartitionReplicaLifecycleManager extends
return failedFuture(e);
}
})
+ .whenComplete((replica, throwable) -> {
+ if (throwable != null) {
+
startedReplicationGroups.startingFailed(zonePartitionId);
+ }
+ })
.thenCompose(v -> {
Supplier<CompletableFuture<Void>>
addReplicationGroupIdFuture = () -> {
- replicationGroupIds.add(zonePartitionId);
+
startedReplicationGroups.startingCompleted(zonePartitionId);
+
return nullCompletedFuture();
};
@@ -828,7 +838,8 @@ public class PartitionReplicaLifecycleManager extends
metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
- cleanUpPartitionsResources(replicationGroupIds);
+ startedReplicationGroups.waitForStartingReplicas();
+
cleanUpPartitionsResources(startedReplicationGroups.streamStartedReplicationGroups());
}
/**
@@ -988,9 +999,12 @@ public class PartitionReplicaLifecycleManager extends
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-25107 replace this
method by the replicas await process.
public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
- assert
zonePartitionsLocks.get(zonePartitionId.zoneId()).isReadLocked() :
zonePartitionId;
+ assert
Optional.ofNullable(zonePartitionsLocks.get(zonePartitionId.zoneId()))
+ .map(NaiveAsyncReadWriteLock::isReadLocked)
+ .orElse(false)
+ : zonePartitionId;
- return replicationGroupIds.contains(zonePartitionId);
+ return
startedReplicationGroups.hasReplicationGroupStarted(zonePartitionId);
}
/**
@@ -1629,7 +1643,7 @@ public class PartitionReplicaLifecycleManager extends
return nullCompletedFuture();
}
-
replicationGroupIds.remove(zonePartitionId);
+
startedReplicationGroups.afterStoppingGroup(zonePartitionId);
return
fireEvent(afterReplicaStoppedEvent, eventParameters);
}, ioExecutor);
@@ -1645,8 +1659,8 @@ public class PartitionReplicaLifecycleManager extends
*
* @param partitionIds Partitions to stop.
*/
- private void cleanUpPartitionsResources(Set<ZonePartitionId> partitionIds)
{
- CompletableFuture<?>[] stopPartitionsFuture = partitionIds.stream()
+ private void cleanUpPartitionsResources(Stream<ZonePartitionId>
partitionIds) {
+ CompletableFuture<?>[] stopPartitionsFuture = partitionIds
.map(zonePartitionId -> stopPartitionInternal(
zonePartitionId,
BEFORE_REPLICA_STOPPED,
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
new file mode 100644
index 00000000000..cb66641923d
--- /dev/null
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.CompletableFutures;
+
+/**
+ * Class that helps tracking starting and already started replicas. The flow
is controlled from the outside, this class only helps
+ * maintaining collections of replicas in different statuses.
+ */
+class StartedReplicationGroups {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(StartedReplicationGroups.class);
+
+ /** Groups that are starting but are not yet reported to be fully started.
*/
+ private final Map<ZonePartitionId, CompletableFuture<Void>>
startingReplicationGroupIds = new ConcurrentHashMap<>();
+
+ /** Groups that are reported to be started. */
+ private final Set<ZonePartitionId> startedReplicationGroupIds =
ConcurrentHashMap.newKeySet();
+
+ /**
+ * Callback to be called before replication group begins start procedure.
It is expected that whoever maintains the replication groups
+ * will call either {@link #startingFailed(ZonePartitionId)} or {@link
#startingCompleted(ZonePartitionId)} some time after.
+ */
+ void beforeStartingGroup(ZonePartitionId zonePartitionId) {
+ CompletableFuture<Void> startingFuture = new CompletableFuture<>();
+
+ CompletableFuture<Void> prevFuture =
startingReplicationGroupIds.put(zonePartitionId, startingFuture);
+
+ assert prevFuture == null : "Replication group is starting second
time. [zonePartitionId=" + zonePartitionId + ']';
+
+ // Copy future state if assertions are disabled.
+ //noinspection ConstantValue
+ if (prevFuture != null) {
+ LOG.info("Replication group is starting second time.
[zonePartitionId=" + zonePartitionId + ']');
+
+ startingFuture.whenComplete(copyStateTo(prevFuture));
+ }
+ }
+
+ /**
+ * Mark starting replication groups as failed. Such a group won't be
considered {@code starting} anymore.
+ */
+ void startingFailed(ZonePartitionId zonePartitionId) {
+ completeStartingFuture(zonePartitionId);
+ }
+
+ /**
+ * Mark starting replication groups as successfully started. Such a group
won't be considered {@code starting} anymore. After this call
+ * the {@link #streamStartedReplicationGroups()} will start returning this
ID.
+ */
+ void startingCompleted(ZonePartitionId zonePartitionId) {
+ startedReplicationGroupIds.add(zonePartitionId);
+
+ completeStartingFuture(zonePartitionId);
+ }
+
+ /**
+ * Marks the replication group as stopped. After this call the {@link
#streamStartedReplicationGroups()} will stop returning this ID.
+ */
+ void afterStoppingGroup(ZonePartitionId zonePartitionId) {
+ startedReplicationGroupIds.remove(zonePartitionId);
+ }
+
+ /**
+ * Check if the replication group has started.
+ *
+ * @see #startingCompleted(ZonePartitionId)
+ * @see #afterStoppingGroup(ZonePartitionId)
+ */
+ boolean hasReplicationGroupStarted(ZonePartitionId zonePartitionId) {
+ return startedReplicationGroupIds.contains(zonePartitionId);
+ }
+
+ private void completeStartingFuture(ZonePartitionId zonePartitionId) {
+ CompletableFuture<Void> startingFuture =
startingReplicationGroupIds.remove(zonePartitionId);
+
+ assert startingFuture != null : "Starting future is not found.
[zonePartitionId=" + zonePartitionId + ']';
+
+ //noinspection ConstantValue
+ if (startingFuture != null) {
+ startingFuture.complete(null);
+ } else {
+ LOG.info("Starting future is not found. [zonePartitionId=" +
zonePartitionId + ']');
+ }
+ }
+
+ /**
+ * Wait for all replication groups in {@code starting} status to call
{@link #startingFailed(ZonePartitionId)} or
+ * {@link #startingCompleted(ZonePartitionId)}.
+ */
+ void waitForStartingReplicas() {
+ try {
+
CompletableFutures.allOf(startingReplicationGroupIds.values()).get(30,
TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ LOG.error("Failed to wait starting replicas before stop", e);
+ }
+ }
+
+ /**
+ * Returns a stream of IDs of started replication groups.
+ */
+ Stream<ZonePartitionId> streamStartedReplicationGroups() {
+ return startedReplicationGroupIds.stream();
+ }
+}