Added a method for background operations to intercept exceptions.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/84996801 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/84996801 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/84996801 Branch: refs/heads/CURATOR-306 Commit: 8499680187d1f1061ca8597886e9f869bf637321 Parents: 18e912d Author: randgalt <[email protected]> Authored: Mon Mar 7 09:15:07 2016 -0500 Committer: randgalt <[email protected]> Committed: Mon Mar 7 09:15:07 2016 -0500 ---------------------------------------------------------------------- .../api/BackgroundPathAndBytesable.java | 2 +- .../framework/api/BackgroundPathable.java | 2 +- .../api/ErrorListenerPathAndBytesable.java | 14 +++ .../framework/api/ErrorListenerPathable.java | 14 +++ .../curator/framework/imps/Backgrounding.java | 80 ++++++++++---- .../framework/imps/CreateBuilderImpl.java | 110 +++++++++++-------- .../framework/imps/DeleteBuilderImpl.java | 79 +++++++------ .../framework/imps/ExistsBuilderImpl.java | 63 ++++++----- .../framework/imps/GetACLBuilderImpl.java | 50 ++++++--- .../framework/imps/GetChildrenBuilderImpl.java | 62 +++++++---- .../framework/imps/GetDataBuilderImpl.java | 94 ++++++++-------- .../framework/imps/SetACLBuilderImpl.java | 66 ++++++----- .../framework/imps/SetDataBuilderImpl.java | 85 +++++++------- .../curator/framework/imps/SyncBuilderImpl.java | 56 ++++++---- .../framework/imps/TestFrameworkBackground.java | 50 ++++++++- 15 files changed, 523 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathAndBytesable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathAndBytesable.java 
b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathAndBytesable.java index 5cefd08..ce9d4eb 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathAndBytesable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathAndBytesable.java @@ -19,7 +19,7 @@ package org.apache.curator.framework.api; public interface BackgroundPathAndBytesable<T> extends - Backgroundable<PathAndBytesable<T>>, + Backgroundable<ErrorListenerPathAndBytesable<T>>, PathAndBytesable<T> { } http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathable.java index 956cf6a..5a053a2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathable.java @@ -19,7 +19,7 @@ package org.apache.curator.framework.api; public interface BackgroundPathable<T> extends - Backgroundable<Pathable<T>>, + Backgroundable<ErrorListenerPathable<T>>, Pathable<T> { } http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java new file mode 100644 index 0000000..9882a3e --- /dev/null +++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathAndBytesable.java @@ -0,0 +1,14 @@ +package org.apache.curator.framework.api; + +public interface ErrorListenerPathAndBytesable<T> extends PathAndBytesable<T> +{ + /** + * Set an error listener for this background operation. If an exception + * occurs while processing the call in the background, this listener will + * be called + * + * @param listener the listener + * @return this for chaining + */ + PathAndBytesable<T> withUnhandledErrorListener(UnhandledErrorListener listener); +} http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java new file mode 100644 index 0000000..8245076 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ErrorListenerPathable.java @@ -0,0 +1,14 @@ +package org.apache.curator.framework.api; + +public interface ErrorListenerPathable<T> extends Pathable<T> +{ + /** + * Set an error listener for this background operation. 
If an exception + * occurs while processing the call in the background, this listener will + * be called + * + * @param listener the listener + * @return this for chaining + */ + Pathable<T> withUnhandledErrorListener(UnhandledErrorListener listener); +} http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java index 1bb2423..dac06c2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java @@ -16,26 +16,31 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; +import com.google.common.base.Throwables; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import java.util.concurrent.Executor; class Backgrounding { - private final boolean inBackground; - private final Object context; - private final BackgroundCallback callback; + private final boolean inBackground; + private final Object context; + private final BackgroundCallback callback; + private final UnhandledErrorListener errorListener; Backgrounding(Object context) { this.inBackground = true; this.context = context; this.callback = null; + errorListener = null; } Backgrounding(BackgroundCallback callback) @@ -43,6 +48,7 @@ class Backgrounding 
this.inBackground = true; this.context = null; this.callback = callback; + errorListener = null; } Backgrounding(boolean inBackground) @@ -50,6 +56,7 @@ class Backgrounding this.inBackground = inBackground; this.context = null; this.callback = null; + errorListener = null; } Backgrounding(BackgroundCallback callback, Object context) @@ -57,6 +64,7 @@ class Backgrounding this.inBackground = true; this.context = context; this.callback = callback; + errorListener = null; } Backgrounding(CuratorFrameworkImpl client, BackgroundCallback callback, Object context, Executor executor) @@ -69,11 +77,24 @@ class Backgrounding this(wrapCallback(client, callback, executor)); } + Backgrounding(Backgrounding rhs, UnhandledErrorListener errorListener) + { + if ( rhs == null ) + { + rhs = new Backgrounding(); + } + this.inBackground = rhs.inBackground; + this.context = rhs.context; + this.callback = rhs.callback; + this.errorListener = errorListener; + } + Backgrounding() { inBackground = false; context = null; this.callback = null; + errorListener = null; } boolean inBackground() @@ -91,6 +112,25 @@ class Backgrounding return callback; } + void checkError(Throwable e) throws Exception + { + if ( e != null ) + { + if ( errorListener != null ) + { + errorListener.unhandledError("n/a", e); + } + else if ( e instanceof Exception ) + { + throw (Exception)e; + } + else + { + Throwables.propagate(e); + } + } + } + private static BackgroundCallback wrapCallback(final CuratorFrameworkImpl client, final BackgroundCallback callback, final Executor executor) { return new BackgroundCallback() @@ -99,28 +139,28 @@ class Backgrounding public void processResult(CuratorFramework dummy, final CuratorEvent event) throws Exception { executor.execute - ( - new Runnable() - { - @Override - public void run() + ( + new Runnable() { - try - { - callback.processResult(client, event); - } - catch ( Exception e ) + @Override + public void run() { - ThreadUtils.checkInterrupted(e); - if ( e instanceof 
KeeperException ) + try + { + callback.processResult(client, event); + } + catch ( Exception e ) { - client.validateConnection(client.codeToState(((KeeperException)e).code())); + ThreadUtils.checkInterrupted(e); + if ( e instanceof KeeperException ) + { + client.validateConnection(client.codeToState(((KeeperException)e).code())); + } + client.logError("Background operation result handling threw exception", e); } - client.logError("Background operation result handling threw exception", e); } } - } - ); + ); } }; } http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index e11d74f..0f893d8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -41,7 +41,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes> +class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> { private final CuratorFrameworkImpl client; private CreateMode createMode; @@ -151,37 +151,37 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) { return CreateBuilderImpl.this.inBackground(callback, context); } @Override - public 
PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) { return CreateBuilderImpl.this.inBackground(callback, context, executor); } @Override - public PathAndBytesable<String> inBackground() + public ErrorListenerPathAndBytesable<String> inBackground() { return CreateBuilderImpl.this.inBackground(); } @Override - public PathAndBytesable<String> inBackground(Object context) + public ErrorListenerPathAndBytesable<String> inBackground(Object context) { return CreateBuilderImpl.this.inBackground(context); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback) { return CreateBuilderImpl.this.inBackground(callback); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) { return CreateBuilderImpl.this.inBackground(callback, executor); } @@ -219,37 +219,37 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt } @Override - public PathAndBytesable<String> inBackground() + public ErrorListenerPathAndBytesable<String> inBackground() { return CreateBuilderImpl.this.inBackground(); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) { return CreateBuilderImpl.this.inBackground(callback, context); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) { return 
CreateBuilderImpl.this.inBackground(callback, context, executor); } @Override - public PathAndBytesable<String> inBackground(Object context) + public ErrorListenerPathAndBytesable<String> inBackground(Object context) { return CreateBuilderImpl.this.inBackground(context); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback) { return CreateBuilderImpl.this.inBackground(callback); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) { return CreateBuilderImpl.this.inBackground(callback, executor); } @@ -302,37 +302,37 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt } @Override - public PathAndBytesable<String> inBackground() + public ErrorListenerPathAndBytesable<String> inBackground() { return CreateBuilderImpl.this.inBackground(); } @Override - public PathAndBytesable<String> inBackground(Object context) + public ErrorListenerPathAndBytesable<String> inBackground(Object context) { return CreateBuilderImpl.this.inBackground(context); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback) { return CreateBuilderImpl.this.inBackground(callback); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) { return CreateBuilderImpl.this.inBackground(callback, context); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) { return 
CreateBuilderImpl.this.inBackground(callback, executor); } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) { return CreateBuilderImpl.this.inBackground(callback, context, executor); } @@ -400,48 +400,55 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public PathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override - public PathAndBytesable<String> inBackground() + public ErrorListenerPathAndBytesable<String> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public PathAndBytesable<String> inBackground(Object context) + public ErrorListenerPathAndBytesable<String> inBackground(Object context) { backgrounding = new 
Backgrounding(context); return this; } @Override + public PathAndBytesable<String> withUnhandledErrorListener(UnhandledErrorListener listener) + { + backgrounding = new Backgrounding(backgrounding, listener); + return this; + } + + @Override public String forPath(String path) throws Exception { return forPath(path, client.getDefaultData()); @@ -501,32 +508,39 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt @Override public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background"); - client.getZooKeeper().create - ( - operationAndData.getData().getPath(), - operationAndData.getData().getData(), - acling.getAclList(operationAndData.getData().getPath()), - createMode, - new AsyncCallback.StringCallback() - { - @Override - public void processResult(int rc, String path, Object ctx, String name) + try + { + final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background"); + client.getZooKeeper().create + ( + operationAndData.getData().getPath(), + operationAndData.getData().getData(), + acling.getAclList(operationAndData.getData().getPath()), + createMode, + new AsyncCallback.StringCallback() { - trace.commit(); - - if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded ) - { - backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers); - } - else + @Override + public void processResult(int rc, String path, Object ctx, String name) { - sendBackgroundResponse(rc, path, ctx, name, operationAndData); + trace.commit(); + + if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded ) + { + backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers); + } + else + { + 
sendBackgroundResponse(rc, path, ctx, name, operationAndData); + } } - } - }, - backgrounding.getContext() - ); + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { + backgrounding.checkError(e); + } } private static String getProtectedPrefix(String protectedId) http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java index c3247a1..833904b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java @@ -20,14 +20,7 @@ package org.apache.curator.framework.imps; import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundPathable; -import org.apache.curator.framework.api.BackgroundVersionable; -import org.apache.curator.framework.api.ChildrenDeletable; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.DeleteBuilder; -import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.*; import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; import org.apache.curator.framework.api.transaction.OperationType; import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder; @@ -39,7 +32,7 @@ import org.apache.zookeeper.Op; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> +class 
DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> { private final CuratorFrameworkImpl client; private int version; @@ -99,74 +92,88 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> } @Override - public Pathable<Void> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public Pathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public Pathable<Void> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public Pathable<Void> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override - public Pathable<Void> inBackground() + public ErrorListenerPathable<Void> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public Pathable<Void> inBackground(Object context) + public ErrorListenerPathable<Void> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override + public Pathable<Void> withUnhandledErrorListener(UnhandledErrorListener listener) + { + backgrounding = new Backgrounding(backgrounding, listener); + return this; + } + + @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { - final TimeTrace 
trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background"); - client.getZooKeeper().delete - ( - operationAndData.getData(), - version, - new AsyncCallback.VoidCallback() - { - @Override - public void processResult(int rc, String path, Object ctx) + try + { + final TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background"); + client.getZooKeeper().delete + ( + operationAndData.getData(), + version, + new AsyncCallback.VoidCallback() { - trace.commit(); - if ( (rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded ) - { - backgroundDeleteChildrenThenNode(operationAndData); - } - else + @Override + public void processResult(int rc, String path, Object ctx) { - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null); - client.processBackgroundOperation(operationAndData, event); + trace.commit(); + if ( (rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded ) + { + backgroundDeleteChildrenThenNode(operationAndData); + } + else + { + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null); + client.processBackgroundOperation(operationAndData, event); + } } - } - }, - backgrounding.getContext() - ); + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { + backgrounding.checkError(e); + } } private void backgroundDeleteChildrenThenNode(final OperationAndData<String> mainOperationAndData) http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java index d4a059d..5fb7056 100644 
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java @@ -20,14 +20,7 @@ package org.apache.curator.framework.imps; import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundPathable; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.ExistsBuilder; -import org.apache.curator.framework.api.ExistsBuilderMain; -import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.*; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; @@ -36,7 +29,7 @@ import org.apache.zookeeper.data.Stat; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String> +class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, ErrorListenerPathable<Stat> { private final CuratorFrameworkImpl client; private Backgrounding backgrounding; @@ -80,68 +73,82 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String> } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, 
context, executor); return this; } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override - public Pathable<Stat> inBackground() + public ErrorListenerPathable<Stat> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public Pathable<Stat> inBackground(Object context) + public ErrorListenerPathable<Stat> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override + public Pathable<Stat> withUnhandledErrorListener(UnhandledErrorListener listener) + { + backgrounding = new Backgrounding(backgrounding, listener); + return this; + } + + @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Background"); - AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() + try { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) + final TimeTrace trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Background"); + AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() { - trace.commit(); - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null); - client.processBackgroundOperation(operationAndData, event); + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) + { + trace.commit(); + CuratorEvent event = new CuratorEventImpl(client, 
CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null); + client.processBackgroundOperation(operationAndData, event); + } + }; + if ( watching.isWatched() ) + { + client.getZooKeeper().exists(operationAndData.getData(), true, callback, backgrounding.getContext()); + } + else + { + client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); } - }; - if ( watching.isWatched() ) - { - client.getZooKeeper().exists(operationAndData.getData(), true, callback, backgrounding.getContext()); } - else + catch ( Throwable e ) { - client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); + backgrounding.checkError(e); } } http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java index 250c2c8..351a8c5 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java @@ -22,8 +22,10 @@ import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.ErrorListenerPathable; import org.apache.curator.framework.api.GetACLBuilder; import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -31,7 +33,7 @@ import java.util.List; 
import java.util.concurrent.Callable; import java.util.concurrent.Executor; -class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String> +class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String>, ErrorListenerPathable<List<ACL>> { private final CuratorFrameworkImpl client; @@ -46,48 +48,55 @@ class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String> } @Override - public Pathable<List<ACL>> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<List<ACL>> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public Pathable<List<ACL>> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<List<ACL>> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public Pathable<List<ACL>> inBackground() + public ErrorListenerPathable<List<ACL>> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public Pathable<List<ACL>> inBackground(Object context) + public ErrorListenerPathable<List<ACL>> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override - public Pathable<List<ACL>> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<List<ACL>> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public Pathable<List<ACL>> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<List<ACL>> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override + public Pathable<List<ACL>> withUnhandledErrorListener(UnhandledErrorListener listener) + { + 
backgrounding = new Backgrounding(backgrounding, listener); + return this; + } + + @Override public Pathable<List<ACL>> storingStatIn(Stat stat) { responseStat = stat; @@ -97,18 +106,25 @@ class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String> @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("GetACLBuilderImpl-Background"); - AsyncCallback.ACLCallback callback = new AsyncCallback.ACLCallback() + try { - @Override - public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) + final TimeTrace trace = client.getZookeeperClient().startTracer("GetACLBuilderImpl-Background"); + AsyncCallback.ACLCallback callback = new AsyncCallback.ACLCallback() { - trace.commit(); - CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl); - client.processBackgroundOperation(operationAndData, event); - } - }; - client.getZooKeeper().getACL(operationAndData.getData(), responseStat, callback, backgrounding.getContext()); + @Override + public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) + { + trace.commit(); + CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl); + client.processBackgroundOperation(operationAndData, event); + } + }; + client.getZooKeeper().getACL(operationAndData.getData(), responseStat, callback, backgrounding.getContext()); + } + catch ( Throwable e ) + { + backgrounding.checkError(e); + } } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java ---------------------------------------------------------------------- diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java index 16f6d4b..745800d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java @@ -25,8 +25,10 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.BackgroundPathable; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.ErrorListenerPathable; import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.WatchPathable; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.Watcher; @@ -35,12 +37,12 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<String> +class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<String>, ErrorListenerPathable<List<String>> { private final CuratorFrameworkImpl client; private Watching watching; private Backgrounding backgrounding; - private Stat responseStat; + private Stat responseStat; GetChildrenBuilderImpl(CuratorFrameworkImpl client) { @@ -86,48 +88,55 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< } @Override - public Pathable<List<String>> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public 
Pathable<List<String>> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public Pathable<List<String>> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public Pathable<List<String>> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override - public Pathable<List<String>> inBackground() + public ErrorListenerPathable<List<String>> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public Pathable<List<String>> inBackground(Object context) + public ErrorListenerPathable<List<String>> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override + public Pathable<List<String>> withUnhandledErrorListener(UnhandledErrorListener listener) + { + backgrounding = new Backgrounding(backgrounding, listener); + return this; + } + + @Override public BackgroundPathable<List<String>> watched() { watching = new Watching(true); @@ -151,28 +160,35 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("GetChildrenBuilderImpl-Background"); - AsyncCallback.Children2Callback callback = new AsyncCallback.Children2Callback() + try { - @Override - public void processResult(int rc, String path, Object o, List<String> 
strings, Stat stat) + final TimeTrace trace = client.getZookeeperClient().startTracer("GetChildrenBuilderImpl-Background"); + AsyncCallback.Children2Callback callback = new AsyncCallback.Children2Callback() { - trace.commit(); - if ( strings == null ) + @Override + public void processResult(int rc, String path, Object o, List<String> strings, Stat stat) { - strings = Lists.newArrayList(); + trace.commit(); + if ( strings == null ) + { + strings = Lists.newArrayList(); + } + CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, strings, null, null); + client.processBackgroundOperation(operationAndData, event); } - CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, strings, null, null); - client.processBackgroundOperation(operationAndData, event); + }; + if ( watching.isWatched() ) + { + client.getZooKeeper().getChildren(operationAndData.getData(), true, callback, backgrounding.getContext()); + } + else + { + client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); } - }; - if ( watching.isWatched() ) - { - client.getZooKeeper().getChildren(operationAndData.getData(), true, callback, backgrounding.getContext()); } - else + catch ( Throwable e ) { - client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); + backgrounding.checkError(e); } } http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 5a8d16c..94d27ad 100644 --- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -20,15 +20,7 @@ package org.apache.curator.framework.imps; import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundPathable; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.GetDataBuilder; -import org.apache.curator.framework.api.GetDataWatchBackgroundStatable; -import org.apache.curator.framework.api.Pathable; -import org.apache.curator.framework.api.WatchPathable; +import org.apache.curator.framework.api.*; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; @@ -39,7 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String> +class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, ErrorListenerPathable<byte[]> { private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFrameworkImpl client; @@ -64,37 +56,37 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String> return new GetDataWatchBackgroundStatable() { @Override - public Pathable<byte[]> inBackground() + public ErrorListenerPathable<byte[]> inBackground() { return GetDataBuilderImpl.this.inBackground(); } @Override - public Pathable<byte[]> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback, Object context) { return 
GetDataBuilderImpl.this.inBackground(callback, context); } @Override - public Pathable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor) { return GetDataBuilderImpl.this.inBackground(callback, context, executor); } @Override - public Pathable<byte[]> inBackground(Object context) + public ErrorListenerPathable<byte[]> inBackground(Object context) { return GetDataBuilderImpl.this.inBackground(context); } @Override - public Pathable<byte[]> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback) { return GetDataBuilderImpl.this.inBackground(callback); } @Override - public Pathable<byte[]> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback, Executor executor) { return GetDataBuilderImpl.this.inBackground(callback, executor); } @@ -167,48 +159,55 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String> } @Override - public Pathable<byte[]> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public Pathable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public Pathable<byte[]> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public Pathable<byte[]> 
inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override - public Pathable<byte[]> inBackground() + public ErrorListenerPathable<byte[]> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public Pathable<byte[]> inBackground(Object context) + public ErrorListenerPathable<byte[]> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override + public Pathable<byte[]> withUnhandledErrorListener(UnhandledErrorListener listener) + { + backgrounding = new Backgrounding(backgrounding, listener); + return this; + } + + @Override public BackgroundPathable<byte[]> watched() { watching = new Watching(true); @@ -232,37 +231,44 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String> @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background"); - AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() + try { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) + final TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background"); + AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() { - trace.commit(); - if ( decompress && (data != null) ) + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - try + trace.commit(); + if ( decompress && (data != null) ) { - data = client.getCompressionProvider().decompress(path, data); - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - log.error("Decompressing for path: " + path, e); - rc = 
KeeperException.Code.DATAINCONSISTENCY.intValue(); + try + { + data = client.getCompressionProvider().decompress(path, data); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.error("Decompressing for path: " + path, e); + rc = KeeperException.Code.DATAINCONSISTENCY.intValue(); + } } + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_DATA, rc, path, null, ctx, stat, data, null, null, null); + client.processBackgroundOperation(operationAndData, event); } - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_DATA, rc, path, null, ctx, stat, data, null, null, null); - client.processBackgroundOperation(operationAndData, event); + }; + if ( watching.isWatched() ) + { + client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext()); + } + else + { + client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); } - }; - if ( watching.isWatched() ) - { - client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext()); } - else + catch ( Throwable e ) { - client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); + backgrounding.checkError(e); } } http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java index f7b2480..5507529 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java @@ -32,7 +32,7 @@ import java.util.List; import 
java.util.concurrent.Callable; import java.util.concurrent.Executor; -class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, BackgroundOperation<String> +class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, BackgroundOperation<String>, ErrorListenerPathable<Stat> { private final CuratorFrameworkImpl client; @@ -63,48 +63,55 @@ class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, Back } @Override - public Pathable<Stat> inBackground() + public ErrorListenerPathable<Stat> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public Pathable<Stat> inBackground(Object context) + public ErrorListenerPathable<Stat> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public Pathable<Stat> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<Stat> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override + public Pathable<Stat> withUnhandledErrorListener(UnhandledErrorListener listener) + { + backgrounding = new 
Backgrounding(backgrounding, listener); + return this; + } + + @Override public Stat forPath(String path) throws Exception { path = client.fixForNamespace(path); @@ -124,26 +131,33 @@ class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, Back @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("SetACLBuilderImpl-Background"); - String path = operationAndData.getData(); - client.getZooKeeper().setACL - ( - path, - acling.getAclList(path), - version, - new AsyncCallback.StatCallback() - { - @SuppressWarnings({"unchecked"}) - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) + try + { + final TimeTrace trace = client.getZookeeperClient().startTracer("SetACLBuilderImpl-Background"); + String path = operationAndData.getData(); + client.getZooKeeper().setACL + ( + path, + acling.getAclList(path), + version, + new AsyncCallback.StatCallback() { - trace.commit(); - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_ACL, rc, path, null, ctx, stat, null, null, null, null); - client.processBackgroundOperation(operationAndData, event); - } - }, - backgrounding.getContext() - ); + @SuppressWarnings({"unchecked"}) + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) + { + trace.commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_ACL, rc, path, null, ctx, stat, null, null, null, null); + client.processBackgroundOperation(operationAndData, event); + } + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { + backgrounding.checkError(e); + } } private Stat pathInForeground(final String path) throws Exception http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java 
---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java index 4117930..62e39cf 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java @@ -20,14 +20,7 @@ package org.apache.curator.framework.imps; import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundPathAndBytesable; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.PathAndBytesable; -import org.apache.curator.framework.api.SetDataBackgroundVersionable; -import org.apache.curator.framework.api.SetDataBuilder; -import org.apache.curator.framework.api.VersionPathAndBytesable; +import org.apache.curator.framework.api.*; import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; import org.apache.curator.framework.api.transaction.OperationType; import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder; @@ -38,7 +31,7 @@ import org.apache.zookeeper.data.Stat; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes> +class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<Stat> { private final CuratorFrameworkImpl client; private Backgrounding backgrounding; @@ -99,37 +92,37 @@ class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndB return new SetDataBackgroundVersionable() { @Override - public 
PathAndBytesable<Stat> inBackground() + public ErrorListenerPathAndBytesable<Stat> inBackground() { return SetDataBuilderImpl.this.inBackground(); } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback callback, Object context) { return SetDataBuilderImpl.this.inBackground(callback, context); } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor) { return SetDataBuilderImpl.this.inBackground(callback, context, executor); } @Override - public PathAndBytesable<Stat> inBackground(Object context) + public ErrorListenerPathAndBytesable<Stat> inBackground(Object context) { return SetDataBuilderImpl.this.inBackground(context); } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback) + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback callback) { return SetDataBuilderImpl.this.inBackground(callback); } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback callback, Executor executor) { return SetDataBuilderImpl.this.inBackground(callback, executor); } @@ -162,69 +155,83 @@ class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndB } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathAndBytesable<Stat> 
inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback) + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public PathAndBytesable<Stat> inBackground() + public ErrorListenerPathAndBytesable<Stat> inBackground() { backgrounding = new Backgrounding(true); return this; } @Override - public PathAndBytesable<Stat> inBackground(Object context) + public ErrorListenerPathAndBytesable<Stat> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override - public PathAndBytesable<Stat> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override + public PathAndBytesable<Stat> withUnhandledErrorListener(UnhandledErrorListener listener) + { + backgrounding = new Backgrounding(backgrounding, listener); + return this; + } + + @Override public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("SetDataBuilderImpl-Background"); - client.getZooKeeper().setData - ( - operationAndData.getData().getPath(), - operationAndData.getData().getData(), - version, - new AsyncCallback.StatCallback() - { - @SuppressWarnings({"unchecked"}) - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) + try + { + final TimeTrace trace = client.getZookeeperClient().startTracer("SetDataBuilderImpl-Background"); + client.getZooKeeper().setData + ( + operationAndData.getData().getPath(), + operationAndData.getData().getData(), + 
version, + new AsyncCallback.StatCallback() { - trace.commit(); - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null); - client.processBackgroundOperation(operationAndData, event); - } - }, - backgrounding.getContext() - ); + @SuppressWarnings({"unchecked"}) + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) + { + trace.commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null); + client.processBackgroundOperation(operationAndData, event); + } + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { + backgrounding.checkError(e); + } } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java index 09dfbae..1e5f371 100755 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java @@ -22,12 +22,14 @@ import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.ErrorListenerPathable; import org.apache.curator.framework.api.Pathable; import org.apache.curator.framework.api.SyncBuilder; +import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.zookeeper.AsyncCallback; import java.util.concurrent.Executor; -public class SyncBuilderImpl implements SyncBuilder, 
BackgroundOperation<String> +public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> { private final CuratorFrameworkImpl client; private Backgrounding backgrounding = new Backgrounding(); @@ -39,65 +41,79 @@ public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String> } @Override - public Pathable<Void> inBackground() + public ErrorListenerPathable<Void> inBackground() { // NOP always in background return this; } @Override - public Pathable<Void> inBackground(Object context) + public ErrorListenerPathable<Void> inBackground(Object context) { backgrounding = new Backgrounding(context); return this; } @Override - public Pathable<Void> inBackground(BackgroundCallback callback) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback) { backgrounding = new Backgrounding(callback); return this; } @Override - public Pathable<Void> inBackground(BackgroundCallback callback, Object context) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback, Object context) { backgrounding = new Backgrounding(callback, context); return this; } @Override - public Pathable<Void> inBackground(BackgroundCallback callback, Executor executor) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback, Executor executor) { backgrounding = new Backgrounding(client, callback, executor); return this; } @Override - public Pathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor) + public ErrorListenerPathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor) { backgrounding = new Backgrounding(client, callback, context, executor); return this; } @Override - public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception + public Pathable<Void> withUnhandledErrorListener(UnhandledErrorListener listener) { - final TimeTrace trace = 
client.getZookeeperClient().startTracer("SyncBuilderImpl-Background"); - final String path = operationAndData.getData(); - String adjustedPath = client.fixForNamespace(path); + backgrounding = new Backgrounding(backgrounding, listener); + return this; + } - AsyncCallback.VoidCallback voidCallback = new AsyncCallback.VoidCallback() + @Override + public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception + { + try { - @Override - public void processResult(int rc, String path, Object ctx) + final TimeTrace trace = client.getZookeeperClient().startTracer("SyncBuilderImpl-Background"); + final String path = operationAndData.getData(); + String adjustedPath = client.fixForNamespace(path); + + AsyncCallback.VoidCallback voidCallback = new AsyncCallback.VoidCallback() { - trace.commit(); - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, path, ctx, null, null, null, null, null); - client.processBackgroundOperation(operationAndData, event); - } - }; - client.getZooKeeper().sync(adjustedPath, voidCallback, backgrounding.getContext()); + @Override + public void processResult(int rc, String path, Object ctx) + { + trace.commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, path, ctx, null, null, null, null, null); + client.processBackgroundOperation(operationAndData, event); + } + }; + client.getZooKeeper().sync(adjustedPath, voidCallback, backgrounding.getContext()); + } + catch ( Throwable e ) + { + backgrounding.checkError(e); + } } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/84996801/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java index 26cc941..8e21929 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java @@ -22,6 +22,7 @@ package org.apache.curator.framework.imps; import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; @@ -30,10 +31,10 @@ import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.data.ACL; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Arrays; @@ -46,6 +47,53 @@ import java.util.concurrent.atomic.AtomicReference; public class TestFrameworkBackground extends BaseClassForTests { @Test + public void testErrorListener() throws Exception + { + ACLProvider badAclProvider = new ACLProvider() + { + @Override + public List<ACL> getDefaultAcl() + { + throw new UnsupportedOperationException(); + } + + @Override + public List<ACL> getAclForPath(String path) + { + throw new UnsupportedOperationException(); + } + }; + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .aclProvider(badAclProvider) + .build(); + try + { + client.start(); + + 
final CountDownLatch errorLatch = new CountDownLatch(1); + UnhandledErrorListener listener = new UnhandledErrorListener() + { + @Override + public void unhandledError(String message, Throwable e) + { + if ( e instanceof UnsupportedOperationException ) + { + errorLatch.countDown(); + } + } + }; + client.create().inBackground().withUnhandledErrorListener(listener).forPath("/foo"); + Assert.assertTrue(new Timing().awaitLatch(errorLatch)); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testListenerConnectedAtStart() throws Exception { server.stop();
