This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher-cache in repository https://gitbox.apache.org/repos/asf/curator.git
commit 6da56a845d0e84a741e5df4a10b6bb2c8674217d Author: randgalt <[email protected]> AuthorDate: Wed Oct 2 20:09:32 2019 -0500 Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs. --- .../apache/curator/framework/CuratorFramework.java | 6 + .../framework/api/AddPersistentWatchBuilder.java | 30 ++++ .../framework/api/AddPersistentWatchBuilder2.java | 25 +++ .../framework/api/AddPersistentWatchable.java | 40 +++++ .../imps/AddPersistentWatchBuilderImpl.java | 169 +++++++++++++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 7 + .../x/async/api/AsyncPersistentWatchBuilder.java | 33 ++++ .../details/AsyncPersistentWatchBuilderImpl.java | 75 +++++++++ 8 files changed, 385 insertions(+) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index a803e63..89054a2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -199,6 +199,12 @@ public interface CuratorFramework extends Closeable public WatchesBuilder watches(); /** + * Start an add watch builder + * + * @return builder object + */ + public AddWatchBuilder addWatch(); + /** * Returns the listenable interface for the Connect State * * @return listenable diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java new file mode 100644 index 0000000..a167174 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2 +{ + /** + * ZooKeeper persistent watches can optionally be recursive. See + * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)} + * + * @return this + */ + AddPersistentWatchBuilder2 recursive(); +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java new file mode 100644 index 0000000..15cea4f --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface AddPersistentWatchBuilder2 extends + Backgroundable<AddPersistentWatchable<Pathable<Void>>>, + AddPersistentWatchable<Pathable<Void>> +{ +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java new file mode 100644 index 0000000..faa8906 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.api; + +import org.apache.zookeeper.Watcher; + +public interface AddPersistentWatchable<T> +{ + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(Watcher watcher); + + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(CuratorWatcher watcher); +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java new file mode 100644 index 0000000..acb70c8 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.imps; + +import org.apache.curator.RetryLoop; +import org.apache.curator.drivers.OperationTrace; +import org.apache.curator.framework.api.AddPersistentWatchBuilder; +import org.apache.curator.framework.api.AddPersistentWatchBuilder2; +import org.apache.curator.framework.api.AddPersistentWatchable; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.Pathable; +import org.apache.zookeeper.Watcher; +import java.util.concurrent.Executor; + +public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String> +{ + private final CuratorFrameworkImpl client; + private Watching watching = null; + private Backgrounding backgrounding = new Backgrounding(); + private boolean recursive = false; + + AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client) + { + this.client = client; + } + + public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive) + { + this.client = client; + this.watching = watching; + this.backgrounding = backgrounding; + this.recursive = recursive; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground() + { + backgrounding = new Backgrounding(true); + return this; + } + + @Override + public AddPersistentWatchBuilder2 recursive() + { + recursive = true; + return this; + } + + @Override + public Pathable<Void> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public Pathable<Void> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(Object context) + { + 
backgrounding = new Backgrounding(context); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback) + { + backgrounding = new Backgrounding(callback); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context) + { + backgrounding = new Backgrounding(callback, context); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor) + { + backgrounding = new Backgrounding(client, callback, executor); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; + } + + @Override + public Void forPath(String path) throws Exception + { + if ( backgrounding.inBackground() ) + { + client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); + } + else + { + pathInForeground(path); + } + return null; + } + + @Override + public void performBackgroundOperation(final OperationAndData<String> data) throws Exception + { + String path = data.getData(); + String fixedPath = client.fixForNamespace(path); + try + { + final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background"); + client.getZooKeeper().addPersistentWatch + ( + fixedPath, + watching.getWatcher(path), + recursive, (rc, path1, ctx) -> { + trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, null, null, null); + client.processBackgroundOperation(data, event); + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { 
+ backgrounding.checkError(e, watching); + } + } + + private void pathInForeground(final String path) throws Exception + { + final String fixedPath = client.fixForNamespace(path); + OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground"); + RetryLoop.callWithRetry + ( + client.getZookeeperClient(), () -> { + client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive); + return null; + }); + trace.setPath(fixedPath).setWithWatcher(true).commit(); + } +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index c8ebbb6..6cd3d63 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -571,6 +571,13 @@ public class CuratorFrameworkImpl implements CuratorFramework return new WatchesBuilderImpl(this); } + @Override + public AddWatchBuilder addWatch() + { + Preconditions.checkState(!isZk34CompatibilityMode(), "Persistent watches APIs are not supported when running in ZooKeeper 3.4 compatibility mode"); + return new AddWatchBuilderImpl(this); + } + protected void internalSync(CuratorFrameworkImpl impl, String path, Object context) { BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java new file mode 100644 index 0000000..0f29233 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.curator.x.async.api; + + import org.apache.curator.framework.api.AddPersistentWatchable; + import org.apache.curator.x.async.AsyncStage; + + public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> + { + /** + * ZooKeeper persistent watches can optionally be recursive. See + * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)} + * + * @return this + */ + AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive(); + } \ No newline at end of file diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java new file mode 100644 index 0000000..14f3e30 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.curator.x.async.details; + + import org.apache.curator.framework.api.AddPersistentWatchable; + import org.apache.curator.framework.api.CuratorWatcher; + import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl; + import org.apache.curator.framework.imps.CuratorFrameworkImpl; + import org.apache.curator.framework.imps.Watching; + import org.apache.curator.x.async.AsyncStage; + import org.apache.curator.x.async.api.AsyncPathable; + import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder; + import org.apache.zookeeper.Watcher; + + import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; + import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + + class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> + { + private final CuratorFrameworkImpl client; + private final Filters filters; + private Watching watching = null; + private boolean recursive = false; + + AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters) + { + this.client = client; + this.filters = filters; + } + + @Override + public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive() + { + recursive = true; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> 
usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncStage<Void> forPath(String path) + { + BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); + AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive); + return safeCall(common.internalCallback, () -> builder.forPath(path)); + } + } \ No newline at end of file
