This is an automated email from the ASF dual-hosted git repository.
kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new e44115e2 CURATOR-675: Fix AsyncCuratorFramework's
UnhandledErrorListener hang AsyncStage (#465)
e44115e2 is described below
commit e44115e2fb5ea77af1bfe825ce122caf59e53fc1
Author: Kezhu Wang <[email protected]>
AuthorDate: Mon Jul 3 21:32:09 2023 +0800
CURATOR-675: Fix AsyncCuratorFramework's UnhandledErrorListener hang
AsyncStage (#465)
`AsyncCuratorFramework` binds its `UnhandledErrorListener` at the operation
level, which could prevent the `AsyncStage` from completing if the exception
is swallowed by that listener.
In any case, a framework-level error listener should not prevent an
operation's future from completing.
---
.../curator/framework/api/ErrorListenerEnsembleable.java | 3 ++-
.../framework/api/ErrorListenerMultiTransactionMain.java | 3 ++-
.../curator/framework/api/ErrorListenerPathAndBytesable.java | 3 ++-
.../apache/curator/framework/api/ErrorListenerPathable.java | 3 ++-
.../framework/api/ErrorListenerReconfigBuilderMain.java | 3 ++-
.../java/org/apache/curator/test/compatibility/Timing2.java | 8 ++++++++
.../org/apache/curator/x/async/details/BuilderCommon.java | 11 ++++++++++-
.../curator/framework/imps/TestFrameworkBackground.java | 10 ++++++----
.../apache/curator/x/async/CompletableBaseClassForTests.java | 2 +-
9 files changed, 35 insertions(+), 11 deletions(-)
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
index d658f8f2..9bff13af 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
@@ -23,10 +23,11 @@ public interface ErrorListenerEnsembleable<T> extends
Ensembleable<T> {
/**
* Set an error listener for this background operation. If an exception
* occurs while processing the call in the background, this listener will
- * be called
+ * be called.
*
* @param listener the listener
* @return this for chaining
+ * @apiNote swallow the exception will prevent {@link BackgroundCallback}
from completion
*/
Ensembleable<T> withUnhandledErrorListener(UnhandledErrorListener
listener);
}
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
index acee4bf1..fa2f7d87 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
@@ -25,10 +25,11 @@ public interface ErrorListenerMultiTransactionMain extends
CuratorMultiTransacti
/**
* Set an error listener for this background operation. If an exception
* occurs while processing the call in the background, this listener will
- * be called
+ * be called.
*
* @param listener the listener
* @return this for chaining
+ * @apiNote swallow the exception will prevent {@link BackgroundCallback}
from completion
*/
CuratorMultiTransactionMain
withUnhandledErrorListener(UnhandledErrorListener listener);
}
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
index 0aaee4a5..ea98e175 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
@@ -23,10 +23,11 @@ public interface ErrorListenerPathAndBytesable<T> extends
PathAndBytesable<T> {
/**
* Set an error listener for this background operation. If an exception
* occurs while processing the call in the background, this listener will
- * be called
+ * be called.
*
* @param listener the listener
* @return this for chaining
+ * @apiNote swallow the exception will prevent {@link BackgroundCallback}
from completion
*/
PathAndBytesable<T> withUnhandledErrorListener(UnhandledErrorListener
listener);
}
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
index a2e51763..a5d3408c 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
@@ -23,10 +23,11 @@ public interface ErrorListenerPathable<T> extends
Pathable<T> {
/**
* Set an error listener for this background operation. If an exception
* occurs while processing the call in the background, this listener will
- * be called
+ * be called.
*
* @param listener the listener
* @return this for chaining
+ * @apiNote swallow the exception will prevent {@link BackgroundCallback}
from completion
*/
Pathable<T> withUnhandledErrorListener(UnhandledErrorListener listener);
}
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
index e5ad09e9..bc44eb81 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
@@ -23,10 +23,11 @@ public interface ErrorListenerReconfigBuilderMain extends
ReconfigBuilderMain {
/**
* Set an error listener for this background operation. If an exception
* occurs while processing the call in the background, this listener will
- * be called
+ * be called.
*
* @param listener the listener
* @return this for chaining
+ * @apiNote swallow the exception will prevent {@link BackgroundCallback}
from completion
*/
ReconfigBuilderMain withUnhandledErrorListener(UnhandledErrorListener
listener);
}
diff --git
a/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java
b/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java
index 946b65eb..590d6ed6 100644
---
a/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java
+++
b/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java
@@ -20,7 +20,9 @@
package org.apache.curator.test.compatibility;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -106,6 +108,12 @@ public class Timing2 {
return (int) value;
}
+ public <T> T getFuture(CompletableFuture<T> future)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ Timing2 m = forWaiting();
+ return future.get(m.value, m.unit);
+ }
+
/**
* Wait on the given latch
*
diff --git
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
index cb0c056b..b2b0f620 100644
---
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
+++
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
@@ -19,6 +19,7 @@
package org.apache.curator.x.async.details;
+import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.Backgrounding;
import org.apache.curator.x.async.WatchMode;
@@ -34,6 +35,14 @@ class BuilderCommon<T> {
BuilderCommon(Filters filters, WatchMode watchMode, BackgroundProc<T>
proc) {
watcher = (watchMode != null) ? new InternalWatcher(watchMode,
filters.getWatcherFilter()) : null;
internalCallback = new InternalCallback<>(proc, watcher,
filters.getResultFilter());
- backgrounding = new Backgrounding(internalCallback,
filters.getListener());
+ UnhandledErrorListener listener = filters.getListener();
+ if (listener != null) {
+ UnhandledErrorListener wrappedListener = listener;
+ listener = (message, e) -> {
+ wrappedListener.unhandledError(message, e);
+ internalCallback.completeExceptionally(e);
+ };
+ }
+ backgrounding = new Backgrounding(internalCallback, listener);
}
}
diff --git
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index fe18aa90..df9d373e 100644
---
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -37,17 +37,18 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TestFrameworkBackground extends BaseClassForTests {
+public class TestFrameworkBackground extends CompletableBaseClassForTests {
private final Logger log = LoggerFactory.getLogger(getClass());
@Test
@@ -91,8 +92,9 @@ public class TestFrameworkBackground extends
BaseClassForTests {
errorLatch.countDown();
}
};
- async.with(listener).create().forPath("/foo");
- assertTrue(new Timing().awaitLatch(errorLatch));
+ AsyncStage<String> stage =
async.with(listener).create().forPath("/foo");
+ assertTrue(timing.awaitLatch(errorLatch));
+ exceptional(stage, UnsupportedOperationException.class);
} finally {
CloseableUtils.closeQuietly(client);
}
diff --git
a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
index bf97631b..ef685e38 100644
---
a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
+++
b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
@@ -35,7 +35,7 @@ public abstract class CompletableBaseClassForTests extends
BaseClassForTests {
protected void joinThrowable(CompletionStage<?> stage) throws Throwable {
try {
- stage.toCompletableFuture().get();
+ timing.getFuture(stage.toCompletableFuture());
} catch (Exception ex) {
throw Throwables.getRootCause(ex);
}