This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher-cache in repository https://gitbox.apache.org/repos/asf/curator.git
commit 33f5c6c61903e487d07c5561e1917b5a69672fa9 Author: randgalt <[email protected]> AuthorDate: Wed Oct 2 23:24:14 2019 -0500 Added support for a PersistentWatcher recipe based on new persistent watch APIs --- .../framework/api/AddPersistentWatchBuilder.java | 30 ---- .../framework/api/AddPersistentWatchBuilder2.java | 25 --- .../framework/api/AddPersistentWatchable.java | 40 ----- .../imps/AddPersistentWatchBuilderImpl.java | 169 --------------------- .../framework/imps/CuratorFrameworkImpl.java | 7 - .../framework/recipes/watch/PersistentWatcher.java | 134 ++++++++++++++++ .../recipes/watch/TestPersistentWatcher.java | 86 +++++++++++ ...WatchBuilder.java => AsyncAddWatchBuilder.java} | 8 +- ...lderImpl.java => AsyncAddWatchBuilderImpl.java} | 31 ++-- 9 files changed, 240 insertions(+), 290 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java deleted file mode 100644 index a167174..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.api; - -public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2 -{ - /** - * ZooKeeper persistent watches can optionally be recursive. See - * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)} - * - * @return this - */ - AddPersistentWatchBuilder2 recursive(); -} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java deleted file mode 100644 index 15cea4f..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.api; - -public interface AddPersistentWatchBuilder2 extends - Backgroundable<AddPersistentWatchable<Pathable<Void>>>, - AddPersistentWatchable<Pathable<Void>> -{ -} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java deleted file mode 100644 index faa8906..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.api; - -import org.apache.zookeeper.Watcher; - -public interface AddPersistentWatchable<T> -{ - /** - * Set a watcher for the operation - * - * @param watcher the watcher - * @return this - */ - T usingWatcher(Watcher watcher); - - /** - * Set a watcher for the operation - * - * @param watcher the watcher - * @return this - */ - T usingWatcher(CuratorWatcher watcher); -} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java deleted file mode 100644 index acb70c8..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.imps; - -import org.apache.curator.RetryLoop; -import org.apache.curator.drivers.OperationTrace; -import org.apache.curator.framework.api.AddPersistentWatchBuilder; -import org.apache.curator.framework.api.AddPersistentWatchBuilder2; -import org.apache.curator.framework.api.AddPersistentWatchable; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.Pathable; -import org.apache.zookeeper.Watcher; -import java.util.concurrent.Executor; - -public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String> -{ - private final CuratorFrameworkImpl client; - private Watching watching = null; - private Backgrounding backgrounding = new Backgrounding(); - private boolean recursive = false; - - AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client) - { - this.client = client; - } - - public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive) - { - this.client = client; - this.watching = watching; - this.backgrounding = backgrounding; - this.recursive = recursive; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground() - { - backgrounding = new Backgrounding(); - return this; - } - - @Override - public AddPersistentWatchBuilder2 recursive() - { - recursive = true; - return this; - } - - @Override - public Pathable<Void> usingWatcher(Watcher watcher) - { - watching = new Watching(client, watcher); - return this; - } - - @Override - public Pathable<Void> usingWatcher(CuratorWatcher watcher) - { - watching = new Watching(client, watcher); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(Object context) - { - backgrounding = new Backgrounding(context); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback) - { - backgrounding = new Backgrounding(callback); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context) - { - backgrounding = new Backgrounding(callback, context); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor) - { - backgrounding = new Backgrounding(callback, executor); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor) - { - backgrounding = new Backgrounding(client, callback, context, executor); - return this; - } - - @Override - public Void forPath(String path) throws Exception - { - if ( backgrounding.inBackground() ) - { - client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); - } - else - { - pathInForeground(path); - } - return null; - } - - @Override - public void performBackgroundOperation(final OperationAndData<String> data) throws Exception - { - String path = data.getData(); - String fixedPath = client.fixForNamespace(path); - try - { - final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background"); - client.getZooKeeper().addPersistentWatch - ( - fixedPath, - watching.getWatcher(path), - recursive, (rc, path1, ctx) -> { - trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit(); - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, null, null, null); - client.processBackgroundOperation(data, event); - }, - backgrounding.getContext() - ); - } - catch ( Throwable e ) - { - backgrounding.checkError(e, watching); - } - } - - private void pathInForeground(final String path) throws Exception - { - final String fixedPath = client.fixForNamespace(path); - OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground"); - RetryLoop.callWithRetry - ( - client.getZookeeperClient(), () -> { - client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive); - return null; - }); - trace.setPath(fixedPath).setWithWatcher(true).commit(); - } -} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 6cd3d63..c8ebbb6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -571,13 +571,6 @@ public class CuratorFrameworkImpl implements CuratorFramework return new WatchesBuilderImpl(this); } - @Override - public AddWatchBuilder addWatch() - { - Preconditions.checkState(!isZk34CompatibilityMode(), "Persistent watches APIs are not support when running in ZooKeeper 3.4 compatibility mode"); - return new AddWatchBuilderImpl(this); - } - protected void internalSync(CuratorFrameworkImpl impl, String path, Object context) { BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java new file mode 100644 index 0000000..9f0f497 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.curator.framework.recipes.watch; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A managed persistent watcher. The watch will be managed such that it stays set through + * connection lapses, etc. + */ + public class PersistentWatcher implements Closeable + { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard(); + private final ConnectionStateListener connectionStateListener = (client, newState) -> { + if ( newState.isConnected() ) + { + reset(); + } + }; + private final Watcher watcher = event -> listeners.forEach(w -> w.process(event)); + private final CuratorFramework client; + private final String basePath; + private final boolean recursive; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + /** + * @param client client + * @param basePath path to set the watch on + * @param recursive ZooKeeper persistent watches can optionally be recursive + */ + public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null"); + this.recursive = recursive; + } + + /** + * Start watching + */ + public void start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); + client.getConnectionStateListenable().addListener(connectionStateListener); + reset(); + } + + /** + * Remove the watcher + */ + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + listeners.clear(); + client.getConnectionStateListenable().removeListener(connectionStateListener); + try + { + client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.debug(String.format("Could not remove watcher for path: %s", basePath), e); + } + } + } + + /** + * Container for setting listeners + * + * @return listener container + */ + public Listenable<Watcher> getListenable() + { + return listeners; + } + + private void reset() + { + try + { + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) { + client.runSafe(this::reset); + } + }; + client.watches().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath); + } + catch ( Exception e ) + { + log.error("Could not reset persistent watch at path: " + basePath, e); + } + } + } \ No newline at end of file diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java new file mode 100644 index 0000000..df18de5 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -0,0 +1,86 @@ +package org.apache.curator.framework.recipes.watch; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestPersistentWatcher extends BaseClassForTests +{ + private final Timing2 timing = new Timing2(); + + @Test + public void testConnectionLostRecursive() throws Exception + { + internalTest(true); + } + + @Test + public void testConnectionLost() throws Exception + { + internalTest(false); + } + + private void internalTest(boolean recursive) throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + CountDownLatch lostLatch = new CountDownLatch(1); + CountDownLatch reconnectedLatch = new CountDownLatch(1); + client.start(); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.LOST ) { + lostLatch.countDown(); + } else if ( newState == ConnectionState.RECONNECTED ) { + reconnectedLatch.countDown(); + } + }); + + try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) ) + { + persistentWatcher.start(); + + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + persistentWatcher.getListenable().addListener(events::add); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + if ( recursive ) + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a"); + } + else + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); // child added + } + + server.stop(); + Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + + server.restart(); + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + + timing.sleepABit(); // time to allow watcher to get reset + events.clear(); + + if ( recursive ) + { + client.setData().forPath("/top/main/a", "foo".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged); + } + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + } + } +} diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncAddWatchBuilder.java similarity index 74% rename from curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java rename to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncAddWatchBuilder.java index 0f29233..5d1e8e9 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncAddWatchBuilder.java @@ -18,16 +18,16 @@ */ package org.apache.curator.x.async.api; - import org.apache.curator.framework.api.AddPersistentWatchable; + import org.apache.curator.framework.api.AddWatchable; import org.apache.curator.x.async.AsyncStage; - public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> + public interface AsyncAddWatchBuilder extends AddWatchable<AsyncPathable<AsyncStage<Void>>> { /** * ZooKeeper persistent watches can optionally be recursive. See - * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)} + * {@link org.apache.zookeeper.ZooKeeper#addWatch(String, org.apache.zookeeper.Watcher, org.apache.zookeeper.AddWatchMode)} * * @return this */ - AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive(); + AddWatchable<AsyncPathable<AsyncStage<Void>>> recursive(); } \ No newline at end of file diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncAddWatchBuilderImpl.java similarity index 61% rename from curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java rename to curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncAddWatchBuilderImpl.java index 14f3e30..2b57f79 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncAddWatchBuilderImpl.java @@ -18,34 +18,35 @@ */ package org.apache.curator.x.async.details; - import org.apache.curator.framework.api.AddPersistentWatchable; - import org.apache.curator.framework.api.CuratorWatcher; - import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl; - import org.apache.curator.framework.imps.CuratorFrameworkImpl; - import org.apache.curator.framework.imps.Watching; - import org.apache.curator.x.async.AsyncStage; - import org.apache.curator.x.async.api.AsyncPathable; - import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder; - import org.apache.zookeeper.Watcher; +import org.apache.curator.framework.api.AddWatchable; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.imps.AddWatchBuilderImpl; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.imps.Watching; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.api.AsyncAddWatchBuilder; +import org.apache.curator.x.async.api.AsyncPathable; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; - import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; - import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; +import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; +import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; - class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> + class AsyncAddWatchBuilderImpl implements AsyncAddWatchBuilder, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> { private final CuratorFrameworkImpl client; private final Filters filters; private Watching watching = null; private boolean recursive = false; - AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters) + AsyncAddWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; this.filters = filters; } @Override - public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive() + public AddWatchable<AsyncPathable<AsyncStage<Void>>> recursive() { recursive = true; return this; @@ -69,7 +70,7 @@ public AsyncStage<Void> forPath(String path) { BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); - AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive); + AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT); return safeCall(common.internalCallback, () -> builder.forPath(path)); } } \ No newline at end of file
