Repository: curator Updated Branches: refs/heads/CURATOR-3.0 0b40d210a -> e76eb590f
If there's an exeption, retries exceeded, etc., the watcher was still being stored in WatcherRemovealManager thus causing it to be incorrectly removed. This fix removes the watch from the WatcherRemovalmanager on exceptions, retries exceeded, etc. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b1d2198a Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b1d2198a Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b1d2198a Branch: refs/heads/CURATOR-3.0 Commit: b1d2198a75d6e01b48789635ba648bedc542c439 Parents: f7ef2f1 Author: randgalt <randg...@apache.org> Authored: Wed Apr 20 20:12:31 2016 -0500 Committer: randgalt <randg...@apache.org> Committed: Wed Apr 20 20:12:31 2016 -0500 ---------------------------------------------------------------------- .../curator/framework/imps/Backgrounding.java | 7 +- .../imps/ClassicInternalConnectionHandler.java | 2 +- .../framework/imps/CreateBuilderImpl.java | 9 +- .../framework/imps/CuratorFrameworkImpl.java | 4 +- .../imps/CuratorMultiTransactionImpl.java | 4 +- .../framework/imps/DeleteBuilderImpl.java | 6 +- .../framework/imps/ExistsBuilderImpl.java | 20 +-- .../FindAndDeleteProtectedNodeInBackground.java | 2 +- .../framework/imps/GetACLBuilderImpl.java | 4 +- .../framework/imps/GetChildrenBuilderImpl.java | 21 ++- .../framework/imps/GetConfigBuilderImpl.java | 21 ++- .../framework/imps/GetDataBuilderImpl.java | 21 ++- .../framework/imps/OperationAndData.java | 16 ++- .../framework/imps/ReconfigBuilderImpl.java | 4 +- .../imps/RemoveWatchesBuilderImpl.java | 4 +- .../framework/imps/SetACLBuilderImpl.java | 4 +- .../framework/imps/SetDataBuilderImpl.java | 4 +- .../curator/framework/imps/SyncBuilderImpl.java | 4 +- .../apache/curator/framework/imps/Watching.java | 51 ++++++- .../imps/TestWatcherRemovalManager.java | 142 +++++++++++++++++++ 20 files changed, 273 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java index dac06c2..0b823c4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java @@ -112,10 +112,15 @@ class Backgrounding return callback; } - void checkError(Throwable e) throws Exception + void checkError(Throwable e, Watching watching) throws Exception { if ( e != null ) { + if ( watching != null ) + { + watching.resetCurrentWatcher(); + } + if ( errorListener != null ) { errorListener.unhandledError("n/a", e); http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java index 63ba665..90a8a24 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java @@ -65,6 +65,6 @@ class ClassicInternalConnectionHandler implements InternalConnectionHandler } } }; - client.performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null)); + client.performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null, null)); } } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index ace163b..9e8f7e6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -613,7 +613,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } @@ -714,7 +714,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt client.queueOperation(mainOperationAndData); } }; - OperationAndData<T> parentOperation = new OperationAndData<T>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext()); + OperationAndData<T> parentOperation = new OperationAndData<T>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null); client.queueOperation(parentOperation); } @@ -751,7 +751,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt client.queueOperation(mainOperationAndData); } }; - client.queueOperation(new OperationAndData<>(operation, null, null, null, null)); + client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null)); } private void sendBackgroundResponse(int rc, String path, Object ctx, String name, Stat stat, OperationAndData<PathAndBytes> operationAndData) @@ -974,7 +974,8 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt } } }, - backgrounding.getContext()) + backgrounding.getContext(), + null) { @Override void callPerformBackgroundOperation() throws Exception http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 191c50a..4e1aefc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -533,7 +533,7 @@ public class CuratorFrameworkImpl implements CuratorFramework protected void internalSync(CuratorFrameworkImpl impl, String path, Object context) { BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context); - performBackgroundOperation(new OperationAndData<String>(operation, path, null, null, context)); + performBackgroundOperation(new OperationAndData<String>(operation, path, null, null, context, null)); } @Override @@ -898,6 +898,7 @@ public class CuratorFrameworkImpl implements CuratorFramework void performBackgroundOperation(OperationAndData<?> operationAndData) { + operationAndData.resetCurrentWatcher(); try { if ( !operationAndData.isConnectionRequired() || client.isConnected() ) @@ -917,6 +918,7 @@ public class CuratorFrameworkImpl implements CuratorFramework } catch ( Throwable e ) { + operationAndData.resetCurrentWatcher(); ThreadUtils.checkInterrupted(e); /** http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java index 528fe6f..da37a06 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java @@ -124,7 +124,7 @@ public class CuratorMultiTransactionImpl implements if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<>(this, record, backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<>(this, record, backgrounding.getCallback(), null, backgrounding.getContext(), null), null); return null; } else @@ -154,7 +154,7 @@ public class CuratorMultiTransactionImpl implements } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java index 21c5cd8..678b0cb 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java @@ -184,7 +184,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, E } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } @@ -206,7 +206,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, E client.queueOperation(mainOperationAndData); } }; - OperationAndData<String> parentOperation = new OperationAndData<String>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext()); + OperationAndData<String> parentOperation = new OperationAndData<String>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null); client.queueOperation(parentOperation); } @@ -230,7 +230,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, E } }; } - client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null); } else { http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java index 7f55cf7..cead168 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java @@ -40,7 +40,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E { this.client = client; backgrounding = new Backgrounding(); - watching = new Watching(); + watching = new Watching(client); createParentContainersIfNeeded = false; } @@ -54,21 +54,21 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E @Override public BackgroundPathable<Stat> watched() { - watching = new Watching(true); + watching = new Watching(client, true); return this; } @Override public BackgroundPathable<Stat> usingWatcher(Watcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return this; } @Override public BackgroundPathable<Stat> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return this; } @@ -132,6 +132,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E @Override public void processResult(int rc, String path, Object ctx, Stat stat) { + watching.checkBackroundRc(rc); trace.commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null); client.processBackgroundOperation(operationAndData, event); @@ -143,12 +144,12 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E } else { - client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext()); + client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(operationAndData.getData()), callback, backgrounding.getContext()); } } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, watching); } } @@ -160,7 +161,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E Stat returnStat = null; if ( backgrounding.inBackground() ) { - OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()); + OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching); if ( createParentContainersIfNeeded ) { CreateBuilderImpl.backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData(), backgrounding, true); @@ -215,9 +216,8 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E private Stat pathInForegroundStandard(final String path) throws Exception { TimeTrace trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground"); - Stat returnStat = RetryLoop.callWithRetry + Stat returnStat = watching.callWithRetry ( - client.getZookeeperClient(), new Callable<Stat>() { @Override @@ -230,7 +230,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E } else { - returnStat = client.getZooKeeper().exists(path, watching.getWatcher(client, path)); + returnStat = client.getZooKeeper().exists(path, watching.getWatcher(path)); } return returnStat; } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java index 608a005..208b7b7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java @@ -56,7 +56,7 @@ class FindAndDeleteProtectedNodeInBackground implements BackgroundOperation<Void client.processBackgroundOperation(operationAndData, null); } }; - OperationAndData<Void> operationAndData = new OperationAndData<Void>(this, null, null, errorCallback, null); + OperationAndData<Void> operationAndData = new OperationAndData<Void>(this, null, null, errorCallback, null, null); client.processBackgroundOperation(operationAndData, null); } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java index fa02740..7b313cf 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java @@ -123,7 +123,7 @@ class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String>, E } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } @@ -135,7 +135,7 @@ class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String>, E List<ACL> result = null; if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), null), null); } else { http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java index bc9cfc6..8c7efa4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java @@ -19,7 +19,6 @@ package org.apache.curator.framework.imps; import com.google.common.collect.Lists; -import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.BackgroundPathable; @@ -47,7 +46,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< GetChildrenBuilderImpl(CuratorFrameworkImpl client) { this.client = client; - watching = new Watching(); + watching = new Watching(client); backgrounding = new Backgrounding(); responseStat = null; } @@ -139,21 +138,21 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< @Override public BackgroundPathable<List<String>> watched() { - watching = new Watching(true); + watching = new Watching(client, true); return this; } @Override public BackgroundPathable<List<String>> usingWatcher(Watcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return this; } @Override public BackgroundPathable<List<String>> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return this; } @@ -168,6 +167,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< @Override public void processResult(int rc, String path, Object o, List<String> strings, Stat stat) { + watching.checkBackroundRc(rc); trace.commit(); if ( strings == null ) { @@ -183,12 +183,12 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< } else { - client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext()); + client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(operationAndData.getData()), callback, backgrounding.getContext()); } } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, watching); } } @@ -200,7 +200,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< List<String> children = null; if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); } else { @@ -212,9 +212,8 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< private List<String> pathInForeground(final String path) throws Exception { TimeTrace trace = client.getZookeeperClient().startTracer("GetChildrenBuilderImpl-Foreground"); - List<String> children = RetryLoop.callWithRetry + List<String> children = watching.callWithRetry ( - client.getZookeeperClient(), new Callable<List<String>>() { @Override @@ -227,7 +226,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< } else { - children = client.getZooKeeper().getChildren(path, watching.getWatcher(client, path), responseStat); + children = client.getZooKeeper().getChildren(path, watching.getWatcher(path), responseStat); } return children; } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index 2ba4d71..3a210b8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -19,7 +19,6 @@ package org.apache.curator.framework.imps; -import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.zookeeper.AsyncCallback; @@ -41,7 +40,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati { this.client = client; backgrounding = new Backgrounding(); - watching = new Watching(); + watching = new Watching(client); } @Override @@ -115,21 +114,21 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati @Override public BackgroundEnsembleable<byte[]> watched() { - watching = new Watching(true); + watching = new Watching(client, true); return new InternalBackgroundEnsembleable(); } @Override public BackgroundEnsembleable<byte[]> usingWatcher(Watcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return new InternalBackgroundEnsembleable(); } @Override public BackgroundEnsembleable<byte[]> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return new InternalBackgroundEnsembleable(); } @@ -187,7 +186,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati { if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<Void>(this, null, backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<Void>(this, null, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); return null; } else @@ -207,6 +206,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + watching.checkBackroundRc(rc); trace.commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null); client.processBackgroundOperation(operationAndData, event); @@ -218,12 +218,12 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati } else { - client.getZooKeeper().getConfig(watching.getWatcher(client, ZooDefs.CONFIG_NODE), callback, backgrounding.getContext()); + client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), callback, backgrounding.getContext()); } } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, watching); } } @@ -232,9 +232,8 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground"); try { - return RetryLoop.callWithRetry + return watching.callWithRetry ( - client.getZookeeperClient(), new Callable<byte[]>() { @Override @@ -244,7 +243,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati { return client.getZooKeeper().getConfig(true, stat); } - return client.getZooKeeper().getConfig(watching.getWatcher(client, ZooDefs.CONFIG_NODE), stat); + return client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), stat); } } ); http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 72103b9..d937d00 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -18,7 +18,6 @@ */ package org.apache.curator.framework.imps; -import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.curator.utils.ThreadUtils; @@ -44,7 +43,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, { this.client = client; responseStat = null; - watching = new Watching(); + watching = new Watching(client); backgrounding = new Backgrounding(); decompress = false; } @@ -210,21 +209,21 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, @Override public BackgroundPathable<byte[]> watched() { - watching = new Watching(true); + watching = new Watching(client, true); return this; } @Override public BackgroundPathable<byte[]> usingWatcher(Watcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return this; } @Override public BackgroundPathable<byte[]> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(watcher); + watching = new Watching(client, watcher); return this; } @@ -239,6 +238,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + watching.checkBackroundRc(rc); trace.commit(); if ( decompress && (data != null) ) { @@ -263,12 +263,12 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, } else { - client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext()); + client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(operationAndData.getData()), callback, backgrounding.getContext()); } } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, watching); } } @@ -280,7 +280,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, byte[] responseData = null; if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); } else { @@ -292,9 +292,8 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, private byte[] pathInForeground(final String path) throws Exception { TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Foreground"); - byte[] responseData = RetryLoop.callWithRetry + byte[] responseData = watching.callWithRetry ( - client.getZookeeperClient(), new Callable<byte[]>() { @Override @@ -307,7 +306,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, } else { - responseData = client.getZooKeeper().getData(path, watching.getWatcher(client, path), responseStat); + responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat); } return responseData; } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java index 5f7b985..73ea38e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java @@ -41,13 +41,14 @@ class OperationAndData<T> implements Delayed, RetrySleeper private final AtomicLong ordinal = new AtomicLong(); private final Object context; private final boolean connectionRequired; + private final Watching watching; interface ErrorCallback<T> { void retriesExhausted(OperationAndData<T> operationAndData); } - OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired) + OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired, Watching watching) { this.operation = operation; this.data = data; @@ -55,6 +56,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper this.errorCallback = errorCallback; this.context = context; this.connectionRequired = connectionRequired; + this.watching = watching; reset(); } @@ -64,9 +66,9 @@ class OperationAndData<T> implements Delayed, RetrySleeper ordinal.set(nextOrdinal.getAndIncrement()); } - OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context) + OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, Watching watching) { - this(operation, data, callback, errorCallback, context, true); + this(operation, data, callback, errorCallback, context, true, watching); } Object getContext() @@ -115,6 +117,14 @@ class OperationAndData<T> implements Delayed, RetrySleeper return operation; } + void resetCurrentWatcher() + { + if ( watching != null ) + { + watching.resetCurrentWatcher(); + } + } + @Override public void sleepFor(long time, TimeUnit unit) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 74683de..df00785 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -51,7 +51,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation { if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<>(this, null, backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<>(this, null, backgrounding.getCallback(), null, backgrounding.getContext(), null), null); return new byte[0]; } else @@ -261,7 +261,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index 3d4e96d..c2d4d8e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -209,7 +209,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat } client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), - errorCallback, backgrounding.getContext(), !local), null); + errorCallback, backgrounding.getContext(), !local, null), null); } private void pathInForeground(final String path) throws Exception @@ -329,7 +329,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java index 312071c..5ab353a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java @@ -119,7 +119,7 @@ class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, Back Stat resultStat = null; if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), null), null); } else { @@ -156,7 +156,7 @@ class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, Back } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java index e75377d..ee51b9e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java @@ -228,7 +228,7 @@ class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndB } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } @@ -251,7 +251,7 @@ class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndB Stat resultStat = null; if ( backgrounding.inBackground() ) { - client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()), null); + client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null); } else { http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java index 51b4e04..1483ae6 100755 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java @@ -112,14 +112,14 @@ public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String> } catch ( Throwable e ) { - backgrounding.checkError(e); + backgrounding.checkError(e, null); } } @Override public Void forPath(String path) throws Exception { - OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()); + OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), null); client.processBackgroundOperation(operationAndData, null); return null; } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index 27d0a7c..2058c3b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -19,46 +19,55 @@ package org.apache.curator.framework.imps; +import org.apache.curator.RetryLoop; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; +import java.util.concurrent.Callable; class Watching { private final Watcher watcher; private final CuratorWatcher curatorWatcher; private final boolean watched; + private final CuratorFrameworkImpl client; + private NamespaceWatcher namespaceWatcher; - Watching(boolean watched) + Watching(CuratorFrameworkImpl client, boolean watched) { + this.client = client; this.watcher = null; this.curatorWatcher = null; this.watched = watched; } - Watching(Watcher watcher) + Watching(CuratorFrameworkImpl client, Watcher watcher) { + this.client = client; this.watcher = watcher; this.curatorWatcher = null; this.watched = false; } - Watching(CuratorWatcher watcher) + Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) { + this.client = client; this.watcher = null; this.curatorWatcher = watcher; this.watched = false; } - Watching() + Watching(CuratorFrameworkImpl client) { + this.client = client; watcher = null; watched = false; curatorWatcher = null; } - Watcher getWatcher(CuratorFrameworkImpl client, String unfixedPath) + Watcher getWatcher(String unfixedPath) { - NamespaceWatcher namespaceWatcher = null; + namespaceWatcher = null; if ( watcher != null ) { namespaceWatcher = new NamespaceWatcher(client, this.watcher, unfixedPath); @@ -83,4 +92,34 @@ class Watching { return watched; } + + <T> T callWithRetry(Callable<T> proc) throws Exception + { + resetCurrentWatcher(); + try + { + return RetryLoop.callWithRetry(client.getZookeeperClient(), proc); + } + catch ( Exception e ) + { + resetCurrentWatcher(); + throw e; + } + } + + void resetCurrentWatcher() + { + if ( (namespaceWatcher != null) && (client.getWatcherRemovalManager() != null) ) + { + client.getWatcherRemovalManager().noteTriggeredWatcher(namespaceWatcher); + } + } + + void checkBackroundRc(int rc) + { + if ( rc != KeeperException.Code.OK.intValue() ) + { + resetCurrentWatcher(); + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/b1d2198a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java index e20c450..cdb625d 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java @@ -21,11 +21,14 @@ package org.apache.curator.framework.imps; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.WatcherRemoveCuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; import org.apache.curator.test.WatchersDebug; import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.testng.Assert; @@ -36,6 +39,145 @@ import java.util.concurrent.CountDownLatch; public class TestWatcherRemovalManager extends BaseClassForTests { @Test + public void testWithRetry() throws Exception + { + server.stop(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + Watcher w = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + try + { + removerClient.checkExists().usingWatcher(w).forPath("/one/two/three"); + Assert.fail("Should have thrown ConnectionLossException"); + } + catch ( KeeperException.ConnectionLossException expected ) + { + // expected + } + Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testWithRetryInBackground() throws Exception + { + server.stop(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + Watcher w = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + + final CountDownLatch latch = new CountDownLatch(1); + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + latch.countDown(); + } + }; + removerClient.checkExists().usingWatcher(w).inBackground(callback).forPath("/one/two/three"); + Assert.assertTrue(new Timing().awaitLatch(latch)); + Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testMissingNode() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + Watcher w = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + try + { + removerClient.getData().usingWatcher(w).forPath("/one/two/three"); + Assert.fail("Should have thrown NoNodeException"); + } + catch ( KeeperException.NoNodeException expected ) + { + // expected + } + Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testMissingNodeInBackground() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + Watcher w = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + final CountDownLatch latch = new CountDownLatch(1); + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + latch.countDown(); + } + }; + removerClient.getData().usingWatcher(w).inBackground(callback).forPath("/one/two/three"); + Assert.assertTrue(new Timing().awaitLatch(latch)); + Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testBasic() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));