This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9c25c2d088d IGNITE-25225 Extend PartitionReplicaLifecycleManager API
(#5683)
9c25c2d088d is described below
commit 9c25c2d088dbe167857b653e0eaaa8f1eb846499
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 24 11:01:23 2025 +0300
IGNITE-25225 Extend PartitionReplicaLifecycleManager API (#5683)
---
.../PartitionReplicaLifecycleManager.java | 22 +++++++------
.../partition/replicator/ZoneResourcesManager.java | 36 +++++++++++++++-------
.../PartitionReplicaLifecycleManagerTest.java | 4 ++-
3 files changed, 40 insertions(+), 22 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 44ef968e343..1858cb621c4 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.partition.replicator;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptySet;
-import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
@@ -248,8 +247,8 @@ public class PartitionReplicaLifecycleManager extends
/**
* This future completes on {@link #beforeNodeStop()} with {@link
NodeStoppingException} before the {@link #busyLock} is blocked.
- * TODO: https://issues.apache.org/jira/browse/IGNITE-17592
- **/
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17592
private final CompletableFuture<Void> stopReplicaLifecycleFuture = new
CompletableFuture<>();
/**
@@ -1584,9 +1583,7 @@ public class PartitionReplicaLifecycleManager extends
PartitionMvStorageAccess partitionMvStorageAccess,
boolean onNodeRecovery
) {
- ZonePartitionResources resources =
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
-
- requireNonNull(resources, "Zone partition resources not found
[zonePartitionId=" + zonePartitionId + ']');
+ ZonePartitionResources resources =
zonePartitionResources(zonePartitionId);
// Register an intent to register a table-wide replica listener. On
recovery this method is called before the replica is started,
// so the listeners will be registered by the thread completing the
"replicaListenerFuture". On normal operation (where there is
@@ -1720,12 +1717,17 @@ public class PartitionReplicaLifecycleManager extends
@TestOnly
public HybridTimestamp currentSafeTimeForZonePartition(int zoneId, int
partId) {
- return
requireNonNull(zoneResourcesManager.getZonePartitionResources(new
ZonePartitionId(zoneId, partId))).raftListener()
- .currentSafeTime();
+ return zonePartitionResources(new ZonePartitionId(zoneId,
partId)).raftListener().currentSafeTime();
}
- @TestOnly
+ /**
+ * Returns resources for the given zone partition.
+ */
public ZonePartitionResources zonePartitionResources(ZonePartitionId
zonePartitionId) {
- return
requireNonNull(zoneResourcesManager.getZonePartitionResources(zonePartitionId));
+ ZonePartitionResources resources =
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+
+ assert resources != null : String.format("Missing resources for zone
partition [zonePartitionId=%s]", zonePartitionId);
+
+ return resources;
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 8d82e3d3178..71bf0888a67 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -47,12 +47,11 @@ import
org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.internal.worker.ThreadAssertions;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
-import org.jetbrains.annotations.VisibleForTesting;
/**
* Manages resources of distribution zones; that is, allows creation of
underlying storages and closes them on node stop.
*/
-class ZoneResourcesManager implements ManuallyCloseable {
+public class ZoneResourcesManager implements ManuallyCloseable {
private final TxStateRocksDbSharedStorage sharedTxStateStorage;
private final TxManager txManager;
@@ -125,7 +124,12 @@ class ZoneResourcesManager implements ManuallyCloseable {
partitionOperationsExecutor
);
- var zonePartitionResources = new
ZonePartitionResources(txStatePartitionStorage, raftGroupListener,
snapshotStorage);
+ var zonePartitionResources = new ZonePartitionResources(
+ txStatePartitionStorage,
+ raftGroupListener,
+ snapshotStorage,
+ storageIndexTracker
+ );
zoneResources.resourcesByPartitionId.put(zonePartitionId.partitionId(),
zonePartitionResources);
@@ -139,9 +143,7 @@ class ZoneResourcesManager implements ManuallyCloseable {
return null;
}
- ZonePartitionResources zonePartitionResources =
zoneResources.resourcesByPartitionId.get(zonePartitionId.partitionId());
-
- return zonePartitionResources;
+ return
zoneResources.resourcesByPartitionId.get(zonePartitionId.partitionId());
}
private TxStateStorage createTxStateStorage(int zoneId, int
partitionCount) {
@@ -220,12 +222,18 @@ class ZoneResourcesManager implements ManuallyCloseable {
}
}
- @VisibleForTesting
+ /**
+ * Zone partition resources.
+ */
public static class ZonePartitionResources {
private final TxStatePartitionStorage txStatePartitionStorage;
+
private final ZonePartitionRaftListener raftListener;
+
private final PartitionSnapshotStorage snapshotStorage;
+ private final PendingComparableValuesTracker<Long, Void>
storageIndexTracker;
+
/**
* Future that completes when the zone-wide replica listener is
created.
*
@@ -239,25 +247,31 @@ class ZoneResourcesManager implements ManuallyCloseable {
ZonePartitionResources(
TxStatePartitionStorage txStatePartitionStorage,
ZonePartitionRaftListener raftListener,
- PartitionSnapshotStorage snapshotStorage
+ PartitionSnapshotStorage snapshotStorage,
+ PendingComparableValuesTracker<Long, Void> storageIndexTracker
) {
this.txStatePartitionStorage = txStatePartitionStorage;
this.raftListener = raftListener;
this.snapshotStorage = snapshotStorage;
+ this.storageIndexTracker = storageIndexTracker;
}
- TxStatePartitionStorage txStatePartitionStorage() {
+ public TxStatePartitionStorage txStatePartitionStorage() {
return txStatePartitionStorage;
}
- ZonePartitionRaftListener raftListener() {
+ public ZonePartitionRaftListener raftListener() {
return raftListener;
}
- PartitionSnapshotStorage snapshotStorage() {
+ public PartitionSnapshotStorage snapshotStorage() {
return snapshotStorage;
}
+ public PendingComparableValuesTracker<Long, Void>
storageIndexTracker() {
+ return storageIndexTracker;
+ }
+
public CompletableFuture<ZonePartitionReplicaListener>
replicaListenerFuture() {
return replicaListenerFuture;
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index b1fa0779560..524b16a9909 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -81,6 +81,7 @@ import
org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -158,7 +159,8 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
.thenReturn(new ZonePartitionResources(
txStatePartitionStorage,
raftGroupListener,
- partitionSnapshotStorage
+ partitionSnapshotStorage,
+ new PendingComparableValuesTracker<>(0L)
));
when(raftManager.startRaftGroupNode(any(), any(), any(), any(),
any(RaftGroupOptions.class), any()))