This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c25c2d088d IGNITE-25225 Extend PartitionReplicaLifecycleManager API (#5683)
9c25c2d088d is described below

commit 9c25c2d088dbe167857b653e0eaaa8f1eb846499
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 24 11:01:23 2025 +0300

    IGNITE-25225 Extend PartitionReplicaLifecycleManager API (#5683)
---
 .../PartitionReplicaLifecycleManager.java          | 22 +++++++------
 .../partition/replicator/ZoneResourcesManager.java | 36 +++++++++++++++-------
 .../PartitionReplicaLifecycleManagerTest.java      |  4 ++-
 3 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 44ef968e343..1858cb621c4 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.partition.replicator;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Collections.emptySet;
-import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
@@ -248,8 +247,8 @@ public class PartitionReplicaLifecycleManager extends
 
     /**
      * This future completes on {@link #beforeNodeStop()} with {@link NodeStoppingException} before the {@link #busyLock} is blocked.
-     * TODO: https://issues.apache.org/jira/browse/IGNITE-17592
-     **/
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-17592
     private final CompletableFuture<Void> stopReplicaLifecycleFuture = new CompletableFuture<>();
 
     /**
@@ -1584,9 +1583,7 @@ public class PartitionReplicaLifecycleManager extends
             PartitionMvStorageAccess partitionMvStorageAccess,
             boolean onNodeRecovery
     ) {
-        ZonePartitionResources resources = zoneResourcesManager.getZonePartitionResources(zonePartitionId);
-
-        requireNonNull(resources, "Zone partition resources not found [zonePartitionId=" + zonePartitionId + ']');
+        ZonePartitionResources resources = zonePartitionResources(zonePartitionId);
 
         // Register an intent to register a table-wide replica listener. On recovery this method is called before the replica is started,
         // so the listeners will be registered by the thread completing the "replicaListenerFuture". On normal operation (where there is
@@ -1720,12 +1717,17 @@ public class PartitionReplicaLifecycleManager extends
 
     @TestOnly
     public HybridTimestamp currentSafeTimeForZonePartition(int zoneId, int partId) {
-        return requireNonNull(zoneResourcesManager.getZonePartitionResources(new ZonePartitionId(zoneId, partId))).raftListener()
-                .currentSafeTime();
+        return zonePartitionResources(new ZonePartitionId(zoneId, partId)).raftListener().currentSafeTime();
     }
 
-    @TestOnly
+    /**
+     * Returns resources for the given zone partition.
+     */
     public ZonePartitionResources zonePartitionResources(ZonePartitionId zonePartitionId) {
-        return requireNonNull(zoneResourcesManager.getZonePartitionResources(zonePartitionId));
+        ZonePartitionResources resources = zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+
+        assert resources != null : String.format("Missing resources for zone partition [zonePartitionId=%s]", zonePartitionId);
+
+        return resources;
     }
 }
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 8d82e3d3178..71bf0888a67 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -47,12 +47,11 @@ import org.apache.ignite.internal.util.SafeTimeValuesTracker;
 import org.apache.ignite.internal.worker.ThreadAssertions;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
-import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * Manages resources of distribution zones; that is, allows creation of underlying storages and closes them on node stop.
  */
-class ZoneResourcesManager implements ManuallyCloseable {
+public class ZoneResourcesManager implements ManuallyCloseable {
     private final TxStateRocksDbSharedStorage sharedTxStateStorage;
 
     private final TxManager txManager;
@@ -125,7 +124,12 @@ class ZoneResourcesManager implements ManuallyCloseable {
                 partitionOperationsExecutor
         );
 
-        var zonePartitionResources = new ZonePartitionResources(txStatePartitionStorage, raftGroupListener, snapshotStorage);
+        var zonePartitionResources = new ZonePartitionResources(
+                txStatePartitionStorage,
+                raftGroupListener,
+                snapshotStorage,
+                storageIndexTracker
+        );
 
         
zoneResources.resourcesByPartitionId.put(zonePartitionId.partitionId(), 
zonePartitionResources);
 
@@ -139,9 +143,7 @@ class ZoneResourcesManager implements ManuallyCloseable {
             return null;
         }
 
-        ZonePartitionResources zonePartitionResources = zoneResources.resourcesByPartitionId.get(zonePartitionId.partitionId());
-
-        return zonePartitionResources;
+        return zoneResources.resourcesByPartitionId.get(zonePartitionId.partitionId());
     }
 
     private TxStateStorage createTxStateStorage(int zoneId, int partitionCount) {
@@ -220,12 +222,18 @@ class ZoneResourcesManager implements ManuallyCloseable {
         }
     }
 
-    @VisibleForTesting
+    /**
+     * Zone partition resources.
+     */
     public static class ZonePartitionResources {
         private final TxStatePartitionStorage txStatePartitionStorage;
+
         private final ZonePartitionRaftListener raftListener;
+
         private final PartitionSnapshotStorage snapshotStorage;
 
+        private final PendingComparableValuesTracker<Long, Void> storageIndexTracker;
+
         /**
          * Future that completes when the zone-wide replica listener is created.
          *
@@ -239,25 +247,31 @@ class ZoneResourcesManager implements ManuallyCloseable {
         ZonePartitionResources(
                 TxStatePartitionStorage txStatePartitionStorage,
                 ZonePartitionRaftListener raftListener,
-                PartitionSnapshotStorage snapshotStorage
+                PartitionSnapshotStorage snapshotStorage,
+                PendingComparableValuesTracker<Long, Void> storageIndexTracker
         ) {
             this.txStatePartitionStorage = txStatePartitionStorage;
             this.raftListener = raftListener;
             this.snapshotStorage = snapshotStorage;
+            this.storageIndexTracker = storageIndexTracker;
         }
 
-        TxStatePartitionStorage txStatePartitionStorage() {
+        public TxStatePartitionStorage txStatePartitionStorage() {
             return txStatePartitionStorage;
         }
 
-        ZonePartitionRaftListener raftListener() {
+        public ZonePartitionRaftListener raftListener() {
             return raftListener;
         }
 
-        PartitionSnapshotStorage snapshotStorage() {
+        public PartitionSnapshotStorage snapshotStorage() {
             return snapshotStorage;
         }
 
+        public PendingComparableValuesTracker<Long, Void> storageIndexTracker() {
+            return storageIndexTracker;
+        }
+
         public CompletableFuture<ZonePartitionReplicaListener> replicaListenerFuture() {
             return replicaListenerFuture;
         }
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index b1fa0779560..524b16a9909 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -158,7 +159,8 @@ class PartitionReplicaLifecycleManagerTest extends BaseIgniteAbstractTest {
                 .thenReturn(new ZonePartitionResources(
                         txStatePartitionStorage,
                         raftGroupListener,
-                        partitionSnapshotStorage
+                        partitionSnapshotStorage,
+                        new PendingComparableValuesTracker<>(0L)
                 ));
 
         when(raftManager.startRaftGroupNode(any(), any(), any(), any(), any(RaftGroupOptions.class), any()))

Reply via email to