This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new e44115e2 CURATOR-675: Fix AsyncCuratorFramework's 
UnhandledErrorListener hang AsyncStage (#465)
e44115e2 is described below

commit e44115e2fb5ea77af1bfe825ce122caf59e53fc1
Author: Kezhu Wang <[email protected]>
AuthorDate: Mon Jul 3 21:32:09 2023 +0800

    CURATOR-675: Fix AsyncCuratorFramework's UnhandledErrorListener hang 
AsyncStage (#465)
    
    `AsyncCuratorFramework` binds its `UnhandledErrorListener` to operation
    level which could prevent `AsyncStage` from completion if exception is
    swallowed by that listener.
    
    In any cases, a framework level error listener should not prevent an
    operation future from completion.
---
 .../curator/framework/api/ErrorListenerEnsembleable.java      |  3 ++-
 .../framework/api/ErrorListenerMultiTransactionMain.java      |  3 ++-
 .../curator/framework/api/ErrorListenerPathAndBytesable.java  |  3 ++-
 .../apache/curator/framework/api/ErrorListenerPathable.java   |  3 ++-
 .../framework/api/ErrorListenerReconfigBuilderMain.java       |  3 ++-
 .../java/org/apache/curator/test/compatibility/Timing2.java   |  8 ++++++++
 .../org/apache/curator/x/async/details/BuilderCommon.java     | 11 ++++++++++-
 .../curator/framework/imps/TestFrameworkBackground.java       | 10 ++++++----
 .../apache/curator/x/async/CompletableBaseClassForTests.java  |  2 +-
 9 files changed, 35 insertions(+), 11 deletions(-)

diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
index d658f8f2..9bff13af 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerEnsembleable.java
@@ -23,10 +23,11 @@ public interface ErrorListenerEnsembleable<T> extends 
Ensembleable<T> {
     /**
      * Set an error listener for this background operation. If an exception
      * occurs while processing the call in the background, this listener will
-     * be called
+     * be called.
      *
      * @param listener the listener
      * @return this for chaining
+     * @apiNote swallow the exception will prevent {@link BackgroundCallback} 
from completion
      */
     Ensembleable<T> withUnhandledErrorListener(UnhandledErrorListener 
listener);
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
index acee4bf1..fa2f7d87 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerMultiTransactionMain.java
@@ -25,10 +25,11 @@ public interface ErrorListenerMultiTransactionMain extends 
CuratorMultiTransacti
     /**
      * Set an error listener for this background operation. If an exception
      * occurs while processing the call in the background, this listener will
-     * be called
+     * be called.
      *
      * @param listener the listener
      * @return this for chaining
+     * @apiNote swallow the exception will prevent {@link BackgroundCallback} 
from completion
      */
     CuratorMultiTransactionMain 
withUnhandledErrorListener(UnhandledErrorListener listener);
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
index 0aaee4a5..ea98e175 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java
@@ -23,10 +23,11 @@ public interface ErrorListenerPathAndBytesable<T> extends 
PathAndBytesable<T> {
     /**
      * Set an error listener for this background operation. If an exception
      * occurs while processing the call in the background, this listener will
-     * be called
+     * be called.
      *
      * @param listener the listener
      * @return this for chaining
+     * @apiNote swallow the exception will prevent {@link BackgroundCallback} 
from completion
      */
     PathAndBytesable<T> withUnhandledErrorListener(UnhandledErrorListener 
listener);
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
index a2e51763..a5d3408c 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java
@@ -23,10 +23,11 @@ public interface ErrorListenerPathable<T> extends 
Pathable<T> {
     /**
      * Set an error listener for this background operation. If an exception
      * occurs while processing the call in the background, this listener will
-     * be called
+     * be called.
      *
      * @param listener the listener
      * @return this for chaining
+     * @apiNote swallow the exception will prevent {@link BackgroundCallback} 
from completion
      */
     Pathable<T> withUnhandledErrorListener(UnhandledErrorListener listener);
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
index e5ad09e9..bc44eb81 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerReconfigBuilderMain.java
@@ -23,10 +23,11 @@ public interface ErrorListenerReconfigBuilderMain extends 
ReconfigBuilderMain {
     /**
      * Set an error listener for this background operation. If an exception
      * occurs while processing the call in the background, this listener will
-     * be called
+     * be called.
      *
      * @param listener the listener
      * @return this for chaining
+     * @apiNote swallow the exception will prevent {@link BackgroundCallback} 
from completion
      */
     ReconfigBuilderMain withUnhandledErrorListener(UnhandledErrorListener 
listener);
 }
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java 
b/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java
index 946b65eb..590d6ed6 100644
--- 
a/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java
+++ 
b/curator-test/src/main/java/org/apache/curator/test/compatibility/Timing2.java
@@ -20,7 +20,9 @@
 package org.apache.curator.test.compatibility;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -106,6 +108,12 @@ public class Timing2 {
         return (int) value;
     }
 
+    public <T> T getFuture(CompletableFuture<T> future)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        Timing2 m = forWaiting();
+        return future.get(m.value, m.unit);
+    }
+
     /**
      * Wait on the given latch
      *
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
index cb0c056b..b2b0f620 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.x.async.details;
 
+import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.Backgrounding;
 import org.apache.curator.x.async.WatchMode;
 
@@ -34,6 +35,14 @@ class BuilderCommon<T> {
     BuilderCommon(Filters filters, WatchMode watchMode, BackgroundProc<T> 
proc) {
         watcher = (watchMode != null) ? new InternalWatcher(watchMode, 
filters.getWatcherFilter()) : null;
         internalCallback = new InternalCallback<>(proc, watcher, 
filters.getResultFilter());
-        backgrounding = new Backgrounding(internalCallback, 
filters.getListener());
+        UnhandledErrorListener listener = filters.getListener();
+        if (listener != null) {
+            UnhandledErrorListener wrappedListener = listener;
+            listener = (message, e) -> {
+                wrappedListener.unhandledError(message, e);
+                internalCallback.completeExceptionally(e);
+            };
+        }
+        backgrounding = new Backgrounding(internalCallback, listener);
     }
 }
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index fe18aa90..df9d373e 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -37,17 +37,18 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestFrameworkBackground extends BaseClassForTests {
+public class TestFrameworkBackground extends CompletableBaseClassForTests {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Test
@@ -91,8 +92,9 @@ public class TestFrameworkBackground extends 
BaseClassForTests {
                     errorLatch.countDown();
                 }
             };
-            async.with(listener).create().forPath("/foo");
-            assertTrue(new Timing().awaitLatch(errorLatch));
+            AsyncStage<String> stage = 
async.with(listener).create().forPath("/foo");
+            assertTrue(timing.awaitLatch(errorLatch));
+            exceptional(stage, UnsupportedOperationException.class);
         } finally {
             CloseableUtils.closeQuietly(client);
         }
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
index bf97631b..ef685e38 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
@@ -35,7 +35,7 @@ public abstract class CompletableBaseClassForTests extends 
BaseClassForTests {
 
     protected void joinThrowable(CompletionStage<?> stage) throws Throwable {
         try {
-            stage.toCompletableFuture().get();
+            timing.getFuture(stage.toCompletableFuture());
         } catch (Exception ex) {
             throw Throwables.getRootCause(ex);
         }

Reply via email to