[CURATOR-160] Add builders and dsl for ZooKeeper's config and reconfig methods.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b00cecf4 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b00cecf4 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b00cecf4 Branch: refs/heads/CURATOR-215 Commit: b00cecf4902aeffdba05d9e09d4e9210c5ad80ab Parents: 2bd53fc Author: Ioannis Canellos <ioca...@gmail.com> Authored: Thu Nov 6 17:34:47 2014 +0200 Committer: Scott Blum <dragonsi...@apache.org> Committed: Wed Aug 12 17:07:31 2015 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/curator/RetryLoop.java | 3 +- .../api/BackgroundStatConfigurable.java | 24 +++ .../framework/api/BackgroundStatable.java | 24 +++ .../curator/framework/api/Configurable.java | 29 +++ .../curator/framework/api/CuratorEventType.java | 10 + .../curator/framework/api/Ensembleable.java | 24 +++ .../curator/framework/api/GetConfigBuilder.java | 6 +- .../api/JoinBackgroundStatConfigurable.java | 30 +++ .../apache/curator/framework/api/Joinable.java | 15 +- .../api/LeaveBackgroundStatConfigurable.java | 30 +++ .../apache/curator/framework/api/Leaveable.java | 14 +- .../curator/framework/api/Memberable.java | 13 +- .../curator/framework/api/ReconfigBuilder.java | 6 +- .../imps/EnsembleServersAndConfig.java | 52 +++++ .../framework/imps/GetConfigBuilderImpl.java | 165 +++++++++++--- .../framework/imps/ReconfigBuilderImpl.java | 216 +++++++++++-------- .../framework/imps/TestReconfiguration.java | 176 +++++++-------- .../org/apache/curator/test/InstanceSpec.java | 4 + .../org/apache/curator/test/TestingCluster.java | 1 + 19 files changed, 594 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-client/src/main/java/org/apache/curator/RetryLoop.java ---------------------------------------------------------------------- diff --git 
a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java index 6b66e82..065ebef 100644 --- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java +++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java @@ -150,7 +150,8 @@ public class RetryLoop return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) || (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) || (rc == KeeperException.Code.SESSIONMOVED.intValue()) || - (rc == KeeperException.Code.SESSIONEXPIRED.intValue()); + (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) || + (rc == KeeperException.Code.NEWCONFIGNOQUORUM.intValue()); } /** http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatConfigurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatConfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatConfigurable.java new file mode 100644 index 0000000..e46ba89 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatConfigurable.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface BackgroundStatConfigurable<T> extends + BackgroundStatable<Configurable<T>>, + Configurable<T> { +} http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java new file mode 100644 index 0000000..77c4e96 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.api; + +public interface BackgroundStatable<T> extends + Backgroundable<T>, + Statable<T> { +} http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java new file mode 100644 index 0000000..a47f9d0 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface Configurable<T> { + + /** + * Sets the configuration version to use. + * @param config The version of the configuration. 
+ * @throws Exception + */ + Ensembleable<T> fromConfig(long config) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java index 684d11b..50e9195 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java @@ -69,6 +69,16 @@ public enum CuratorEventType SET_ACL, /** + * Corresponds to {@link CuratorFramework#getConfig()} + */ + GET_CONFIG, + + /** + * Corresponds to {@link CuratorFramework#reconfig()} + */ + RECONFIG, + + /** * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()} */ WATCHED, http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/Ensembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Ensembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Ensembleable.java new file mode 100644 index 0000000..c8a82fe --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Ensembleable.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface Ensembleable<T> { + + T forEnsemble() throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java index c7c013b..c2fdf6c 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java @@ -19,9 +19,9 @@ package org.apache.curator.framework.api; public interface GetConfigBuilder extends - Watchable<GetConfigBuilder>, - DataCallbackable<Void>, - Statable<byte[]> { + Watchable<BackgroundStatable<Ensembleable<byte[]>>>, + BackgroundStatable<Ensembleable<byte[]>>, + Ensembleable<byte[]> { } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/JoinBackgroundStatConfigurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinBackgroundStatConfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinBackgroundStatConfigurable.java new file mode 100644 index 0000000..fb18c0c --- /dev/null +++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinBackgroundStatConfigurable.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +/** + * An incremental reconfiguration builder. + * This builder has access only to the incremental reconfiguration methods joining and leaving, so that we prevent + * mixing concepts that can't be used together. 
+ */ +public interface JoinBackgroundStatConfigurable extends + Joinable<BackgroundStatConfigurable<byte[]>>, + BackgroundStatConfigurable<byte[]> { + +} http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java index ff1b3c5..dde5b1c 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java @@ -18,23 +18,14 @@ */ package org.apache.curator.framework.api; -import java.util.Collection; - public interface Joinable<T> { /** - * Adds a server to join the ensemble. + * Adds one or more servers that are joining the ensemble. * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] - * @param server The server to join. + * @param server The servers that are joining. * @return this. */ - T join(String server); + T joining(String... server); - /** - * Adds a collection of servers to the ensemble. 
- * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] - * @param servers The collection of servers to join - * @return this - */ - T join(Collection<String> servers); } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveBackgroundStatConfigurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveBackgroundStatConfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveBackgroundStatConfigurable.java new file mode 100644 index 0000000..196ffca --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveBackgroundStatConfigurable.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +/** + * An incremental reconfiguration builder. + * This builder has access only to the incremental reconfiguration method leaving, so that we prevent + * mixing concepts that can't be used together. 
+ */ +public interface LeaveBackgroundStatConfigurable extends + Leaveable<BackgroundStatConfigurable<byte[]>>, + BackgroundStatConfigurable<byte[]> { + +} http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java index 8560d65..a3c3358 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java @@ -18,21 +18,13 @@ */ package org.apache.curator.framework.api; -import java.util.Collection; - public interface Leaveable<T> { /** - * Sets a server to leave the ensemble. - * @param server The server id. + * Sets one or more servers that are leaving the ensemble. + * @param server The server ids. * @return this */ - T leave(String server); + T leaving(String... server); - /** - * Sets a collection of servers to leave the ensemble. - * @param servers The collection of server ids. - * @return this. 
- */ - T leave(Collection<String> servers); } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java index 5b62dba..6ef54c1 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java @@ -18,23 +18,14 @@ */ package org.apache.curator.framework.api; -import java.util.Collection; public interface Memberable<T> { /** - * Sets a member that is meant to be part of the ensemble. + * Sets one or more members that are meant to be part of the ensemble. * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] * @param server The server to add as a member of the ensemble. * @return this. */ - T withMember(String server); - - /** - * Sets a collection of members member that is meant to be part of the ensemble. - * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] - * @param servers The collection of server to set as a members of the ensemble. - * @return this. - */ - T withMembers(Collection<String> servers); + T withMembers(String... 
server); } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java index 0e420a1..96ebdf7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java @@ -19,8 +19,8 @@ package org.apache.curator.framework.api; public interface ReconfigBuilder extends - Joinable<IncrementalReconfigBuilder<byte[]>>, - Leaveable<IncrementalReconfigBuilder<byte[]>>, - Memberable<NonIncrementalReconfigBuilder<byte[]>> { + Joinable<LeaveBackgroundStatConfigurable>, + Leaveable<JoinBackgroundStatConfigurable>, + Memberable<BackgroundStatConfigurable<byte[]>> { } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleServersAndConfig.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleServersAndConfig.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleServersAndConfig.java new file mode 100644 index 0000000..df78aa7 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleServersAndConfig.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import java.util.Collections; +import java.util.List; + +class EnsembleServersAndConfig { + private final List<String> joiningServers; + private final List<String> leavingServers; + private final List<String> members; + private final long config; + + EnsembleServersAndConfig(List<String> joiningServers, List<String> leavingServers, List<String> members, long config) { + this.joiningServers = joiningServers.isEmpty() ? null : Collections.unmodifiableList(joiningServers); + this.leavingServers = leavingServers.isEmpty() ? null : Collections.unmodifiableList(leavingServers); + this.members = members.isEmpty() ? 
null : Collections.unmodifiableList(members); + this.config = config; + } + + public List<String> getJoiningServers() { + return joiningServers; + } + + public List<String> getLeavingServers() { + return leavingServers; + } + + public List<String> getMembers() { + return members; + } + + public long getConfig() { + return config; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index a56894d..54b1862 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -18,63 +18,170 @@ */ package org.apache.curator.framework.imps; +import org.apache.curator.RetryLoop; +import org.apache.curator.TimeTrace; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.BackgroundStatable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.Ensembleable; import org.apache.curator.framework.api.GetConfigBuilder; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; -public class GetConfigBuilderImpl implements GetConfigBuilder { +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; + +public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperation<Void> { private final CuratorFrameworkImpl client; - private boolean watched; - private Watcher watcher; + + 
private Backgrounding backgrounding; + private Watching watching; + private Stat stat; public GetConfigBuilderImpl(CuratorFrameworkImpl client) { this.client = client; + backgrounding = new Backgrounding(); + watching = new Watching(); } @Override - public Void usingDataCallback(AsyncCallback.DataCallback callback, Object ctx) { - try { - if (watcher != null) { - client.getZooKeeper().getConfig(watcher, callback, ctx); - } else { - client.getZooKeeper().getConfig(watched, callback, ctx); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return null; + public Ensembleable<byte[]> storingStatIn(Stat stat) { + this.stat = new Stat(); + return this; + } + + @Override + public BackgroundStatable<Ensembleable<byte[]>> watched() { + watching = new Watching(true); + return this; + } + + @Override + public GetConfigBuilder usingWatcher(Watcher watcher) { + watching = new Watching(client, watcher); + return this; + } + + @Override + public GetConfigBuilder usingWatcher(final CuratorWatcher watcher) { + watching = new Watching(client, watcher); + return this; + } + + @Override + public Ensembleable<byte[]> inBackground() { + backgrounding = new Backgrounding(); + return this; + } + + @Override + public Ensembleable<byte[]> inBackground(Object context) { + backgrounding = new Backgrounding(context); + return this; + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback) { + backgrounding = new Backgrounding(callback); + return this; } @Override - public byte[] storingStatIn(Stat stat) { + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object context) { + backgrounding = new Backgrounding(callback, context); + return this; + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Executor executor) { + backgrounding = new Backgrounding(callback, executor); + return this; + } + + @Override + public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object 
context, Executor executor) { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; + } + + private void performBackgroundOperation() { try { - if (watcher != null) { - return client.getZooKeeper().getConfig(watcher, stat); - } else { - return client.getZooKeeper().getConfig(watched, stat); - } + client.getZooKeeper().getConfig(watching.getWatcher(), + new AsyncCallback.DataCallback() { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + try { + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, + rc, path, null, ctx, stat, data, null, null, null); + backgrounding.getCallback().processResult(client, event); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }, backgrounding.getContext()); } catch (Exception e) { throw new RuntimeException(e); } } @Override - public GetConfigBuilder watched() { - this.watched = true; - return this; + public byte[] forEnsemble() throws Exception { + byte[] responseData = null; + if ( backgrounding.inBackground() ) + { + client.processBackgroundOperation(new OperationAndData<Void>(this, null, backgrounding.getCallback(), null, backgrounding.getContext()), null); + } + else + { + responseData = configInForeground(); + } + return responseData; } @Override - public GetConfigBuilder usingWatcher(Watcher watcher) { - this.watcher = watcher; - return null; + public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception + { + final TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background"); + AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() + { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) + { + trace.commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null); + 
client.processBackgroundOperation(operationAndData, event); + } + }; + if ( watching.isWatched() ) + { + client.getZooKeeper().getConfig(watching.getWatcher(), callback, backgrounding.getContext()); + } + else + { + client.getZooKeeper().getConfig(false, callback, backgrounding.getContext()); + } } - @Override - public GetConfigBuilder usingWatcher(final CuratorWatcher watcher) { - throw new UnsupportedOperationException("GetConfigBuilder doesn't support CuratorWatcher, please use Watcher instead."); + + private byte[] configInForeground() throws Exception { + TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground"); + try { + return RetryLoop.callWithRetry + ( + client.getZookeeperClient(), + new Callable<byte[]>() { + @Override + public byte[] call() throws Exception { + return client.getZooKeeper().getConfig(watching.getWatcher(), stat); + } + } + ); + } finally { + trace.commit(); + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 7b39be6..7a33297 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -18,165 +18,199 @@ */ package org.apache.curator.framework.imps; -import org.apache.curator.framework.api.AsyncReconfigurable; -import org.apache.curator.framework.api.IncrementalReconfigBuilder; -import org.apache.curator.framework.api.NonIncrementalReconfigBuilder; +import org.apache.curator.RetryLoop; +import org.apache.curator.TimeTrace; +import org.apache.curator.framework.api.BackgroundCallback; 
+import org.apache.curator.framework.api.BackgroundStatConfigurable; +import org.apache.curator.framework.api.Configurable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.Ensembleable; +import org.apache.curator.framework.api.JoinBackgroundStatConfigurable; +import org.apache.curator.framework.api.LeaveBackgroundStatConfigurable; import org.apache.curator.framework.api.ReconfigBuilder; -import org.apache.curator.framework.api.SyncReconfigurable; -import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.data.Stat; -import java.util.Collection; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; public class ReconfigBuilderImpl implements ReconfigBuilder { private final CuratorFrameworkImpl client; - private static class IncrementalReconfigBuilderImpl<T> implements IncrementalReconfigBuilder<T> { + public ReconfigBuilderImpl(CuratorFrameworkImpl client) { + this.client = client; + } - private final CuratorFrameworkImpl client; - private List<String> joiningServers = new LinkedList<String>(); - private List<String> leavingServers = new LinkedList<String>(); + private static class ReconfigBuilderBase implements BackgroundStatConfigurable<byte[]>, Ensembleable<byte[]>, BackgroundOperation<EnsembleServersAndConfig> { - private IncrementalReconfigBuilderImpl(CuratorFrameworkImpl client) { + final CuratorFrameworkImpl client; + final List<String> joiningServers = new LinkedList<String>(); + final List<String> leavingServers = new LinkedList<String>(); + final List<String> members = new LinkedList<String>(); + Backgrounding backgrounding; + Stat stat; + long config; + + private ReconfigBuilderBase(CuratorFrameworkImpl client) { this.client = client; + backgrounding = new Backgrounding(); } 
@Override - public IncrementalReconfigBuilderImpl<T> join(String server) { - joiningServers.add(server); + public Configurable<byte[]> inBackground() { + backgrounding = new Backgrounding(); return this; } @Override - public IncrementalReconfigBuilder<T> join(Collection<String> servers) { - joiningServers.addAll(servers); + public Configurable<byte[]> inBackground(Object context) { + backgrounding = new Backgrounding(context); return this; } @Override - public IncrementalReconfigBuilderImpl<T> leave(String server) { - leavingServers.add(server); + public Configurable<byte[]> inBackground(BackgroundCallback callback) { + backgrounding = new Backgrounding(callback); return this; } @Override - public IncrementalReconfigBuilder<T> leave(Collection<String> servers) { - leavingServers.addAll(servers); + public Configurable<byte[]> inBackground(BackgroundCallback callback, Object context) { + backgrounding = new Backgrounding(callback, context); return this; } @Override - public SyncReconfigurable storingStatIn(final Stat stat) { - return new SyncReconfigurable() { - @Override - public byte[] fromConfig(long config) throws Exception { - return client - .getZooKeeper() - .reconfig(joiningServers.isEmpty() ? null : joiningServers, - leavingServers.isEmpty() ? null : leavingServers, - null, - config, stat); - } - }; + public Configurable<byte[]> inBackground(BackgroundCallback callback, Executor executor) { + backgrounding = new Backgrounding(callback, executor); + return this; } @Override - public AsyncReconfigurable usingDataCallback(final DataCallback callback, final Object ctx) { - return new AsyncReconfigurable() { - @Override - public void fromConfig(long config) throws Exception { - client.getZooKeeper() - .reconfig(joiningServers.isEmpty() ? null : joiningServers, - leavingServers.isEmpty() ? 
null : leavingServers, - null, - config, callback, ctx); - } - }; - } - } - - private static class NonIncrementalReconfigBuilderImpl<T> implements NonIncrementalReconfigBuilder<T> { - - private final CuratorFrameworkImpl client; - private List<String> newMembers = new LinkedList<String>(); - - private NonIncrementalReconfigBuilderImpl(CuratorFrameworkImpl client) { - this.client = client; - } - - private NonIncrementalReconfigBuilderImpl(CuratorFrameworkImpl client, List<String> newMembers) { - this.client = client; - this.newMembers = newMembers; + public Configurable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor) { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; } @Override - public NonIncrementalReconfigBuilder<T> withMember(String server) { - newMembers.add(server); + public Ensembleable<byte[]> fromConfig(long config) throws Exception { + this.config = config; return this; } @Override - public NonIncrementalReconfigBuilder<T> withMembers(Collection servers) { - newMembers.addAll(servers); + public Configurable<byte[]> storingStatIn(Stat stat) { + this.stat = stat; return this; } @Override - public SyncReconfigurable storingStatIn(final Stat stat) { - return new SyncReconfigurable() { - @Override - public byte[] fromConfig(long config) throws Exception { - return client.getZooKeeper().reconfig(null, null, newMembers, config, stat); - } - }; + public byte[] forEnsemble() throws Exception { + if (backgrounding.inBackground()) { + client.processBackgroundOperation(new OperationAndData<EnsembleServersAndConfig>(this, + new EnsembleServersAndConfig(joiningServers, leavingServers, members, config), + backgrounding.getCallback(), null, backgrounding.getContext()), null); + return new byte[0]; + } else { + return ensembleInForeground(); + } + } + + private byte[] ensembleInForeground() throws Exception { + TimeTrace trace = 
client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground"); + byte[] responseData = RetryLoop.callWithRetry + ( + client.getZookeeperClient(), + new Callable<byte[]>() { + @Override + public byte[] call() throws Exception { + return client.getZooKeeper().reconfig( + joiningServers.isEmpty() ? null : joiningServers, + leavingServers.isEmpty() ? null : leavingServers, + members.isEmpty() ? null : members, + config, + stat + ); + } + } + ); + trace.commit(); + return responseData; } @Override - public AsyncReconfigurable usingDataCallback(final DataCallback callback, final Object ctx) { - return new AsyncReconfigurable() { + public void performBackgroundOperation(final OperationAndData<EnsembleServersAndConfig> operationAndData) throws Exception { + final TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Background"); + AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() { @Override - public void fromConfig(long config) throws Exception { - client.getZooKeeper().reconfig(null, null, newMembers, config, callback, ctx); + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + trace.commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.RECONFIG, rc, path, null, ctx, stat, data, null, null, null); + client.processBackgroundOperation(operationAndData, event); } }; + client.getZooKeeper().reconfig( + operationAndData.getData().getJoiningServers(), + operationAndData.getData().getLeavingServers(), + operationAndData.getData().getMembers(), + operationAndData.getData().getConfig(), + callback, + operationAndData.getContext() + ); + } } + private static class JoinReconfigBuilder extends ReconfigBuilderBase implements JoinBackgroundStatConfigurable { - public ReconfigBuilderImpl(CuratorFrameworkImpl client) { - this.client = client; - } + private JoinReconfigBuilder(CuratorFrameworkImpl client) { + super(client); + } - @Override - public 
IncrementalReconfigBuilder<byte[]> join(String server) { - return new IncrementalReconfigBuilderImpl(client).join(server); + @Override + public BackgroundStatConfigurable<byte[]> joining(String... servers) { + joiningServers.addAll(Arrays.asList(servers)); + return this; + } } - @Override - public IncrementalReconfigBuilder<byte[]> join(Collection<String> servers) { - return new IncrementalReconfigBuilderImpl(client).join(servers); - } + private static class LeaveReconfigBuilder extends ReconfigBuilderBase implements LeaveBackgroundStatConfigurable { - @Override - public IncrementalReconfigBuilder<byte[]> leave(String server) { - return new IncrementalReconfigBuilderImpl(client).leave(server); + private LeaveReconfigBuilder(CuratorFrameworkImpl client) { + super(client); + } + + @Override + public BackgroundStatConfigurable<byte[]> leaving(String... servers) { + leavingServers.addAll(Arrays.asList(servers)); + return this; + } } + @Override - public IncrementalReconfigBuilder<byte[]> leave(Collection<String> servers) { - return new IncrementalReconfigBuilderImpl(client).leave(servers); + public LeaveBackgroundStatConfigurable joining(String... servers) { + LeaveReconfigBuilder builder = new LeaveReconfigBuilder(client); + builder.joiningServers.addAll(Arrays.asList(servers)); + return builder; } @Override - public NonIncrementalReconfigBuilder<byte[]> withMember(String server) { - return new NonIncrementalReconfigBuilderImpl(client).withMember(server); + public JoinBackgroundStatConfigurable leaving(String... servers) { + JoinReconfigBuilder builder = new JoinReconfigBuilder(client); + builder.leavingServers.addAll(Arrays.asList(servers)); + return builder; } @Override - public NonIncrementalReconfigBuilder<byte[]> withMembers(Collection<String> servers) { - return new NonIncrementalReconfigBuilderImpl<byte[]>(client).withMembers(servers); + public BackgroundStatConfigurable<byte[]> withMembers(String... 
servers) { + ReconfigBuilderBase builder = new ReconfigBuilderBase(client); + builder.members.addAll(Arrays.asList(servers)); + return builder; } } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index 6918825..e8896ae 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -20,16 +20,17 @@ package org.apache.curator.framework.imps; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; -import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.IOException; @@ -41,15 +42,15 @@ import java.util.concurrent.atomic.AtomicReference; public class TestReconfiguration { - static TestingCluster cluster; + TestingCluster cluster; - @BeforeClass + @BeforeMethod public void setup() throws Exception { cluster = new TestingCluster(5); cluster.start(); } - 
@AfterClass + @AfterMethod public void tearDown() throws IOException { cluster.close(); } @@ -61,28 +62,28 @@ public class TestReconfiguration { client.blockUntilConnected(); try { Stat stat = new Stat(); - byte[] bytes = client.getConfig().storingStatIn(stat); + byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); Assert.assertNotNull(bytes); QuorumVerifier qv = getQuorumVerifier(bytes); - Assert.assertEquals(5, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 5); String server1 = getServerString(qv, cluster, 1L); String server2 = getServerString(qv, cluster, 2L); //Remove Servers - bytes = client.reconfig().leave("1").storingStatIn(stat).fromConfig(qv.getVersion()); + bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(4, qv.getAllMembers().size()); - bytes = client.reconfig().leave("2").storingStatIn(stat).fromConfig(qv.getVersion()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(3, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 3); //Add Servers - bytes = client.reconfig().join("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()); + bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(4, qv.getAllMembers().size()); - bytes = client.reconfig().join("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(5, qv.getAllMembers().size()); + 
Assert.assertEquals(qv.getAllMembers().size(), 5); } finally { client.close(); } @@ -95,47 +96,48 @@ public class TestReconfiguration { client.blockUntilConnected(); try { final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>(); - final AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() { + final BackgroundCallback callback = new BackgroundCallback() { @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - bytes.set(data); - ((CountDownLatch)ctx).countDown(); + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + bytes.set(event.getData()); + ((CountDownLatch)event.getContext()).countDown(); } + }; CountDownLatch latch = new CountDownLatch(1); - client.getConfig().usingDataCallback(callback, latch); + client.getConfig().inBackground(callback, latch).forEnsemble(); latch.await(5, TimeUnit.SECONDS); Assert.assertNotNull(bytes.get()); QuorumVerifier qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(5, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 5); String server1 = getServerString(qv, cluster, 1L); String server2 = getServerString(qv, cluster, 2L); //Remove Servers latch = new CountDownLatch(1); - client.reconfig().leave("1").usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + client.reconfig().leaving("1").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(4, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 4); latch = new CountDownLatch(1); - client.reconfig().leave("2").usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + client.reconfig().leaving("2").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(3, 
qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 3); //Add Servers latch = new CountDownLatch(1); - client.reconfig().join("server.1=" + server1).usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + client.reconfig().joining("server.1=" + server1).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(4, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 4); latch = new CountDownLatch(1); - client.reconfig().join("server.2=" + server2).usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + client.reconfig().joining("server.2=" + server2).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(5, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 5); } finally { client.close(); } @@ -148,10 +150,10 @@ public class TestReconfiguration { client.blockUntilConnected(); try { Stat stat = new Stat(); - byte[] bytes = client.getConfig().storingStatIn(stat); + byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); Assert.assertNotNull(bytes); QuorumVerifier qv = getQuorumVerifier(bytes); - Assert.assertEquals(5, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 5); String server1 = getServerString(qv, cluster, 1L); String server2 = getServerString(qv, cluster, 2L); String server3 = getServerString(qv, cluster, 3L); @@ -160,40 +162,40 @@ public class TestReconfiguration { //Remove Servers bytes = client.reconfig() - .withMember("server.2="+server2) - .withMember("server.3="+server3) - .withMember("server.4="+server4) - .withMember("server.5="+server5) - .storingStatIn(stat).fromConfig(qv.getVersion()); + .withMembers("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + 
.storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(4, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 4); bytes = client.reconfig() - .withMember("server.3=" + server3) - .withMember("server.4=" + server4) - .withMember("server.5=" + server5) - .storingStatIn(stat).fromConfig(qv.getVersion()); + .withMembers("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(3, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 3); //Add Servers bytes = client.reconfig() - .withMember("server.1="+server1) - .withMember("server.3=" + server3) - .withMember("server.4="+server4) - .withMember("server.5="+server5) - .storingStatIn(stat).fromConfig(qv.getVersion()); + .withMembers("server.1=" + server1, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(4, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 4); bytes = client.reconfig() - .withMember("server.1="+server1) - .withMember("server.2="+server2) - .withMember("server.3=" + server3) - .withMember("server.4="+server4) - .withMember("server.5="+server5) - .storingStatIn(stat).fromConfig(qv.getVersion()); + .withMembers("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); qv = getQuorumVerifier(bytes); - Assert.assertEquals(5, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 5); } finally { client.close(); } @@ -206,70 +208,70 @@ public class TestReconfiguration { client.blockUntilConnected(); try { final AtomicReference<byte[]> bytes = new 
AtomicReference<byte[]>(); - final AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() { + final BackgroundCallback callback = new BackgroundCallback() { @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - bytes.set(data); - ((CountDownLatch)ctx).countDown(); + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + bytes.set(event.getData()); + ((CountDownLatch)event.getContext()).countDown(); } + }; CountDownLatch latch = new CountDownLatch(1); - client.getConfig().usingDataCallback(callback, latch); + client.getConfig().inBackground(callback, latch).forEnsemble(); latch.await(5, TimeUnit.SECONDS); Assert.assertNotNull(bytes.get()); QuorumVerifier qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(5, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 5); String server1 = getServerString(qv, cluster, 1L); String server2 = getServerString(qv, cluster, 2L); String server3 = getServerString(qv, cluster, 3L); String server4 = getServerString(qv, cluster, 4L); String server5 = getServerString(qv, cluster, 5L); - //Remove Servers latch = new CountDownLatch(1); client.reconfig() - .withMember("server.2=" + server2) - .withMember("server.3="+server3) - .withMember("server.4="+server4) - .withMember("server.5="+server5) - .usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + .withMembers("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(4, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 4); latch = new CountDownLatch(1); client.reconfig() - .withMember("server.3="+server3) - .withMember("server.4=" + server4) - .withMember("server.5=" + server5) - .usingDataCallback(callback, 
latch).fromConfig(qv.getVersion()); + .withMembers("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(3, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 3); //Add Servers latch = new CountDownLatch(1); client.reconfig() - .withMember("server.1="+server1) - .withMember("server.3=" + server3) - .withMember("server.4=" + server4) - .withMember("server.5=" + server5) - .usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + .withMembers("server.1=" + server1, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(4, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 4); latch = new CountDownLatch(1); client.reconfig() - .withMember("server.1="+server1) - .withMember("server.2="+server2) - .withMember("server.3="+server3) - .withMember("server.4=" + server4) - .withMember("server.5=" + server5) - .usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + .withMembers("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); latch.await(5, TimeUnit.SECONDS); qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(5, qv.getAllMembers().size()); + Assert.assertEquals(qv.getAllMembers().size(), 5); } finally { client.close(); } http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java ---------------------------------------------------------------------- diff --git 
a/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java b/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java index b39a949..6d495df 100644 --- a/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java +++ b/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java @@ -70,6 +70,10 @@ public class InstanceSpec private final int tickTime; private final int maxClientCnxns; + public static void reset() { + nextServerId.set(1); + } + public static InstanceSpec newInstanceSpec() { return new InstanceSpec(null, -1, -1, -1, true, -1, -1, -1); http://git-wip-us.apache.org/repos/asf/curator/blob/b00cecf4/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java index cd86b72..f6bdbd8 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java @@ -249,6 +249,7 @@ public class TestingCluster implements Closeable private static Map<InstanceSpec, Collection<InstanceSpec>> makeSpecs(int instanceQty) { + InstanceSpec.reset(); ImmutableList.Builder<InstanceSpec> builder = ImmutableList.builder(); for ( int i = 0; i < instanceQty; ++i ) {