This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 27c0bc5fc7a IGNITE-27790 Fix closing zone resources on node stop (#7571)
27c0bc5fc7a is described below
commit 27c0bc5fc7ac8cd65f28b653419c2e9ee6a2fd60
Author: Ivan Zlenko <[email protected]>
AuthorDate: Thu Feb 12 14:57:49 2026 +0500
IGNITE-27790 Fix closing zone resources on node stop (#7571)
---
.../PartitionReplicaLifecycleManager.java | 18 ++++++-
.../partition/replicator/ZoneResourcesManager.java | 22 +++++++--
.../PartitionReplicaLifecycleManagerTest.java | 57 +++++++++++++++++++---
3 files changed, 83 insertions(+), 14 deletions(-)
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f87660dcb76..e66bdea7289 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -1354,7 +1354,17 @@ public class PartitionReplicaLifecycleManager extends
return replicaMgr.weakStopReplica(
zonePartitionId,
WeakReplicaStopReason.RESTART,
- () -> stopPartitionInternal(zonePartitionId,
BEFORE_REPLICA_STOPPED, AFTER_REPLICA_STOPPED, revision, replica -> {})
+ () -> stopPartitionInternal(
+ zonePartitionId,
+ BEFORE_REPLICA_STOPPED,
+ AFTER_REPLICA_STOPPED,
+ revision,
+ replicaWasStopped -> {
+ if (replicaWasStopped) {
+
zoneResourcesManager.removeZonePartitionResources(zonePartitionId);
+ }
+ }
+ )
);
}
@@ -1812,7 +1822,11 @@ public class PartitionReplicaLifecycleManager extends
BEFORE_REPLICA_STOPPED,
AFTER_REPLICA_STOPPED,
-1L,
- replicaWasStopped -> {}
+ replicaWasStopped -> {
+ if (replicaWasStopped) {
+
zoneResourcesManager.removeZonePartitionResources(zonePartitionId);
+ }
+ }
))
.toArray(CompletableFuture[]::new);
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 81cd7f337ae..099bc16255b 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -192,6 +192,19 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
});
}
+ /**
+ * Removes partition resources from the zone. It is safe to do so since
the resources should already have been closed by the time the replica is stopped.
+ */
+ void removeZonePartitionResources(ZonePartitionId zonePartitionId) {
+ inBusyLock(busyLock, () -> {
+ ZoneResources resources =
resourcesByZoneId.get(zonePartitionId.zoneId());
+
+ if (resources != null) {
+
resources.resourcesByPartitionId.remove(zonePartitionId.partitionId());
+ }
+ });
+ }
+
CompletableFuture<Void> removeTableResources(ZonePartitionId
zonePartitionId, int tableId) {
ZonePartitionResources resources =
getZonePartitionResources(zonePartitionId);
@@ -211,8 +224,8 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
/**
* Returns future of true if there are no corresponding table-related
resources, otherwise awaits replicaListenerFuture
- * and checks whether table replica processors, table raft processors and
partition snapshot storages are present.
- * if any is present, returns false, otherwise returns true.
+ * and checks whether table replica processors, table raft processors,
and partition snapshot storages are present.
+ * If any is present, returns {@code false}, otherwise returns {@code
true}.
*/
CompletableFuture<Boolean> areTableResourcesEmpty(ZonePartitionId
zonePartitionId) {
ZonePartitionResources resources =
getZonePartitionResources(zonePartitionId);
@@ -302,7 +315,7 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
return txStatePartitionStorage;
}
- public boolean txStatePartitionStorageIsInRebalanceState() {
+ boolean txStatePartitionStorageIsInRebalanceState() {
return txStatePartitionStorage.lastAppliedIndex() ==
TxStatePartitionStorage.REBALANCE_IN_PROGRESS;
}
@@ -323,7 +336,7 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
}
/** Closes trackers. */
- public void closeTrackers() {
+ void closeTrackers() {
safeTimeTracker.close();
storageIndexTracker.close();
}
@@ -331,7 +344,6 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
/** Closes all resources. */
public void close() {
closeTrackers();
- raftListener.onShutdown();
txStatePartitionStorage.close();
}
}
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index dbd8f483114..75097d80533 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -21,6 +21,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.CatalogTestUtils.TEST_DELAY_DURATION;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
@@ -66,7 +67,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.PartitionCountProvider;
-import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
@@ -154,6 +154,10 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
private ZonePartitionResources commonZonePartitionResources;
+ private SafeTimeValuesTracker commonSafeTimeTracker;
+
+ private PendingComparableValuesTracker<Long, Void>
commonStorageIndexTracker;
+
@Mock
private Loza raftManager;
@@ -201,12 +205,16 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
when(distributionZoneManager.dataNodes(any(), anyInt(),
anyInt())).thenReturn(completedFuture(Set.of(nodeName)));
+ // Wrap the trackers in spies so that tests can verify they are closed.
+ commonSafeTimeTracker = spy(new
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE));
+ commonStorageIndexTracker = spy(new
PendingComparableValuesTracker<>(0L));
+
commonZonePartitionResources = spy(new ZonePartitionResources(
txStatePartitionStorage,
raftGroupListener,
partitionSnapshotStorage,
- new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE),
- new PendingComparableValuesTracker<>(0L)
+ commonSafeTimeTracker,
+ commonStorageIndexTracker
));
when(raftManager.startRaftGroupNode(any(), any(), any(), any(),
any(RaftGroupOptions.class), any()))
@@ -413,7 +421,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
assertThat(partitionReplicaLifecycleManager.stopAsync(),
willCompleteSuccessfully());
- verify(replicaManager,
times(CatalogUtils.DEFAULT_PARTITION_COUNT)).stopReplica(any());
+ verify(replicaManager,
times(DEFAULT_PARTITION_COUNT)).stopReplica(any());
}
@Test
@@ -424,7 +432,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
assertThat(partitionReplicaLifecycleManager.stopAsync(),
willCompleteSuccessfully());
- verify(replicaManager,
times(CatalogUtils.DEFAULT_PARTITION_COUNT)).stopReplica(any());
+ verify(replicaManager,
times(DEFAULT_PARTITION_COUNT)).stopReplica(any());
// Do reset for correct replica manager stop on tear down.
reset(replicaManager);
@@ -436,7 +444,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
doReturn(commonZonePartitionResources).when(zoneResourcesManager).getZonePartitionResources(any());
int defaultZoneId = catalogManager.latestCatalog().defaultZone().id();
- List<ZonePartitionResources> defaultZoneResources = IntStream.range(0,
CatalogUtils.DEFAULT_PARTITION_COUNT)
+ List<ZonePartitionResources> defaultZoneResources = IntStream.range(0,
DEFAULT_PARTITION_COUNT)
.mapToObj(partId -> new ZonePartitionId(defaultZoneId, partId))
.map(partitionReplicaLifecycleManager::zonePartitionResources)
.collect(Collectors.toList());
@@ -451,11 +459,46 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
assertThat(partitionReplicaLifecycleManager.stopAsync(),
willThrow(RuntimeException.class));
- verify(replicaManager,
times(CatalogUtils.DEFAULT_PARTITION_COUNT)).stopReplica(any());
+ verify(replicaManager,
times(DEFAULT_PARTITION_COUNT)).stopReplica(any());
defaultZoneResources.forEach(resources ->
verify(resources.txStatePartitionStorage(), atLeastOnce()).close());
}
+ @Test
+ void testResourcesClosedOnPartitionRestart() {
+ int zoneId = catalogManager.latestCatalog().defaultZone().id();
+ var zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+
doReturn(commonZonePartitionResources).when(zoneResourcesManager).getZonePartitionResources(zonePartitionId);
+
+ assertThat(
+
partitionReplicaLifecycleManager.restartPartition(zonePartitionId,
Long.MAX_VALUE, clock.nowLong()),
+ willCompleteSuccessfully()
+ );
+
+ verify(commonSafeTimeTracker, timeout(1_000).times(1)).close();
+ verify(commonStorageIndexTracker, timeout(1_000).times(1)).close();
+
+ // We do not close tx state partition storage on restart.
+ verify(commonZonePartitionResources.txStatePartitionStorage(),
times(0)).close();
+ }
+
+ @Test
+ @ManagerIsStoppedByTest
+ void testResourcesClosedOnManagerStop() {
+
doReturn(commonZonePartitionResources).when(zoneResourcesManager).getZonePartitionResources(any());
+
+ assertDoesNotThrow(() ->
partitionReplicaLifecycleManager.beforeNodeStop());
+ assertThat(partitionReplicaLifecycleManager.stopAsync(),
willCompleteSuccessfully());
+
+ verify(commonSafeTimeTracker, times(DEFAULT_PARTITION_COUNT)).close();
+ verify(commonStorageIndexTracker,
times(DEFAULT_PARTITION_COUNT)).close();
+ verify(commonZonePartitionResources.txStatePartitionStorage(),
times(DEFAULT_PARTITION_COUNT)).close();
+
+ // Verify raftListener.onShutdown() is NOT called during resource
cleanup, as it should be called by the Raft node shutdown process.
+ verify(commonZonePartitionResources.raftListener(),
times(0)).onShutdown();
+ }
+
@Retention(RUNTIME)
private @interface ManagerIsStoppedByTest {
}