This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9adec1bef99 [fix][meta] Fixed race condition between ResourceLock update and invalidation (#19817)
9adec1bef99 is described below

commit 9adec1bef99fc7aedff6238f11e3d891f6e47e36
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 15 18:35:31 2023 -0700

    [fix][meta] Fixed race condition between ResourceLock update and invalidation (#19817)
---
 .../coordination/impl/LockManagerImpl.java         |  2 +-
 .../coordination/impl/ResourceLockImpl.java        | 56 ++++++++++++++++------
 2 files changed, 42 insertions(+), 16 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 097f15af276..ca768d38490 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -124,7 +124,7 @@ class LockManagerImpl<T> implements LockManager<T> {
             if (se == SessionEvent.SessionReestablished) {
                 log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
                 for (ResourceLockImpl<T> lock : locks.values()) {
-                    futures.add(lock.revalidate(lock.getValue(), true));
+                    futures.add(lock.revalidate(lock.getValue(), true, true));
                 }
 
             } else if (se == SessionEvent.Reconnected) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index dea9aa1acb9..5271a73249d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -44,7 +44,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
     private long version;
     private final CompletableFuture<Void> expiredFuture;
     private boolean revalidateAfterReconnection = false;
-    private CompletableFuture<Void> revalidateFuture;
+    private CompletableFuture<Void> pendingOperationFuture;
 
     private enum State {
         Init,
@@ -61,6 +61,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         this.path = path;
         this.version = -1;
         this.expiredFuture = new CompletableFuture<>();
+        this.pendingOperationFuture = CompletableFuture.completedFuture(null);
         this.state = State.Init;
     }
 
@@ -71,7 +72,24 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
 
     @Override
     public synchronized CompletableFuture<Void> updateValue(T newValue) {
-       return acquire(newValue);
+        // If there is an operation in progress, we're going to let it complete before attempting to
+        // update the value
+        if (pendingOperationFuture.isDone()) {
+            pendingOperationFuture = CompletableFuture.completedFuture(null);
+        }
+
+        pendingOperationFuture = pendingOperationFuture.thenCompose(v -> {
+            synchronized (ResourceLockImpl.this) {
+                if (state != State.Valid) {
+                    return CompletableFuture.failedFuture(
+                            new IllegalStateException("Lock was not in valid state: " + state));
+                }
+
+                return acquire(newValue);
+            }
+        });
+
+        return pendingOperationFuture;
     }
 
     @Override
@@ -128,7 +146,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                 .thenRun(() -> result.complete(null))
                 .exceptionally(ex -> {
                     if (ex.getCause() instanceof LockBusyException) {
-                        revalidate(newValue, false)
+                        revalidate(newValue, false, false)
                                 .thenAccept(__ -> result.complete(null))
                                 .exceptionally(ex1 -> {
                                    result.completeExceptionally(ex1);
@@ -185,7 +203,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         }
 
         log.info("Lock on resource {} was invalidated", path);
-        revalidate(value, true)
+        revalidate(value, true, true)
                 .thenRun(() -> log.info("Successfully revalidated the lock on {}", path));
     }
 
@@ -193,31 +211,39 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         if (revalidateAfterReconnection) {
             revalidateAfterReconnection = false;
             log.warn("Revalidate lock at {} after reconnection", path);
-            return revalidate(value, true);
+            return revalidate(value, true, true);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
-    synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
-        if (revalidateFuture == null || revalidateFuture.isDone()) {
-            revalidateFuture = doRevalidate(newValue);
+    synchronized CompletableFuture<Void> revalidate(T newValue, boolean trackPendingOperation,
+                                                    boolean revalidateAfterReconnection) {
+
+        final CompletableFuture<Void> trackFuture;
+
+        if (!trackPendingOperation) {
+            trackFuture = doRevalidate(newValue);
+        } else if (pendingOperationFuture.isDone()) {
+            pendingOperationFuture = doRevalidate(newValue);
+            trackFuture = pendingOperationFuture;
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Previous revalidating is not finished while revalidate newValue={}, value={}, version={}",
                         newValue, value, version);
             }
-            CompletableFuture<Void> newFuture = new CompletableFuture<>();
-            revalidateFuture.whenComplete((unused, throwable) -> {
-                doRevalidate(newValue).thenRun(() -> newFuture.complete(null))
+            trackFuture = new CompletableFuture<>();
+            trackFuture.whenComplete((unused, throwable) -> {
+                doRevalidate(newValue).thenRun(() -> trackFuture.complete(null))
                         .exceptionally(throwable1 -> {
-                            newFuture.completeExceptionally(throwable1);
+                            trackFuture.completeExceptionally(throwable1);
                             return null;
                         });
             });
-            revalidateFuture = newFuture;
+            pendingOperationFuture = trackFuture;
         }
-        revalidateFuture.exceptionally(ex -> {
+
+        trackFuture.exceptionally(ex -> {
             synchronized (ResourceLockImpl.this) {
                 Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                 if (!revalidateAfterReconnection || realCause instanceof BadVersionException
@@ -237,7 +263,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
             }
             return null;
         });
-        return revalidateFuture;
+        return trackFuture;
     }
 
     private synchronized CompletableFuture<Void> doRevalidate(T newValue) {

Reply via email to