This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6dc84b0ae9 IGNITE-23664 Simplify working with locks in PartitionReplicaLifecycleManager (#4712)
6dc84b0ae9 is described below
commit 6dc84b0ae9cad2f076f916d23fa52bb6efc5c035
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Nov 13 12:29:09 2024 +0200
IGNITE-23664 Simplify working with locks in PartitionReplicaLifecycleManager (#4712)
---
.../PartitionReplicaLifecycleManager.java | 128 ++++++++-------------
1 file changed, 46 insertions(+), 82 deletions(-)
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 2836a9a508..f67285246c 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -71,8 +71,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -140,7 +138,7 @@ import org.jetbrains.annotations.Nullable;
* - Stop the same nodes on the zone removing.
* - Support the rebalance mechanism and start the new replication nodes when
the rebalance triggers occurred.
*/
-public class PartitionReplicaLifecycleManager extends
+public class PartitionReplicaLifecycleManager extends
AbstractEventProducer<LocalPartitionReplicaEvent,
LocalPartitionReplicaEventParameters> implements IgniteComponent {
public static final String FEATURE_FLAG_NAME =
"IGNITE_ZONE_BASED_REPLICATION";
/* Feature flag for zone based collocation track */
@@ -214,7 +212,7 @@ public class PartitionReplicaLifecycleManager extends
* @param topologyService Topology service.
* @param rebalanceScheduler Executor for scheduling rebalance routine.
* @param partitionOperationsExecutor Striped executor on which partition
operations (potentially requiring I/O with storages)
- * will be executed.
+ * will be executed.
* @param clockService Clock service.
* @param placementDriver Placement driver.
*/
@@ -462,8 +460,6 @@ public class PartitionReplicaLifecycleManager extends
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
try {
- AtomicReference<Long> stamp = new AtomicReference<>(null);
-
return replicaMgr.startReplica(
replicaGrpId,
(raftClient) -> new
ZonePartitionReplicaListener(
@@ -473,31 +469,15 @@ public class PartitionReplicaLifecycleManager extends
raftGroupListener,
raftGroupEventsListener,
busyLock
- ).thenCompose(replica -> {
- zonePartitionsLocks.compute(zoneId, (id, lock) -> {
- if (lock == null) {
- lock = new StampedLock();
- }
-
- stamp.set(lock.writeLock());
-
- return lock;
- });
-
+ ).thenCompose(replica ->
executeUnderZoneWriteLock(zoneId, () -> {
replicationGroupIds.add(replicaGrpId);
- return fireEvent(
-
LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
- new LocalPartitionReplicaEventParameters(
- new
ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId())
- )
+ var eventParams = new
LocalPartitionReplicaEventParameters(
+ new ZonePartitionId(replicaGrpId.zoneId(),
replicaGrpId.partitionId())
);
- })
- .whenComplete((unused, throwable) -> {
- if (stamp.get() != null) {
-
zonePartitionsLocks.get(zoneId).unlockWrite(stamp.get());
- }
- })
+
+ return
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED, eventParams);
+ }))
.thenApply(unused -> false);
} catch (NodeStoppingException e) {
return failedFuture(e);
@@ -571,7 +551,7 @@ public class PartitionReplicaLifecycleManager extends
* Writes the set of assignments to meta storage. If there are some
assignments already, gets them from meta storage. Returns
* the list of assignments that really are in meta storage.
*
- * @param zoneId Zone id.
+ * @param zoneId Zone id.
* @param assignmentsFuture Assignments future, to get the assignments
that should be written.
* @return Real list of assignments.
*/
@@ -930,7 +910,7 @@ public class PartitionReplicaLifecycleManager extends
if (shouldStopLocalServices) {
return clientUpdateFuture.thenCompose(v ->
stopAndDestroyPartition(zonePartitionId))
- .thenAccept(v -> { });
+ .thenAccept(v -> {});
} else {
return clientUpdateFuture;
}
@@ -1201,7 +1181,7 @@ public class PartitionReplicaLifecycleManager extends
Assignments pendingAssignments,
long revision
) {
- Entry reduceEntry =
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId),
revision);
+ Entry reduceEntry =
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId),
revision);
Assignments reduceAssignments = reduceEntry != null
? Assignments.fromBytes(reduceEntry.value())
@@ -1250,7 +1230,7 @@ public class PartitionReplicaLifecycleManager extends
return replicaMgr.weakStopReplica(
zonePartitionId,
WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
- () -> stopPartition(zonePartitionId).thenAccept(v -> { })
+ () -> stopPartition(zonePartitionId).thenAccept(v -> {})
);
}
@@ -1261,43 +1241,24 @@ public class PartitionReplicaLifecycleManager extends
* @return Future that will be completed after all resources have been
closed.
*/
private CompletableFuture<?> stopPartition(ZonePartitionId
zonePartitionId) {
- CompletableFuture<?> stopReplicaFuture;
-
- AtomicReference<Long> stamp = new AtomicReference<>(null);
-
- try {
- zonePartitionsLocks.compute(zonePartitionId.zoneId(), (id, lock)
-> {
- if (lock == null) {
- lock = new StampedLock();
- }
-
- stamp.set(lock.writeLock());
-
- return lock;
- });
-
- stopReplicaFuture = replicaMgr.stopReplica(zonePartitionId)
- .thenCompose((replicaWasStopped) -> {
- if (replicaWasStopped) {
- replicationGroupIds.remove(zonePartitionId);
-
- return
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, new
LocalPartitionReplicaEventParameters(
- zonePartitionId));
- } else {
- return nullCompletedFuture();
- }
- }).whenComplete((result, th) -> {
-
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
- });
-
- } catch (NodeStoppingException e) {
- // No-op.
- stopReplicaFuture = falseCompletedFuture();
-
-
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
- }
-
- return stopReplicaFuture;
+ return executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
+ try {
+ return replicaMgr.stopReplica(zonePartitionId)
+ .thenCompose((replicaWasStopped) -> {
+ if (replicaWasStopped) {
+ replicationGroupIds.remove(zonePartitionId);
+
+ return
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, new
LocalPartitionReplicaEventParameters(
+ zonePartitionId));
+ } else {
+ return nullCompletedFuture();
+ }
+ });
+ } catch (NodeStoppingException e) {
+ // No-op.
+ return nullCompletedFuture();
+ }
+ });
}
/**
@@ -1342,19 +1303,7 @@ public class PartitionReplicaLifecycleManager extends
* @return Stamp, which must be used for further unlock.
*/
public long lockZoneForRead(int zoneId) {
- AtomicLong stamp = new AtomicLong();
-
- zonePartitionsLocks.compute(zoneId, (id, l) -> {
- if (l == null) {
- l = new StampedLock();
- }
-
- stamp.set(l.readLock());
-
- return l;
- });
-
- return stamp.get();
+ return zonePartitionsLocks.computeIfAbsent(zoneId, id -> new
StampedLock()).readLock();
}
/**
@@ -1384,4 +1333,19 @@ public class PartitionReplicaLifecycleManager extends
((ZonePartitionReplicaListener)
replicaFut.join().listener()).addTableReplicaListener(tablePartitionId,
createListener);
}
+
+ private CompletableFuture<Void> executeUnderZoneWriteLock(int zoneId,
Supplier<CompletableFuture<Void>> action) {
+ StampedLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id ->
new StampedLock());
+
+ long stamp = lock.writeLock();
+
+ try {
+ return action.get()
+ .whenComplete((v, e) -> lock.unlockWrite(stamp));
+ } catch (Throwable e) {
+ lock.unlockWrite(stamp);
+
+ return failedFuture(e);
+ }
+ }
}