This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fa545f8e4d IGNITE-24521 Move listeners map into ZoneResourcesManager 
(#5232)
0fa545f8e4d is described below

commit 0fa545f8e4d7e611aa9eb642fd0ef8dc4301d6ae
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Feb 17 10:17:27 2025 +0200

    IGNITE-24521 Move listeners map into ZoneResourcesManager (#5232)
---
 .../PartitionReplicaLifecycleManager.java          |  99 +++---------
 .../partition/replicator/ZoneResourcesManager.java | 176 +++++++++++++++++----
 .../PartitionReplicaLifecycleManagerTest.java      |  23 ++-
 .../replicator/ZoneResourcesManagerTest.java       |  75 +++++----
 4 files changed, 223 insertions(+), 150 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 339b993a0ce..c751a220071 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -67,7 +67,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -112,12 +111,9 @@ import 
org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
 import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
-import 
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
 import 
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
@@ -144,8 +140,6 @@ import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.util.SafeTimeValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -226,39 +220,12 @@ public class PartitionReplicaLifecycleManager extends
 
     private final SchemaManager schemaManager;
 
-    private final OutgoingSnapshotsManager outgoingSnapshotsManager;
-
     /** A predicate that checks that the given assignment is corresponded to 
the local node. */
     private final Predicate<Assignment> isLocalNodeAssignment = assignment -> 
assignment.consistentId().equals(localNode().name());
 
     /** Configuration of rebalance retries delay. */
     private final SystemDistributedConfigurationPropertyHolder<Integer> 
rebalanceRetryDelayConfiguration;
 
-    // TODO: move this map into ZoneResourcesManager, see 
https://issues.apache.org/jira/browse/IGNITE-24521
-    private final ConcurrentMap<ZonePartitionId, Listeners> 
listenersByZonePartitionId = new ConcurrentHashMap<>();
-
-    /** Holder class for Replica and Raft listeners. */
-    private static class Listeners {
-        /**
-         * Future that completes when the zone-wide replica listener is 
created.
-         *
-         * <p>This is needed, because on recovery tables are started before 
zone replicas and we need to postpone registering table-wide
-         * replica listeners until the zone replica is started.
-         *
-         * <p>During normal operations this future will be complete and 
table-wide listener registration will happen immediately.
-         */
-        final CompletableFuture<ZonePartitionReplicaListener> 
replicaListenerFuture = new CompletableFuture<>();
-
-        final ZonePartitionRaftListener raftListener;
-
-        final PartitionSnapshotStorageFactory snapshotStorageFactory;
-
-        Listeners(ZonePartitionRaftListener raftListener, 
PartitionSnapshotStorageFactory snapshotStorageFactory) {
-            this.raftListener = raftListener;
-            this.snapshotStorageFactory = snapshotStorageFactory;
-        }
-    }
-
     private final ZoneResourcesManager zoneResourcesManager;
 
     /**
@@ -315,8 +282,14 @@ public class PartitionReplicaLifecycleManager extends
                 systemDistributedConfiguration,
                 txManager,
                 schemaManager,
-                outgoingSnapshotsManager,
-                new ZoneResourcesManager(sharedTxStateStorage)
+                new ZoneResourcesManager(
+                        sharedTxStateStorage,
+                        txManager,
+                        outgoingSnapshotsManager,
+                        topologyService,
+                        catalogService,
+                        partitionOperationsExecutor
+                )
         );
     }
 
@@ -337,7 +310,6 @@ public class PartitionReplicaLifecycleManager extends
             SystemDistributedConfiguration systemDistributedConfiguration,
             TxManager txManager,
             SchemaManager schemaManager,
-            OutgoingSnapshotsManager outgoingSnapshotsManager,
             ZoneResourcesManager zoneResourcesManager
     ) {
         this.catalogService = catalogService;
@@ -354,7 +326,6 @@ public class PartitionReplicaLifecycleManager extends
         this.placementDriver = placementDriver;
         this.txManager = txManager;
         this.schemaManager = schemaManager;
-        this.outgoingSnapshotsManager = outgoingSnapshotsManager;
         this.zoneResourcesManager = zoneResourcesManager;
 
         rebalanceRetryDelayConfiguration = new 
SystemDistributedConfigurationPropertyHolder<>(
@@ -590,39 +561,10 @@ public class PartitionReplicaLifecycleManager extends
                 rebalanceRetryDelayConfiguration
         );
 
-        var safeTimeTracker = new 
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
-        var storageIndexTracker = new PendingComparableValuesTracker<Long, 
Void>(0L);
-
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
             var eventParams = new 
LocalPartitionReplicaEventParameters(zonePartitionId, revision);
 
-            TxStatePartitionStorage txStatePartitionStorage = 
zoneResourcesManager.getOrCreatePartitionTxStateStorage(
-                    zonePartitionId.zoneId(),
-                    partitionCount,
-                    zonePartitionId.partitionId()
-            );
-
-            var raftGroupListener = new ZonePartitionRaftListener(
-                    zonePartitionId,
-                    txStatePartitionStorage,
-                    txManager,
-                    safeTimeTracker,
-                    storageIndexTracker,
-                    outgoingSnapshotsManager
-            );
-
-            var snapshotStorageFactory = new PartitionSnapshotStorageFactory(
-                    new ZonePartitionKey(zonePartitionId.zoneId(), 
zonePartitionId.partitionId()),
-                    topologyService,
-                    outgoingSnapshotsManager,
-                    new PartitionTxStateAccessImpl(txStatePartitionStorage),
-                    catalogService,
-                    partitionOperationsExecutor
-            );
-
-            var listeners = new Listeners(raftGroupListener, 
snapshotStorageFactory);
-
-            listenersByZonePartitionId.put(zonePartitionId, listeners);
+            ZonePartitionResources zoneResources = 
zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, 
partitionCount);
 
             return 
fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
                     .thenCompose(v -> {
@@ -631,7 +573,7 @@ public class PartitionReplicaLifecycleManager extends
                                     zonePartitionId,
                                     raftClient -> {
                                         var replicaListener = new 
ZonePartitionReplicaListener(
-                                                txStatePartitionStorage,
+                                                
zoneResources.txStatePartitionStorage(),
                                                 clockService,
                                                 txManager,
                                                 new 
CatalogValidationSchemasSource(catalogService, schemaManager),
@@ -641,13 +583,13 @@ public class PartitionReplicaLifecycleManager extends
                                                 zonePartitionId
                                         );
 
-                                        
listeners.replicaListenerFuture.complete(replicaListener);
+                                        
zoneResources.replicaListenerFuture().complete(replicaListener);
 
                                         return replicaListener;
                                     },
-                                    snapshotStorageFactory,
+                                    zoneResources.snapshotStorageFactory(),
                                     stablePeersAndLearners,
-                                    raftGroupListener,
+                                    zoneResources.raftListener(),
                                     raftGroupEventsListener,
                                     // TODO: IGNITE-24371 - pass real 
isVolatile flag
                                     false,
@@ -664,7 +606,7 @@ public class PartitionReplicaLifecycleManager extends
                     }))
                     .whenComplete((v, e) -> {
                         if (e != null) {
-                            listenersByZonePartitionId.remove(zonePartitionId);
+                            
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
                         }
                     });
         };
@@ -1385,7 +1327,7 @@ public class PartitionReplicaLifecycleManager extends
                         return nullCompletedFuture();
                     }
 
-                    
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId.zoneId(), 
zonePartitionId.partitionId());
+                    
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
 
                     return fireEvent(
                             LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
@@ -1408,7 +1350,6 @@ public class PartitionReplicaLifecycleManager extends
                 return replicaMgr.stopReplica(zonePartitionId)
                         .thenApply((replicaWasStopped) -> {
                             if (replicaWasStopped) {
-                                
listenersByZonePartitionId.remove(zonePartitionId);
                                 replicationGroupIds.remove(zonePartitionId);
                             }
 
@@ -1473,20 +1414,20 @@ public class PartitionReplicaLifecycleManager extends
             RaftTableProcessor raftTableProcessor,
             PartitionMvStorageAccess partitionMvStorageAccess
     ) {
-        Listeners listeners = listenersByZonePartitionId.get(zonePartitionId);
+        ZonePartitionResources resources = 
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
 
         // Register an intent to register a table-wide replica listener. On 
recovery this method is called before the replica is started,
         // so the listeners will be registered by the thread completing the 
"replicaListenerFuture". On normal operation (where there is
         // a HB relationship between zone and table creation) zone-wide 
replica must already be started, this future will always be
         // completed and the listeners will be registered immediately.
-        listeners.replicaListenerFuture.thenAccept(zoneReplicaListener -> 
zoneReplicaListener.addTableReplicaListener(
+        resources.replicaListenerFuture().thenAccept(zoneReplicaListener -> 
zoneReplicaListener.addTableReplicaListener(
                 tablePartitionId,
                 tablePartitionReplicaListenerFactory
         ));
 
-        listeners.raftListener.addTableProcessor(tablePartitionId, 
raftTableProcessor);
+        resources.raftListener().addTableProcessor(tablePartitionId, 
raftTableProcessor);
 
-        
listeners.snapshotStorageFactory.addMvPartition(tablePartitionId.tableId(), 
partitionMvStorageAccess);
+        
resources.snapshotStorageFactory().addMvPartition(tablePartitionId.tableId(), 
partitionMvStorageAccess);
     }
 
     private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, 
Supplier<CompletableFuture<T>> action) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 90119e04ddf..64f29aac5ad 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -19,15 +19,30 @@ package org.apache.ignite.internal.partition.replicator;
 
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.TxManager;
 import 
org.apache.ignite.internal.tx.storage.state.ThreadAssertingTxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.SafeTimeValuesTracker;
 import org.apache.ignite.internal.worker.ThreadAssertions;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -38,35 +53,85 @@ import org.jetbrains.annotations.TestOnly;
 class ZoneResourcesManager implements ManuallyCloseable {
     private final TxStateRocksDbSharedStorage sharedTxStateStorage;
 
+    private final TxManager txManager;
+
+    private final OutgoingSnapshotsManager outgoingSnapshotsManager;
+
+    private final TopologyService topologyService;
+
+    private final CatalogService catalogService;
+
+    private final Executor partitionOperationsExecutor;
+
     /** Map from zone IDs to their resource holders. */
     private final Map<Integer, ZoneResources> resourcesByZoneId = new 
ConcurrentHashMap<>();
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    ZoneResourcesManager(TxStateRocksDbSharedStorage sharedTxStateStorage) {
+    ZoneResourcesManager(
+            TxStateRocksDbSharedStorage sharedTxStateStorage,
+            TxManager txManager,
+            OutgoingSnapshotsManager outgoingSnapshotsManager,
+            TopologyService topologyService,
+            CatalogService catalogService,
+            Executor partitionOperationsExecutor
+    ) {
         this.sharedTxStateStorage = sharedTxStateStorage;
+        this.txManager = txManager;
+        this.outgoingSnapshotsManager = outgoingSnapshotsManager;
+        this.topologyService = topologyService;
+        this.catalogService = catalogService;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
     }
 
-    /**
-     * Gets or creates a transaction state storage for a zone partition.
-     *
-     * @param zoneId ID of the zone.
-     * @param partitionCount Number of partitions in the zone.
-     * @param partitionId Partition ID.
-     */
-    TxStatePartitionStorage getOrCreatePartitionTxStateStorage(int zoneId, int 
partitionCount, int partitionId) {
-        return inBusyLock(busyLock, () -> {
-            ZoneResources zoneResources = resourcesByZoneId.computeIfAbsent(
-                    zoneId,
-                    id -> createZoneResources(id, partitionCount)
-            );
-
-            return 
zoneResources.txStateStorage.getOrCreatePartitionStorage(partitionId);
-        });
+    ZonePartitionResources allocateZonePartitionResources(ZonePartitionId 
zonePartitionId, int partitionCount) {
+        ZoneResources zoneResources = resourcesByZoneId.computeIfAbsent(
+                zonePartitionId.zoneId(),
+                zoneId -> new ZoneResources(createTxStateStorage(zoneId, 
partitionCount))
+        );
+
+        TxStatePartitionStorage txStatePartitionStorage = 
zoneResources.txStateStorage
+                .getOrCreatePartitionStorage(zonePartitionId.partitionId());
+
+        var safeTimeTracker = new 
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
+
+        var storageIndexTracker = new PendingComparableValuesTracker<Long, 
Void>(0L);
+
+        var raftGroupListener = new ZonePartitionRaftListener(
+                zonePartitionId,
+                txStatePartitionStorage,
+                txManager,
+                safeTimeTracker,
+                storageIndexTracker,
+                outgoingSnapshotsManager
+        );
+
+        var snapshotStorageFactory = new PartitionSnapshotStorageFactory(
+                new ZonePartitionKey(zonePartitionId.zoneId(), 
zonePartitionId.partitionId()),
+                topologyService,
+                outgoingSnapshotsManager,
+                new PartitionTxStateAccessImpl(txStatePartitionStorage),
+                catalogService,
+                partitionOperationsExecutor
+        );
+
+        var zonePartitionResources = new 
ZonePartitionResources(txStatePartitionStorage, raftGroupListener, 
snapshotStorageFactory);
+
+        
zoneResources.resourcesByPartitionId.put(zonePartitionId.partitionId(), 
zonePartitionResources);
+
+        return zonePartitionResources;
     }
 
-    private ZoneResources createZoneResources(int zoneId, int partitionCount) {
-        return new ZoneResources(createTxStateStorage(zoneId, partitionCount));
+    ZonePartitionResources getZonePartitionResources(ZonePartitionId 
zonePartitionId) {
+        ZoneResources zoneResources = 
resourcesByZoneId.get(zonePartitionId.zoneId());
+
+        assert zoneResources != null : "Missing resources for zone " + 
zonePartitionId.zoneId();
+
+        ZonePartitionResources zonePartitionResources = 
zoneResources.resourcesByPartitionId.get(zonePartitionId.partitionId());
+
+        assert zonePartitionResources != null : "Missing resources for 
partition " + zonePartitionId;
+
+        return zonePartitionResources;
     }
 
     private TxStateStorage createTxStateStorage(int zoneId, int 
partitionCount) {
@@ -87,30 +152,27 @@ class ZoneResourcesManager implements ManuallyCloseable {
 
         for (ZoneResources zoneResources : resourcesByZoneId.values()) {
             zoneResources.txStateStorage.close();
+            zoneResources.resourcesByPartitionId.clear();
         }
+
+        resourcesByZoneId.clear();
     }
 
-    void destroyZonePartitionResources(int zoneId, int partitionId) {
+    void destroyZonePartitionResources(ZonePartitionId zonePartitionId) {
         inBusyLock(busyLock, () -> {
-            ZoneResources resources = resourcesByZoneId.get(zoneId);
+            ZoneResources resources = 
resourcesByZoneId.get(zonePartitionId.zoneId());
 
             if (resources != null) {
-                resources.txStateStorage.destroyTxStateStorage(partitionId);
+                
resources.resourcesByPartitionId.remove(zonePartitionId.partitionId());
+
+                
resources.txStateStorage.destroyTxStateStorage(zonePartitionId.partitionId());
             }
         });
     }
 
-    private static class ZoneResources {
-        private final TxStateStorage txStateStorage;
-
-        private ZoneResources(TxStateStorage txStateStorage) {
-            this.txStateStorage = txStateStorage;
-        }
-    }
-
     @TestOnly
     @Nullable
-    public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int 
partitionId) {
+    TxStatePartitionStorage txStatePartitionStorage(int zoneId, int 
partitionId) {
         ZoneResources resources = resourcesByZoneId.get(zoneId);
 
         if (resources == null) {
@@ -119,4 +181,56 @@ class ZoneResourcesManager implements ManuallyCloseable {
 
         return resources.txStateStorage.getPartitionStorage(partitionId);
     }
+
+    private static class ZoneResources {
+        final TxStateStorage txStateStorage;
+
+        final Map<Integer, ZonePartitionResources> resourcesByPartitionId = 
new Int2ObjectOpenHashMap<>();
+
+        ZoneResources(TxStateStorage txStateStorage) {
+            this.txStateStorage = txStateStorage;
+        }
+    }
+
+    static class ZonePartitionResources {
+        private final TxStatePartitionStorage txStatePartitionStorage;
+        private final ZonePartitionRaftListener raftListener;
+        private final PartitionSnapshotStorageFactory snapshotStorageFactory;
+
+        /**
+         * Future that completes when the zone-wide replica listener is 
created.
+         *
+         * <p>This is needed, because on recovery tables are started before 
zone replicas and we need to postpone registering table-wide
+         * replica listeners until the zone replica is started.
+         *
+         * <p>During normal operations this future will be complete and 
table-wide listener registration will happen immediately.
+         */
+        private final CompletableFuture<ZonePartitionReplicaListener> 
replicaListenerFuture = new CompletableFuture<>();
+
+        ZonePartitionResources(
+                TxStatePartitionStorage txStatePartitionStorage,
+                ZonePartitionRaftListener raftListener,
+                PartitionSnapshotStorageFactory snapshotStorageFactory
+        ) {
+            this.txStatePartitionStorage = txStatePartitionStorage;
+            this.raftListener = raftListener;
+            this.snapshotStorageFactory = snapshotStorageFactory;
+        }
+
+        TxStatePartitionStorage txStatePartitionStorage() {
+            return txStatePartitionStorage;
+        }
+
+        ZonePartitionRaftListener raftListener() {
+            return raftListener;
+        }
+
+        PartitionSnapshotStorageFactory snapshotStorageFactory() {
+            return snapshotStorageFactory;
+        }
+
+        CompletableFuture<ZonePartitionReplicaListener> 
replicaListenerFuture() {
+            return replicaListenerFuture;
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 5378aff6b8e..9d782b3f7e1 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -27,6 +27,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -56,7 +57,9 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import 
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
@@ -72,6 +75,7 @@ import 
org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
@@ -79,7 +83,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Answers;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -116,7 +119,7 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
     @BeforeEach
     void setUp(
             TestInfo testInfo,
-            @Mock(answer = Answers.RETURNS_DEEP_STUBS) ClusterService 
clusterService,
+            @Mock(answer = RETURNS_DEEP_STUBS) ClusterService clusterService,
             @Mock DistributionZoneManager distributionZoneManager,
             @Mock LowWatermark lowWatermark,
             @Mock ClockService clockService,
@@ -128,7 +131,9 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
             @Mock FailureManager failureManager,
             @Mock TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory,
             @Mock LogStorageFactoryCreator logStorageFactoryCreator,
-            @Mock OutgoingSnapshotsManager outgoingSnapshotsManager
+            @Mock PartitionSnapshotStorageFactory 
partitionSnapshotStorageFactory,
+            @Mock TxStatePartitionStorage txStatePartitionStorage,
+            @Mock ZonePartitionRaftListener raftGroupListener
     ) {
         String nodeName = testNodeName(testInfo, 0);
 
@@ -141,6 +146,13 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
 
         when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(completedFuture(Set.of(nodeName)));
 
+        when(zoneResourcesManager.allocateZonePartitionResources(any(), 
anyInt()))
+                .thenReturn(new ZonePartitionResources(
+                        txStatePartitionStorage,
+                        raftGroupListener,
+                        partitionSnapshotStorageFactory
+                ));
+
         metaStorageManager = StandaloneMetaStorageManager.create();
 
         catalogManager = new CatalogManagerImpl(new 
UpdateLogImpl(metaStorageManager), clockService);
@@ -180,7 +192,6 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
                 systemDistributedConfiguration,
                 txManager,
                 schemaManager,
-                outgoingSnapshotsManager,
                 zoneResourcesManager
         );
 
@@ -226,6 +237,6 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
         InOrder inOrder = inOrder(raftManager, zoneResourcesManager);
 
         inOrder.verify(raftManager, 
timeout(1_000)).stopRaftNodes(zonePartitionId);
-        inOrder.verify(zoneResourcesManager, 
timeout(1_000)).destroyZonePartitionResources(zoneId, 0);
+        inOrder.verify(zoneResourcesManager, 
timeout(1_000)).destroyZonePartitionResources(zonePartitionId);
     }
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
index 9b0842393b0..703fa8dd1a1 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
@@ -25,19 +25,21 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
-import java.nio.file.Path;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.internal.tx.TxManager;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -47,30 +49,32 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
-@ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ExecutorServiceExtension.class)
-class ZoneResourcesManagerTest extends BaseIgniteAbstractTest {
+class ZoneResourcesManagerTest extends IgniteAbstractTest {
     private TxStateRocksDbSharedStorage sharedStorage;
 
-    @Mock
-    private LogSyncer logSyncer;
-
     private ZoneResourcesManager manager;
 
-    @WorkDirectory
-    private Path workDir;
-
-    @InjectExecutorService
-    private ScheduledExecutorService scheduler;
-
-    @InjectExecutorService
-    private ExecutorService executor;
-
     @BeforeEach
-    void init() {
+    void init(
+            @Mock LogSyncer logSyncer,
+            @Mock TxManager txManager,
+            @Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
+            @Mock TopologyService topologyService,
+            @Mock CatalogService catalogService,
+            @InjectExecutorService ScheduledExecutorService scheduler,
+            @InjectExecutorService ExecutorService executor
+    ) {
         sharedStorage = new TxStateRocksDbSharedStorage(workDir, scheduler, 
executor, logSyncer, () -> 0);
 
-        manager = new ZoneResourcesManager(sharedStorage);
+        manager = new ZoneResourcesManager(
+                sharedStorage,
+                txManager,
+                outgoingSnapshotsManager,
+                topologyService,
+                catalogService,
+                executor
+        );
 
         assertThat(sharedStorage.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
     }
@@ -83,17 +87,20 @@ class ZoneResourcesManagerTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void createsTxStatePartitionStorage() {
-        TxStatePartitionStorage txStatePartitionStorage = 
getOrCreatePartitionTxStateStorage(1, 10, 1);
+    void allocatesResources() {
+        ZonePartitionResources resources = allocatePartitionResources(new 
ZonePartitionId(1, 1), 10);
 
-        assertThat(txStatePartitionStorage, is(notNullValue()));
+        assertThat(resources.txStatePartitionStorage(), is(notNullValue()));
+        assertThat(resources.raftListener(), is(notNullValue()));
+        assertThat(resources.snapshotStorageFactory(), is(notNullValue()));
+        assertThat(resources.replicaListenerFuture().isDone(), is(false));
     }
 
     @Test
     void closesResourcesOnShutdown() {
-        TxStatePartitionStorage zone1storage1 = 
getOrCreatePartitionTxStateStorage(1, 10, 1);
-        TxStatePartitionStorage zone1storage5 = 
getOrCreatePartitionTxStateStorage(1, 10, 5);
-        TxStatePartitionStorage zone2storage3 = 
getOrCreatePartitionTxStateStorage(2, 10, 3);
+        ZonePartitionResources zone1storage1 = allocatePartitionResources(new 
ZonePartitionId(1, 1), 10);
+        ZonePartitionResources zone1storage5 = allocatePartitionResources(new 
ZonePartitionId(1, 5), 10);
+        ZonePartitionResources zone2storage3 = allocatePartitionResources(new 
ZonePartitionId(2, 3), 10);
 
         manager.close();
 
@@ -106,28 +113,28 @@ class ZoneResourcesManagerTest extends 
BaseIgniteAbstractTest {
     void removesTxStatePartitionStorageOnDestroy() {
         int zoneId = 1;
 
-        getOrCreatePartitionTxStateStorage(zoneId, 10, 1);
-        getOrCreatePartitionTxStateStorage(zoneId, 10, 2);
+        allocatePartitionResources(new ZonePartitionId(zoneId, 1), 10);
+        allocatePartitionResources(new ZonePartitionId(zoneId, 2), 10);
 
         assertThat(manager.txStatePartitionStorage(zoneId, 1), 
is(notNullValue()));
         assertThat(manager.txStatePartitionStorage(zoneId, 2), 
is(notNullValue()));
 
-        bypassingThreadAssertions(() -> 
manager.destroyZonePartitionResources(zoneId, 1));
+        bypassingThreadAssertions(() -> 
manager.destroyZonePartitionResources(new ZonePartitionId(zoneId, 1)));
 
         assertThat(manager.txStatePartitionStorage(zoneId, 1), 
is(nullValue()));
         assertThat(manager.txStatePartitionStorage(zoneId, 2), 
is(notNullValue()));
     }
 
     @SuppressWarnings("ThrowableNotThrown")
-    private static void assertThatStorageIsStopped(TxStatePartitionStorage 
storage) {
+    private static void assertThatStorageIsStopped(ZonePartitionResources 
resources) {
         assertThrows(
                 IgniteInternalException.class,
-                () -> bypassingThreadAssertions(() -> 
storage.get(UUID.randomUUID())),
+                () -> bypassingThreadAssertions(() -> 
resources.txStatePartitionStorage().get(UUID.randomUUID())),
                 "Transaction state storage is stopped"
         );
     }
 
-    private TxStatePartitionStorage getOrCreatePartitionTxStateStorage(int 
zoneId, int partitionCount, int partitionId) {
-        return bypassingThreadAssertions(() -> 
manager.getOrCreatePartitionTxStateStorage(zoneId, partitionCount, 
partitionId));
+    private ZonePartitionResources allocatePartitionResources(ZonePartitionId 
zonePartitionId, int partitionCount) {
+        return bypassingThreadAssertions(() -> 
manager.allocateZonePartitionResources(zonePartitionId, partitionCount));
     }
 }


Reply via email to