This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new cc685334a2c [fix][meta] Fixed race condition between ResourceLock
update and invalidation (#19817)
cc685334a2c is described below
commit cc685334a2cfc412a9ac1a94ffa6503fb4e42d18
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 15 18:35:31 2023 -0700
[fix][meta] Fixed race condition between ResourceLock update and
invalidation (#19817)
---
.../coordination/impl/LockManagerImpl.java | 2 +-
.../coordination/impl/ResourceLockImpl.java | 56 ++++++++++++++++------
2 files changed, 42 insertions(+), 16 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index e677e99faf6..fb520e7da30 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl<T> implements LockManager<T> {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established.
Revalidating all the existing locks.");
for (ResourceLockImpl<T> lock : locks.values()) {
- futures.add(lock.revalidate(lock.getValue(), true));
+ futures.add(lock.revalidate(lock.getValue(), true, true));
}
} else if (se == SessionEvent.Reconnected) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 134ee8030d9..56f6ff41fe9 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -44,7 +44,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private long version;
private final CompletableFuture<Void> expiredFuture;
private boolean revalidateAfterReconnection = false;
- private CompletableFuture<Void> revalidateFuture;
+ private CompletableFuture<Void> pendingOperationFuture;
private enum State {
Init,
@@ -61,6 +61,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
this.path = path;
this.version = -1;
this.expiredFuture = new CompletableFuture<>();
+ this.pendingOperationFuture = CompletableFuture.completedFuture(null);
this.state = State.Init;
}
@@ -71,7 +72,24 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
@Override
public synchronized CompletableFuture<Void> updateValue(T newValue) {
- return acquire(newValue);
+ // If there is an operation in progress, we're going to let it
complete before attempting to
+ // update the value
+ if (pendingOperationFuture.isDone()) {
+ pendingOperationFuture = CompletableFuture.completedFuture(null);
+ }
+
+ pendingOperationFuture = pendingOperationFuture.thenCompose(v -> {
+ synchronized (ResourceLockImpl.this) {
+ if (state != State.Valid) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Lock was not in valid
state: " + state));
+ }
+
+ return acquire(newValue);
+ }
+ });
+
+ return pendingOperationFuture;
}
@Override
@@ -128,7 +146,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T>
{
.thenRun(() -> result.complete(null))
.exceptionally(ex -> {
if (ex.getCause() instanceof LockBusyException) {
- revalidate(newValue, false)
+ revalidate(newValue, false, false)
.thenAccept(__ -> result.complete(null))
.exceptionally(ex1 -> {
result.completeExceptionally(ex1);
@@ -185,7 +203,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T>
{
}
log.info("Lock on resource {} was invalidated", path);
- revalidate(value, true)
+ revalidate(value, true, true)
.thenRun(() -> log.info("Successfully revalidated the lock on
{}", path));
}
@@ -193,31 +211,39 @@ public class ResourceLockImpl<T> implements
ResourceLock<T> {
if (revalidateAfterReconnection) {
revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
- return revalidate(value, true);
+ return revalidate(value, true, true);
} else {
return CompletableFuture.completedFuture(null);
}
}
- synchronized CompletableFuture<Void> revalidate(T newValue, boolean
revalidateAfterReconnection) {
- if (revalidateFuture == null || revalidateFuture.isDone()) {
- revalidateFuture = doRevalidate(newValue);
+ synchronized CompletableFuture<Void> revalidate(T newValue, boolean
trackPendingOperation,
+ boolean
revalidateAfterReconnection) {
+
+ final CompletableFuture<Void> trackFuture;
+
+ if (!trackPendingOperation) {
+ trackFuture = doRevalidate(newValue);
+ } else if (pendingOperationFuture.isDone()) {
+ pendingOperationFuture = doRevalidate(newValue);
+ trackFuture = pendingOperationFuture;
} else {
if (log.isDebugEnabled()) {
log.debug("Previous revalidating is not finished while
revalidate newValue={}, value={}, version={}",
newValue, value, version);
}
- CompletableFuture<Void> newFuture = new CompletableFuture<>();
- revalidateFuture.whenComplete((unused, throwable) -> {
- doRevalidate(newValue).thenRun(() -> newFuture.complete(null))
+ trackFuture = new CompletableFuture<>();
+ trackFuture.whenComplete((unused, throwable) -> {
+ doRevalidate(newValue).thenRun(() ->
trackFuture.complete(null))
.exceptionally(throwable1 -> {
- newFuture.completeExceptionally(throwable1);
+ trackFuture.completeExceptionally(throwable1);
return null;
});
});
- revalidateFuture = newFuture;
+ pendingOperationFuture = trackFuture;
}
- revalidateFuture.exceptionally(ex -> {
+
+ trackFuture.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (!revalidateAfterReconnection || realCause instanceof
BadVersionException
@@ -237,7 +263,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T>
{
}
return null;
});
- return revalidateFuture;
+ return trackFuture;
}
private synchronized CompletableFuture<Void> doRevalidate(T newValue) {