mattisonchao commented on code in PR #19844:
URL: https://github.com/apache/pulsar/pull/19844#discussion_r1139017878
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java:
##########
@@ -197,76 +191,81 @@ private CompletableFuture<Void>
acquireWithNoRevalidation(T newValue) {
}
synchronized void lockWasInvalidated() {
- if (state != State.Valid) {
- // Ignore notifications while we're releasing the lock ourselves
- return;
- }
-
- log.info("Lock on resource {} was invalidated", path);
- revalidate(value, true, true)
- .thenRun(() -> log.info("Successfully revalidated the lock on
{}", path));
+ log.info("Lock on resource {} was invalidated. state {}", path, state);
+ silentRevalidateOnce();
}
synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection()
{
if (revalidateAfterReconnection) {
- revalidateAfterReconnection = false;
+ ResourceLockImpl.this.revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
- return revalidate(value, true, true);
+ return silentRevalidateOnce();
} else {
return CompletableFuture.completedFuture(null);
}
}
- synchronized CompletableFuture<Void> revalidate(T newValue, boolean
trackPendingOperation,
- boolean
revalidateAfterReconnection) {
-
+ /**
+ * Revalidate the distributed lock if it is not released.
+ * This method is thread-safe and it will perform multiple re-validation
operations in turn.
+ * @param newValue the lock value
+ */
+ synchronized CompletableFuture<Void> revalidate(T newValue) {
final CompletableFuture<Void> trackFuture;
-
- if (!trackPendingOperation) {
+ // If the lock is being revalidated for the first time, the pending operation future
should always be completed.
+ if (pendingOperationFuture.isDone()) {
trackFuture = doRevalidate(newValue);
- } else if (pendingOperationFuture.isDone()) {
- pendingOperationFuture = doRevalidate(newValue);
- trackFuture = pendingOperationFuture;
} else {
if (log.isDebugEnabled()) {
log.debug("Previous revalidating is not finished while
revalidate newValue={}, value={}, version={}",
newValue, value, version);
}
- trackFuture = new CompletableFuture<>();
- trackFuture.whenComplete((unused, throwable) -> {
- doRevalidate(newValue).thenRun(() ->
trackFuture.complete(null))
- .exceptionally(throwable1 -> {
- trackFuture.completeExceptionally(throwable1);
- return null;
- });
- });
- pendingOperationFuture = trackFuture;
+ trackFuture = pendingOperationFuture.exceptionally(ex -> null)
+ .thenCompose(__ -> doRevalidate(newValue));
}
-
- trackFuture.exceptionally(ex -> {
- synchronized (ResourceLockImpl.this) {
- Throwable realCause = FutureUtil.unwrapCompletionException(ex);
- if (!revalidateAfterReconnection || realCause instanceof
BadVersionException
- || realCause instanceof LockBusyException) {
- log.warn("Failed to revalidate the lock at {}. Marked as
expired. {}",
- path, realCause.getMessage());
- state = State.Released;
- expiredFuture.complete(null);
- } else {
- // We failed to revalidate the lock due to connectivity
issue
- // Continue assuming we hold the lock, until we can
revalidate it, either
- // on Reconnected or SessionReestablished events.
- ResourceLockImpl.this.revalidateAfterReconnection = true;
- log.warn("Failed to revalidate the lock at {}. Retrying
later on reconnection {}", path,
- realCause.getMessage());
- }
- }
- return null;
- });
return trackFuture;
}
+ /**
+ * This method is designed for background notification usage.
+ * It will automatically mark the lock as released if the revalidation operation fails with
one of the following exceptions:
+ * - LockBusyException
+ * - BadVersionException
+ *
+ * @return The revalidation future. Notice: it will not return any useful
result;
+ * the caller needs to re-check the lock state after silent revalidation
once.
+ */
+ synchronized CompletableFuture<Void> silentRevalidateOnce() {
+ // Assign the pending operation future here to ensure that later revalidations are chained after this one.
+ return pendingOperationFuture = revalidate(value)
+ .thenRun(() -> log.info("Successfully revalidated the lock on
{}", path))
+ .exceptionally(ex -> {
+ synchronized (ResourceLockImpl.this) {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof BadVersionException ||
realCause instanceof LockBusyException) {
+ log.warn("Failed to revalidate the lock at {}.
Marked as expired. {}",
+ path, realCause.getMessage());
+ state = State.Released;
+ expiredFuture.complete(null);
+ } else {
+ // We failed to revalidate the lock due to
connectivity issue
+ // Continue assuming we hold the lock, until we
can revalidate it, either
+ // on Reconnected or SessionReestablished events.
+ ResourceLockImpl.this.revalidateAfterReconnection
= true;
+ log.warn("Failed to revalidate the lock at {}.
Retrying later on reconnection {}", path,
+ realCause.getMessage());
+ }
+ }
+ return null;
+ });
+ }
+
private synchronized CompletableFuture<Void> doRevalidate(T newValue) {
+ // Since the distributed lock has expired, we don't need to
revalidate it.
+ if (state != State.Valid && state != State.Init) {
Review Comment:
3. Add strict revalidation state check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]