mattisonchao commented on code in PR #19844:
URL: https://github.com/apache/pulsar/pull/19844#discussion_r1140971660


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java:
##########
@@ -197,76 +197,67 @@ private CompletableFuture<Void> 
acquireWithNoRevalidation(T newValue) {
     }
 
     synchronized void lockWasInvalidated() {
-        if (state != State.Valid) {
-            // Ignore notifications while we're releasing the lock ourselves
-            return;
-        }
-
-        log.info("Lock on resource {} was invalidated", path);
-        revalidate(value, true, true)
-                .thenRun(() -> log.info("Successfully revalidated the lock on 
{}", path));
+        log.info("Lock on resource {} was invalidated. state {}", path, state);
+        silentRevalidateOnce();
     }
 
     synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() 
{
         if (revalidateAfterReconnection) {
             revalidateAfterReconnection = false;
             log.warn("Revalidate lock at {} after reconnection", path);
-            return revalidate(value, true, true);
+            return silentRevalidateOnce();
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
-    synchronized CompletableFuture<Void> revalidate(T newValue, boolean 
trackPendingOperation,
-                                                    boolean 
revalidateAfterReconnection) {
-
+    /**
+     * Revalidate the distributed lock if it is not released.
+     * This method is thread-safe and it will perform multiple re-validation 
operations in turn.
+     */
+    synchronized CompletableFuture<Void> silentRevalidateOnce() {
         final CompletableFuture<Void> trackFuture;
-
-        if (!trackPendingOperation) {
-            trackFuture = doRevalidate(newValue);
-        } else if (pendingOperationFuture.isDone()) {
-            pendingOperationFuture = doRevalidate(newValue);
-            trackFuture = pendingOperationFuture;
+        // If the lock is first revalidated. the pending operation future 
should always be completed.
+        if (pendingOperationFuture.isDone()) {
+            trackFuture = revalidate(value);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Previous revalidating is not finished while 
revalidate newValue={}, value={}, version={}",
-                        newValue, value, version);
+                        value, value, version);
             }
-            trackFuture = new CompletableFuture<>();
-            trackFuture.whenComplete((unused, throwable) -> {
-                doRevalidate(newValue).thenRun(() -> 
trackFuture.complete(null))
-                        .exceptionally(throwable1 -> {
-                            trackFuture.completeExceptionally(throwable1);
-                            return null;
-                        });
-            });
-            pendingOperationFuture = trackFuture;
+            trackFuture = pendingOperationFuture.exceptionally(ex -> null)
+                    .thenCompose(__ -> revalidate(value));
         }
-
-        trackFuture.exceptionally(ex -> {
-            synchronized (ResourceLockImpl.this) {
-                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
-                if (!revalidateAfterReconnection || realCause instanceof 
BadVersionException
-                        || realCause instanceof LockBusyException) {
-                    log.warn("Failed to revalidate the lock at {}. Marked as 
expired. {}",
-                            path, realCause.getMessage());
-                    state = State.Released;
-                    expiredFuture.complete(null);
-                } else {
-                    // We failed to revalidate the lock due to connectivity 
issue
-                    // Continue assuming we hold the lock, until we can 
revalidate it, either
-                    // on Reconnected or SessionReestablished events.
-                    ResourceLockImpl.this.revalidateAfterReconnection = true;
-                    log.warn("Failed to revalidate the lock at {}. Retrying 
later on reconnection {}", path,
-                            realCause.getMessage());
-                }
-            }
-            return null;
-        });
-        return trackFuture;
+        // Assign the pending operation future here to ensure
+        return pendingOperationFuture = trackFuture
+                .thenRun(() -> log.info("Successfully revalidated the lock on 
{}", path))
+                .exceptionally(ex -> {
+                    synchronized (ResourceLockImpl.this) {
+                        Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                        if (realCause instanceof BadVersionException || 
realCause instanceof LockBusyException) {
+                            log.warn("Failed to revalidate the lock at {}. 
Marked as expired. {}",
+                                    path, realCause.getMessage());
+                            state = State.Released;
+                            expiredFuture.complete(null);
+                        } else {
+                            // We failed to revalidate the lock due to 
connectivity issue
+                            // Continue assuming we hold the lock, until we 
can revalidate it, either
+                            // on Reconnected or SessionReestablished events.
+                            revalidateAfterReconnection = true;
+                            log.warn("Failed to revalidate the lock at {}. 
Retrying later on reconnection {}", path,
+                                    realCause.getMessage());
+                        }
+                    }
+                    return null;
+                });
     }
 
-    private synchronized CompletableFuture<Void> doRevalidate(T newValue) {
+    private synchronized CompletableFuture<Void> revalidate(T newValue) {

Review Comment:
   Two places invoke the revalidate method before.
   
   1. acquiring a lock.
   2. the session or event notification.
   
   The `acquiring a lock.` don't need to catch exceptions since at that time we 
want to acquire the new lock if the operation fails the caller will do nothing 
about `expireFuture`. So, we don't need to complete it. I think just keeping it 
to the init state is fine.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to