This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6dc84b0ae9 IGNITE-23664 Simplify working with locks in 
PartitionReplicaLifecycleManager (#4712)
6dc84b0ae9 is described below

commit 6dc84b0ae9cad2f076f916d23fa52bb6efc5c035
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Nov 13 12:29:09 2024 +0200

    IGNITE-23664 Simplify working with locks in 
PartitionReplicaLifecycleManager (#4712)
---
 .../PartitionReplicaLifecycleManager.java          | 128 ++++++++-------------
 1 file changed, 46 insertions(+), 82 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 2836a9a508..f67285246c 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -71,8 +71,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -140,7 +138,7 @@ import org.jetbrains.annotations.Nullable;
  * - Stop the same nodes on the zone removing.
  * - Support the rebalance mechanism and start the new replication nodes when 
the rebalance triggers occurred.
  */
-public class PartitionReplicaLifecycleManager  extends
+public class PartitionReplicaLifecycleManager extends
         AbstractEventProducer<LocalPartitionReplicaEvent, 
LocalPartitionReplicaEventParameters> implements IgniteComponent {
     public static final String FEATURE_FLAG_NAME = 
"IGNITE_ZONE_BASED_REPLICATION";
     /* Feature flag for zone based collocation track */
@@ -214,7 +212,7 @@ public class PartitionReplicaLifecycleManager  extends
      * @param topologyService Topology service.
      * @param rebalanceScheduler Executor for scheduling rebalance routine.
      * @param partitionOperationsExecutor Striped executor on which partition 
operations (potentially requiring I/O with storages)
-     *     will be executed.
+     *         will be executed.
      * @param clockService Clock service.
      * @param placementDriver Placement driver.
      */
@@ -462,8 +460,6 @@ public class PartitionReplicaLifecycleManager  extends
 
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
             try {
-                AtomicReference<Long> stamp = new AtomicReference<>(null);
-
                 return replicaMgr.startReplica(
                                 replicaGrpId,
                                 (raftClient) -> new 
ZonePartitionReplicaListener(
@@ -473,31 +469,15 @@ public class PartitionReplicaLifecycleManager  extends
                                 raftGroupListener,
                                 raftGroupEventsListener,
                                 busyLock
-                        ).thenCompose(replica -> {
-                            zonePartitionsLocks.compute(zoneId, (id, lock) -> {
-                                if (lock == null) {
-                                    lock = new StampedLock();
-                                }
-
-                                stamp.set(lock.writeLock());
-
-                                return lock;
-                            });
-
+                        ).thenCompose(replica -> 
executeUnderZoneWriteLock(zoneId, () -> {
                             replicationGroupIds.add(replicaGrpId);
 
-                            return fireEvent(
-                                    
LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
-                                    new LocalPartitionReplicaEventParameters(
-                                            new 
ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId())
-                                    )
+                            var eventParams = new 
LocalPartitionReplicaEventParameters(
+                                    new ZonePartitionId(replicaGrpId.zoneId(), 
replicaGrpId.partitionId())
                             );
-                        })
-                        .whenComplete((unused, throwable) -> {
-                            if (stamp.get() != null) {
-                                
zonePartitionsLocks.get(zoneId).unlockWrite(stamp.get());
-                            }
-                        })
+
+                            return 
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED, eventParams);
+                        }))
                         .thenApply(unused -> false);
             } catch (NodeStoppingException e) {
                 return failedFuture(e);
@@ -571,7 +551,7 @@ public class PartitionReplicaLifecycleManager  extends
      * Writes the set of assignments to meta storage. If there are some 
assignments already, gets them from meta storage. Returns
      * the list of assignments that really are in meta storage.
      *
-     * @param zoneId  Zone id.
+     * @param zoneId Zone id.
      * @param assignmentsFuture Assignments future, to get the assignments 
that should be written.
      * @return Real list of assignments.
      */
@@ -930,7 +910,7 @@ public class PartitionReplicaLifecycleManager  extends
 
         if (shouldStopLocalServices) {
             return clientUpdateFuture.thenCompose(v -> 
stopAndDestroyPartition(zonePartitionId))
-                    .thenAccept(v -> { });
+                    .thenAccept(v -> {});
         } else {
             return clientUpdateFuture;
         }
@@ -1201,7 +1181,7 @@ public class PartitionReplicaLifecycleManager  extends
             Assignments pendingAssignments,
             long revision
     ) {
-        Entry reduceEntry  = 
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId), 
revision);
+        Entry reduceEntry = 
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId), 
revision);
 
         Assignments reduceAssignments = reduceEntry != null
                 ? Assignments.fromBytes(reduceEntry.value())
