This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0fa545f8e4d IGNITE-24521 Move listeners map into ZoneResourcesManager
(#5232)
0fa545f8e4d is described below
commit 0fa545f8e4d7e611aa9eb642fd0ef8dc4301d6ae
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Feb 17 10:17:27 2025 +0200
IGNITE-24521 Move listeners map into ZoneResourcesManager (#5232)
---
.../PartitionReplicaLifecycleManager.java | 99 +++---------
.../partition/replicator/ZoneResourcesManager.java | 176 +++++++++++++++++----
.../PartitionReplicaLifecycleManagerTest.java | 23 ++-
.../replicator/ZoneResourcesManagerTest.java | 75 +++++----
4 files changed, 223 insertions(+), 150 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 339b993a0ce..c751a220071 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -67,7 +67,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -112,12 +111,9 @@ import
org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
-import
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
@@ -144,8 +140,6 @@ import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -226,39 +220,12 @@ public class PartitionReplicaLifecycleManager extends
private final SchemaManager schemaManager;
- private final OutgoingSnapshotsManager outgoingSnapshotsManager;
-
/** A predicate that checks that the given assignment is corresponded to
the local node. */
private final Predicate<Assignment> isLocalNodeAssignment = assignment ->
assignment.consistentId().equals(localNode().name());
/** Configuration of rebalance retries delay. */
private final SystemDistributedConfigurationPropertyHolder<Integer>
rebalanceRetryDelayConfiguration;
- // TODO: move this map into ZoneResourcesManager, see
https://issues.apache.org/jira/browse/IGNITE-24521
- private final ConcurrentMap<ZonePartitionId, Listeners>
listenersByZonePartitionId = new ConcurrentHashMap<>();
-
- /** Holder class for Replica and Raft listeners. */
- private static class Listeners {
- /**
- * Future that completes when the zone-wide replica listener is
created.
- *
- * <p>This is needed, because on recovery tables are started before
zone replicas and we need to postpone registering table-wide
- * replica listeners until the zone replica is started.
- *
- * <p>During normal operations this future will be complete and
table-wide listener registration will happen immediately.
- */
- final CompletableFuture<ZonePartitionReplicaListener>
replicaListenerFuture = new CompletableFuture<>();
-
- final ZonePartitionRaftListener raftListener;
-
- final PartitionSnapshotStorageFactory snapshotStorageFactory;
-
- Listeners(ZonePartitionRaftListener raftListener,
PartitionSnapshotStorageFactory snapshotStorageFactory) {
- this.raftListener = raftListener;
- this.snapshotStorageFactory = snapshotStorageFactory;
- }
- }
-
private final ZoneResourcesManager zoneResourcesManager;
/**
@@ -315,8 +282,14 @@ public class PartitionReplicaLifecycleManager extends
systemDistributedConfiguration,
txManager,
schemaManager,
- outgoingSnapshotsManager,
- new ZoneResourcesManager(sharedTxStateStorage)
+ new ZoneResourcesManager(
+ sharedTxStateStorage,
+ txManager,
+ outgoingSnapshotsManager,
+ topologyService,
+ catalogService,
+ partitionOperationsExecutor
+ )
);
}
@@ -337,7 +310,6 @@ public class PartitionReplicaLifecycleManager extends
SystemDistributedConfiguration systemDistributedConfiguration,
TxManager txManager,
SchemaManager schemaManager,
- OutgoingSnapshotsManager outgoingSnapshotsManager,
ZoneResourcesManager zoneResourcesManager
) {
this.catalogService = catalogService;
@@ -354,7 +326,6 @@ public class PartitionReplicaLifecycleManager extends
this.placementDriver = placementDriver;
this.txManager = txManager;
this.schemaManager = schemaManager;
- this.outgoingSnapshotsManager = outgoingSnapshotsManager;
this.zoneResourcesManager = zoneResourcesManager;
rebalanceRetryDelayConfiguration = new
SystemDistributedConfigurationPropertyHolder<>(
@@ -590,39 +561,10 @@ public class PartitionReplicaLifecycleManager extends
rebalanceRetryDelayConfiguration
);
- var safeTimeTracker = new
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
- var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L);
-
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
var eventParams = new
LocalPartitionReplicaEventParameters(zonePartitionId, revision);
- TxStatePartitionStorage txStatePartitionStorage =
zoneResourcesManager.getOrCreatePartitionTxStateStorage(
- zonePartitionId.zoneId(),
- partitionCount,
- zonePartitionId.partitionId()
- );
-
- var raftGroupListener = new ZonePartitionRaftListener(
- zonePartitionId,
- txStatePartitionStorage,
- txManager,
- safeTimeTracker,
- storageIndexTracker,
- outgoingSnapshotsManager
- );
-
- var snapshotStorageFactory = new PartitionSnapshotStorageFactory(
- new ZonePartitionKey(zonePartitionId.zoneId(),
zonePartitionId.partitionId()),
- topologyService,
- outgoingSnapshotsManager,
- new PartitionTxStateAccessImpl(txStatePartitionStorage),
- catalogService,
- partitionOperationsExecutor
- );
-
- var listeners = new Listeners(raftGroupListener,
snapshotStorageFactory);
-
- listenersByZonePartitionId.put(zonePartitionId, listeners);
+ ZonePartitionResources zoneResources =
zoneResourcesManager.allocateZonePartitionResources(zonePartitionId,
partitionCount);
return
fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
.thenCompose(v -> {
@@ -631,7 +573,7 @@ public class PartitionReplicaLifecycleManager extends
zonePartitionId,
raftClient -> {
var replicaListener = new
ZonePartitionReplicaListener(
- txStatePartitionStorage,
+
zoneResources.txStatePartitionStorage(),
clockService,
txManager,
new
CatalogValidationSchemasSource(catalogService, schemaManager),
@@ -641,13 +583,13 @@ public class PartitionReplicaLifecycleManager extends
zonePartitionId
);
-
listeners.replicaListenerFuture.complete(replicaListener);
+
zoneResources.replicaListenerFuture().complete(replicaListener);
return replicaListener;
},
- snapshotStorageFactory,
+ zoneResources.snapshotStorageFactory(),
stablePeersAndLearners,
- raftGroupListener,
+ zoneResources.raftListener(),
raftGroupEventsListener,
// TODO: IGNITE-24371 - pass real
isVolatile flag
false,
@@ -664,7 +606,7 @@ public class PartitionReplicaLifecycleManager extends
}))
.whenComplete((v, e) -> {
if (e != null) {
- listenersByZonePartitionId.remove(zonePartitionId);
+
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
}
});
};
@@ -1385,7 +1327,7 @@ public class PartitionReplicaLifecycleManager extends
return nullCompletedFuture();
}
-
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId.zoneId(),
zonePartitionId.partitionId());
+
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
return fireEvent(
LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
@@ -1408,7 +1350,6 @@ public class PartitionReplicaLifecycleManager extends
return replicaMgr.stopReplica(zonePartitionId)
.thenApply((replicaWasStopped) -> {
if (replicaWasStopped) {
-
listenersByZonePartitionId.remove(zonePartitionId);
replicationGroupIds.remove(zonePartitionId);
}
@@ -1473,20 +1414,20 @@ public class PartitionReplicaLifecycleManager extends
RaftTableProcessor raftTableProcessor,
PartitionMvStorageAccess partitionMvStorageAccess
) {
- Listeners listeners = listenersByZonePartitionId.get(zonePartitionId);
+ ZonePartitionResources resources =
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
// Register an intent to register a table-wide replica listener. On
recovery this method is called before the replica is started,
// so the listeners will be registered by the thread completing the
"replicaListenerFuture". On normal operation (where there is
// a HB relationship between zone and table creation) zone-wide
replica must already be started, this future will always be
// completed and the listeners will be registered immediately.
- listeners.replicaListenerFuture.thenAccept(zoneReplicaListener ->
zoneReplicaListener.addTableReplicaListener(
+ resources.replicaListenerFuture().thenAccept(zoneReplicaListener ->
zoneReplicaListener.addTableReplicaListener(
tablePartitionId,
tablePartitionReplicaListenerFactory
));
- listeners.raftListener.addTableProcessor(tablePartitionId,
raftTableProcessor);
+ resources.raftListener().addTableProcessor(tablePartitionId,
raftTableProcessor);
-
listeners.snapshotStorageFactory.addMvPartition(tablePartitionId.tableId(),
partitionMvStorageAccess);
+
resources.snapshotStorageFactory().addMvPartition(tablePartitionId.tableId(),
partitionMvStorageAccess);
}
private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId,
Supplier<CompletableFuture<T>> action) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 90119e04ddf..64f29aac5ad 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -19,15 +19,30 @@ package org.apache.ignite.internal.partition.replicator;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.TxManager;
import
org.apache.ignite.internal.tx.storage.state.ThreadAssertingTxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.internal.worker.ThreadAssertions;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -38,35 +53,85 @@ import org.jetbrains.annotations.TestOnly;
class ZoneResourcesManager implements ManuallyCloseable {
private final TxStateRocksDbSharedStorage sharedTxStateStorage;
+ private final TxManager txManager;
+
+ private final OutgoingSnapshotsManager outgoingSnapshotsManager;
+
+ private final TopologyService topologyService;
+
+ private final CatalogService catalogService;
+
+ private final Executor partitionOperationsExecutor;
+
/** Map from zone IDs to their resource holders. */
private final Map<Integer, ZoneResources> resourcesByZoneId = new
ConcurrentHashMap<>();
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- ZoneResourcesManager(TxStateRocksDbSharedStorage sharedTxStateStorage) {
+ ZoneResourcesManager(
+ TxStateRocksDbSharedStorage sharedTxStateStorage,
+ TxManager txManager,
+ OutgoingSnapshotsManager outgoingSnapshotsManager,
+ TopologyService topologyService,
+ CatalogService catalogService,
+ Executor partitionOperationsExecutor
+ ) {
this.sharedTxStateStorage = sharedTxStateStorage;
+ this.txManager = txManager;
+ this.outgoingSnapshotsManager = outgoingSnapshotsManager;
+ this.topologyService = topologyService;
+ this.catalogService = catalogService;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
}
- /**
- * Gets or creates a transaction state storage for a zone partition.
- *
- * @param zoneId ID of the zone.
- * @param partitionCount Number of partitions in the zone.
- * @param partitionId Partition ID.
- */
- TxStatePartitionStorage getOrCreatePartitionTxStateStorage(int zoneId, int
partitionCount, int partitionId) {
- return inBusyLock(busyLock, () -> {
- ZoneResources zoneResources = resourcesByZoneId.computeIfAbsent(
- zoneId,
- id -> createZoneResources(id, partitionCount)
- );
-
- return
zoneResources.txStateStorage.getOrCreatePartitionStorage(partitionId);
- });
+ ZonePartitionResources allocateZonePartitionResources(ZonePartitionId
zonePartitionId, int partitionCount) {
+ ZoneResources zoneResources = resourcesByZoneId.computeIfAbsent(
+ zonePartitionId.zoneId(),
+ zoneId -> new ZoneResources(createTxStateStorage(zoneId,
partitionCount))
+ );
+
+ TxStatePartitionStorage txStatePartitionStorage =
zoneResources.txStateStorage
+ .getOrCreatePartitionStorage(zonePartitionId.partitionId());
+
+ var safeTimeTracker = new
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
+
+ var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L);
+
+ var raftGroupListener = new ZonePartitionRaftListener(
+ zonePartitionId,
+ txStatePartitionStorage,
+ txManager,
+ safeTimeTracker,
+ storageIndexTracker,
+ outgoingSnapshotsManager
+ );
+
+ var snapshotStorageFactory = new PartitionSnapshotStorageFactory(
+ new ZonePartitionKey(zonePartitionId.zoneId(),
zonePartitionId.partitionId()),
+ topologyService,
+ outgoingSnapshotsManager,
+ new PartitionTxStateAccessImpl(txStatePartitionStorage),
+ catalogService,
+ partitionOperationsExecutor
+ );
+
+ var zonePartitionResources = new
ZonePartitionResources(txStatePartitionStorage, raftGroupListener,
snapshotStorageFactory);
+
+
zoneResources.resourcesByPartitionId.put(zonePartitionId.partitionId(),
zonePartitionResources);
+
+ return zonePartitionResources;
}
- private ZoneResources createZoneResources(int zoneId, int partitionCount) {
- return new ZoneResources(createTxStateStorage(zoneId, partitionCount));
+ ZonePartitionResources getZonePartitionResources(ZonePartitionId
zonePartitionId) {
+ ZoneResources zoneResources =
resourcesByZoneId.get(zonePartitionId.zoneId());
+
+ assert zoneResources != null : "Missing resources for zone " +
zonePartitionId.zoneId();
+
+ ZonePartitionResources zonePartitionResources =
zoneResources.resourcesByPartitionId.get(zonePartitionId.partitionId());
+
+ assert zonePartitionResources != null : "Missing resources for
partition " + zonePartitionId;
+
+ return zonePartitionResources;
}
private TxStateStorage createTxStateStorage(int zoneId, int
partitionCount) {
@@ -87,30 +152,27 @@ class ZoneResourcesManager implements ManuallyCloseable {
for (ZoneResources zoneResources : resourcesByZoneId.values()) {
zoneResources.txStateStorage.close();
+ zoneResources.resourcesByPartitionId.clear();
}
+
+ resourcesByZoneId.clear();
}
- void destroyZonePartitionResources(int zoneId, int partitionId) {
+ void destroyZonePartitionResources(ZonePartitionId zonePartitionId) {
inBusyLock(busyLock, () -> {
- ZoneResources resources = resourcesByZoneId.get(zoneId);
+ ZoneResources resources =
resourcesByZoneId.get(zonePartitionId.zoneId());
if (resources != null) {
- resources.txStateStorage.destroyTxStateStorage(partitionId);
+
resources.resourcesByPartitionId.remove(zonePartitionId.partitionId());
+
+
resources.txStateStorage.destroyTxStateStorage(zonePartitionId.partitionId());
}
});
}
- private static class ZoneResources {
- private final TxStateStorage txStateStorage;
-
- private ZoneResources(TxStateStorage txStateStorage) {
- this.txStateStorage = txStateStorage;
- }
- }
-
@TestOnly
@Nullable
- public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int
partitionId) {
+ TxStatePartitionStorage txStatePartitionStorage(int zoneId, int
partitionId) {
ZoneResources resources = resourcesByZoneId.get(zoneId);
if (resources == null) {
@@ -119,4 +181,56 @@ class ZoneResourcesManager implements ManuallyCloseable {
return resources.txStateStorage.getPartitionStorage(partitionId);
}
+
+ private static class ZoneResources {
+ final TxStateStorage txStateStorage;
+
+ final Map<Integer, ZonePartitionResources> resourcesByPartitionId =
new Int2ObjectOpenHashMap<>();
+
+ ZoneResources(TxStateStorage txStateStorage) {
+ this.txStateStorage = txStateStorage;
+ }
+ }
+
+ static class ZonePartitionResources {
+ private final TxStatePartitionStorage txStatePartitionStorage;
+ private final ZonePartitionRaftListener raftListener;
+ private final PartitionSnapshotStorageFactory snapshotStorageFactory;
+
+ /**
+ * Future that completes when the zone-wide replica listener is
created.
+ *
+ * <p>This is needed, because on recovery tables are started before
zone replicas and we need to postpone registering table-wide
+ * replica listeners until the zone replica is started.
+ *
+ * <p>During normal operations this future will be complete and
table-wide listener registration will happen immediately.
+ */
+ private final CompletableFuture<ZonePartitionReplicaListener>
replicaListenerFuture = new CompletableFuture<>();
+
+ ZonePartitionResources(
+ TxStatePartitionStorage txStatePartitionStorage,
+ ZonePartitionRaftListener raftListener,
+ PartitionSnapshotStorageFactory snapshotStorageFactory
+ ) {
+ this.txStatePartitionStorage = txStatePartitionStorage;
+ this.raftListener = raftListener;
+ this.snapshotStorageFactory = snapshotStorageFactory;
+ }
+
+ TxStatePartitionStorage txStatePartitionStorage() {
+ return txStatePartitionStorage;
+ }
+
+ ZonePartitionRaftListener raftListener() {
+ return raftListener;
+ }
+
+ PartitionSnapshotStorageFactory snapshotStorageFactory() {
+ return snapshotStorageFactory;
+ }
+
+ CompletableFuture<ZonePartitionReplicaListener>
replicaListenerFuture() {
+ return replicaListenerFuture;
+ }
+ }
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 5378aff6b8e..9d782b3f7e1 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -56,7 +57,9 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.Loza;
@@ -72,6 +75,7 @@ import
org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
@@ -79,7 +83,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Answers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -116,7 +119,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
@BeforeEach
void setUp(
TestInfo testInfo,
- @Mock(answer = Answers.RETURNS_DEEP_STUBS) ClusterService
clusterService,
+ @Mock(answer = RETURNS_DEEP_STUBS) ClusterService clusterService,
@Mock DistributionZoneManager distributionZoneManager,
@Mock LowWatermark lowWatermark,
@Mock ClockService clockService,
@@ -128,7 +131,9 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
@Mock FailureManager failureManager,
@Mock TopologyAwareRaftGroupServiceFactory
topologyAwareRaftGroupServiceFactory,
@Mock LogStorageFactoryCreator logStorageFactoryCreator,
- @Mock OutgoingSnapshotsManager outgoingSnapshotsManager
+ @Mock PartitionSnapshotStorageFactory
partitionSnapshotStorageFactory,
+ @Mock TxStatePartitionStorage txStatePartitionStorage,
+ @Mock ZonePartitionRaftListener raftGroupListener
) {
String nodeName = testNodeName(testInfo, 0);
@@ -141,6 +146,13 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(completedFuture(Set.of(nodeName)));
+ when(zoneResourcesManager.allocateZonePartitionResources(any(),
anyInt()))
+ .thenReturn(new ZonePartitionResources(
+ txStatePartitionStorage,
+ raftGroupListener,
+ partitionSnapshotStorageFactory
+ ));
+
metaStorageManager = StandaloneMetaStorageManager.create();
catalogManager = new CatalogManagerImpl(new
UpdateLogImpl(metaStorageManager), clockService);
@@ -180,7 +192,6 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
systemDistributedConfiguration,
txManager,
schemaManager,
- outgoingSnapshotsManager,
zoneResourcesManager
);
@@ -226,6 +237,6 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
InOrder inOrder = inOrder(raftManager, zoneResourcesManager);
inOrder.verify(raftManager,
timeout(1_000)).stopRaftNodes(zonePartitionId);
- inOrder.verify(zoneResourcesManager,
timeout(1_000)).destroyZonePartitionResources(zoneId, 0);
+ inOrder.verify(zoneResourcesManager,
timeout(1_000)).destroyZonePartitionResources(zonePartitionId);
}
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
index 9b0842393b0..703fa8dd1a1 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
@@ -25,19 +25,21 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
-import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.InjectExecutorService;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.internal.tx.TxManager;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -47,30 +49,32 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
-@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ExecutorServiceExtension.class)
-class ZoneResourcesManagerTest extends BaseIgniteAbstractTest {
+class ZoneResourcesManagerTest extends IgniteAbstractTest {
private TxStateRocksDbSharedStorage sharedStorage;
- @Mock
- private LogSyncer logSyncer;
-
private ZoneResourcesManager manager;
- @WorkDirectory
- private Path workDir;
-
- @InjectExecutorService
- private ScheduledExecutorService scheduler;
-
- @InjectExecutorService
- private ExecutorService executor;
-
@BeforeEach
- void init() {
+ void init(
+ @Mock LogSyncer logSyncer,
+ @Mock TxManager txManager,
+ @Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
+ @Mock TopologyService topologyService,
+ @Mock CatalogService catalogService,
+ @InjectExecutorService ScheduledExecutorService scheduler,
+ @InjectExecutorService ExecutorService executor
+ ) {
sharedStorage = new TxStateRocksDbSharedStorage(workDir, scheduler,
executor, logSyncer, () -> 0);
- manager = new ZoneResourcesManager(sharedStorage);
+ manager = new ZoneResourcesManager(
+ sharedStorage,
+ txManager,
+ outgoingSnapshotsManager,
+ topologyService,
+ catalogService,
+ executor
+ );
assertThat(sharedStorage.startAsync(new ComponentContext()),
willCompleteSuccessfully());
}
@@ -83,17 +87,20 @@ class ZoneResourcesManagerTest extends
BaseIgniteAbstractTest {
}
@Test
- void createsTxStatePartitionStorage() {
- TxStatePartitionStorage txStatePartitionStorage =
getOrCreatePartitionTxStateStorage(1, 10, 1);
+ void allocatesResources() {
+ ZonePartitionResources resources = allocatePartitionResources(new
ZonePartitionId(1, 1), 10);
- assertThat(txStatePartitionStorage, is(notNullValue()));
+ assertThat(resources.txStatePartitionStorage(), is(notNullValue()));
+ assertThat(resources.raftListener(), is(notNullValue()));
+ assertThat(resources.snapshotStorageFactory(), is(notNullValue()));
+ assertThat(resources.replicaListenerFuture().isDone(), is(false));
}
@Test
void closesResourcesOnShutdown() {
- TxStatePartitionStorage zone1storage1 =
getOrCreatePartitionTxStateStorage(1, 10, 1);
- TxStatePartitionStorage zone1storage5 =
getOrCreatePartitionTxStateStorage(1, 10, 5);
- TxStatePartitionStorage zone2storage3 =
getOrCreatePartitionTxStateStorage(2, 10, 3);
+ ZonePartitionResources zone1storage1 = allocatePartitionResources(new
ZonePartitionId(1, 1), 10);
+ ZonePartitionResources zone1storage5 = allocatePartitionResources(new
ZonePartitionId(1, 5), 10);
+ ZonePartitionResources zone2storage3 = allocatePartitionResources(new
ZonePartitionId(2, 3), 10);
manager.close();
@@ -106,28 +113,28 @@ class ZoneResourcesManagerTest extends
BaseIgniteAbstractTest {
void removesTxStatePartitionStorageOnDestroy() {
int zoneId = 1;
- getOrCreatePartitionTxStateStorage(zoneId, 10, 1);
- getOrCreatePartitionTxStateStorage(zoneId, 10, 2);
+ allocatePartitionResources(new ZonePartitionId(zoneId, 1), 10);
+ allocatePartitionResources(new ZonePartitionId(zoneId, 2), 10);
assertThat(manager.txStatePartitionStorage(zoneId, 1),
is(notNullValue()));
assertThat(manager.txStatePartitionStorage(zoneId, 2),
is(notNullValue()));
- bypassingThreadAssertions(() ->
manager.destroyZonePartitionResources(zoneId, 1));
+ bypassingThreadAssertions(() ->
manager.destroyZonePartitionResources(new ZonePartitionId(zoneId, 1)));
assertThat(manager.txStatePartitionStorage(zoneId, 1),
is(nullValue()));
assertThat(manager.txStatePartitionStorage(zoneId, 2),
is(notNullValue()));
}
@SuppressWarnings("ThrowableNotThrown")
- private static void assertThatStorageIsStopped(TxStatePartitionStorage
storage) {
+ private static void assertThatStorageIsStopped(ZonePartitionResources
resources) {
assertThrows(
IgniteInternalException.class,
- () -> bypassingThreadAssertions(() ->
storage.get(UUID.randomUUID())),
+ () -> bypassingThreadAssertions(() ->
resources.txStatePartitionStorage().get(UUID.randomUUID())),
"Transaction state storage is stopped"
);
}
- private TxStatePartitionStorage getOrCreatePartitionTxStateStorage(int
zoneId, int partitionCount, int partitionId) {
- return bypassingThreadAssertions(() ->
manager.getOrCreatePartitionTxStateStorage(zoneId, partitionCount,
partitionId));
+ private ZonePartitionResources allocatePartitionResources(ZonePartitionId
zonePartitionId, int partitionCount) {
+ return bypassingThreadAssertions(() ->
manager.allocateZonePartitionResources(zonePartitionId, partitionCount));
}
}