@@ -1250,7 +1230,7 @@ public class PartitionReplicaLifecycleManager  extends
         return replicaMgr.weakStopReplica(
                 zonePartitionId,
                 WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
-                () -> stopPartition(zonePartitionId).thenAccept(v -> { })
+                () -> stopPartition(zonePartitionId).thenAccept(v -> {})
         );
     }
 
@@ -1261,43 +1241,24 @@ public class PartitionReplicaLifecycleManager  extends
      * @return Future that will be completed after all resources have been 
closed.
      */
     private CompletableFuture<?> stopPartition(ZonePartitionId 
zonePartitionId) {
-        CompletableFuture<?> stopReplicaFuture;
-
-        AtomicReference<Long> stamp = new AtomicReference<>(null);
-
-        try {
-            zonePartitionsLocks.compute(zonePartitionId.zoneId(), (id, lock) 
-> {
-                if (lock == null) {
-                    lock = new StampedLock();
-                }
-
-                stamp.set(lock.writeLock());
-
-                return lock;
-            });
-
-            stopReplicaFuture = replicaMgr.stopReplica(zonePartitionId)
-                    .thenCompose((replicaWasStopped) -> {
-                        if (replicaWasStopped) {
-                            replicationGroupIds.remove(zonePartitionId);
-
-                            return 
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, new 
LocalPartitionReplicaEventParameters(
-                                    zonePartitionId));
-                        } else {
-                            return nullCompletedFuture();
-                        }
-                    }).whenComplete((result, th) -> {
-                        
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
-                    });
-
-        } catch (NodeStoppingException e) {
-            // No-op.
-            stopReplicaFuture = falseCompletedFuture();
-
-            
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
-        }
-
-        return stopReplicaFuture;
+        return executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
+            try {
+                return replicaMgr.stopReplica(zonePartitionId)
+                        .thenCompose((replicaWasStopped) -> {
+                            if (replicaWasStopped) {
+                                replicationGroupIds.remove(zonePartitionId);
+
+                                return 
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, new 
LocalPartitionReplicaEventParameters(
+                                        zonePartitionId));
+                            } else {
+                                return nullCompletedFuture();
+                            }
+                        });
+            } catch (NodeStoppingException e) {
+                // No-op.
+                return nullCompletedFuture();
+            }
+        });
     }
 
     /**
@@ -1342,19 +1303,7 @@ public class PartitionReplicaLifecycleManager  extends
      * @return Stamp, which must be used for further unlock.
      */
     public long lockZoneForRead(int zoneId) {
-        AtomicLong stamp = new AtomicLong();
-
-        zonePartitionsLocks.compute(zoneId, (id, l) -> {
-            if (l == null) {
-                l = new StampedLock();
-            }
-
-            stamp.set(l.readLock());
-
-            return l;
-        });
-
-        return stamp.get();
+        return zonePartitionsLocks.computeIfAbsent(zoneId, id -> new 
StampedLock()).readLock();
     }
 
     /**
@@ -1384,4 +1333,19 @@ public class PartitionReplicaLifecycleManager  extends
 
         ((ZonePartitionReplicaListener) 
replicaFut.join().listener()).addTableReplicaListener(tablePartitionId, 
createListener);
     }
+
+    private CompletableFuture<Void> executeUnderZoneWriteLock(int zoneId, 
Supplier<CompletableFuture<Void>> action) {
+        StampedLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> 
new StampedLock());
+
+        long stamp = lock.writeLock();
+
+        try {
+            return action.get()
+                    .whenComplete((v, e) -> lock.unlockWrite(stamp));
+        } catch (Throwable e) {
+            lock.unlockWrite(stamp);
+
+            return failedFuture(e);
+        }
+    }
 }

Reply via email to