[CURATOR-160] Add EnsembleListener and EnsembleTracker. Implement a DynamicEnsembleProvider. TestReconfiguration now also tests the DynamicEnsembleProvider.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/59292d88 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/59292d88 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/59292d88 Branch: refs/heads/CURATOR-215 Commit: 59292d8854678d73d27a86746c1734f6c0f7e6a8 Parents: 3b5452f Author: Ioannis Canellos <[email protected]> Authored: Tue Nov 11 16:35:57 2014 +0200 Committer: Ioannis Canellos <[email protected]> Committed: Fri Apr 17 13:00:30 2015 +0300 ---------------------------------------------------------------------- .../curator/ensemble/EnsembleListener.java | 24 + .../dynamic/DynamicEnsembleProvider.java | 61 +++ .../curator/framework/imps/EnsembleTracker.java | 167 +++++++ .../framework/imps/TestReconfiguration.java | 489 +++++++++++-------- 4 files changed, 538 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/59292d88/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java new file mode 100644 index 0000000..8f963cd --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.ensemble; + +public interface EnsembleListener { + + void connectionStringUpdated(String connectionString); +} http://git-wip-us.apache.org/repos/asf/curator/blob/59292d88/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java new file mode 100644 index 0000000..70b755f --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.ensemble.dynamic; + +import com.google.common.base.Preconditions; +import org.apache.curator.ensemble.EnsembleListener; +import org.apache.curator.ensemble.EnsembleProvider; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +public class DynamicEnsembleProvider implements EnsembleProvider, EnsembleListener { + + private final AtomicReference<String> connectionString = new AtomicReference<String>(); + + /** + * The connection string to use + * + * @param connectionString connection string + */ + public DynamicEnsembleProvider(String connectionString) + { + this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null")); + } + + @Override + public void start() throws Exception { + // NOP + } + + @Override + public String getConnectionString() { + return connectionString.get(); + } + + @Override + public void close() throws IOException { + // NOP + } + + @Override + public void connectionStringUpdated(String connectionString) { + this.connectionString.set(connectionString); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/59292d88/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java new file mode 100644 index 0000000..a789e42 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import org.apache.curator.ensemble.EnsembleListener; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances. + */ +public class EnsembleTracker implements Closeable { + + private final Logger log = LoggerFactory.getLogger(getClass()); + private final CuratorFramework client; + private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<EnsembleListener>(); + private final AtomicBoolean isConnected = new AtomicBoolean(true); + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) { + if (isConnected.compareAndSet(false, true)) { + try { + reset(); + } catch (Exception e) { + log.error("Trying to reset after reconnection", e); + } + } + } else { + isConnected.set(false); + } + } + }; + + private final CuratorWatcher watcher = new CuratorWatcher() { + @Override + public void process(WatchedEvent event) throws Exception { + reset(); + } + }; + + + private enum State { + LATENT, + STARTED, + CLOSED + } + + private final BackgroundCallback backgroundCallback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + processBackgroundResult(event); + } + }; + + + public EnsembleTracker(CuratorFramework client) { + this.client = client; + } + + public void start() throws Exception { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); + client.getConnectionStateListenable().addListener(connectionStateListener); + reset(); + } + + @Override + public void close() throws IOException { + if (state.compareAndSet(State.STARTED, State.CLOSED)) { + listeners.clear(); + } + client.getConnectionStateListenable().removeListener(connectionStateListener); + } + + /** + * Return the ensemble listenable + * + * @return listenable + */ + public ListenerContainer<EnsembleListener> getListenable() + { + Preconditions.checkState(state.get() != State.CLOSED, "Closed"); + + return listeners; + } + + private void reset() throws Exception { + client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble(); + } + + private void processBackgroundResult(CuratorEvent event) throws Exception { + switch (event.getType()) { + case GET_CONFIG: { + if (event.getResultCode() == KeeperException.Code.OK.intValue()) { + processConfigData(event.getData()); + } + } + } + } + + private void processConfigData(byte[] data) throws Exception { + Properties properties = new Properties(); + properties.load(new ByteArrayInputStream(data)); + QuorumVerifier qv = new QuorumMaj(properties); + StringBuilder sb = new StringBuilder(); + for (QuorumPeer.QuorumServer server : qv.getAllMembers().values()) { + if (sb.length() != 0) { + sb.append(","); + } + sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort()); + } + + final String connectionString = sb.toString(); + listeners.forEach + ( + new Function<EnsembleListener, Void>() { + @Override + public Void apply(EnsembleListener listener) { + try { + listener.connectionStringUpdated(connectionString); + } catch (Exception e) { + log.error("Calling listener", e); + } + return null; + } + } + ); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/59292d88/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index e8896ae..faec551 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -18,6 +18,8 @@ */ package org.apache.curator.framework.imps; +import org.apache.curator.ensemble.EnsembleListener; +import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; @@ -35,246 +37,287 @@ import org.testng.annotations.Test; import java.io.IOException; import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; public class TestReconfiguration { TestingCluster cluster; + DynamicEnsembleProvider dynamicEnsembleProvider; + WaitOnDelegateListener waitOnDelegateListener; + EnsembleTracker ensembleTracker; + CuratorFramework client; + + String connectionString1to5; + String connectionString2to5; + String connectionString3to5; @BeforeMethod public void setup() throws Exception { cluster = new TestingCluster(5); cluster.start(); + + connectionString1to5 = cluster.getConnectString(); + connectionString2to5 = getConnectionString(cluster, 2,3,4,5); + connectionString3to5 = getConnectionString(cluster, 3,4,5); + + dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5); + client = CuratorFrameworkFactory.builder() + .ensembleProvider(dynamicEnsembleProvider) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + client.blockUntilConnected(); + + //Wrap around the dynamic ensemble provider, so that we can wait until it has received the event. + waitOnDelegateListener = new WaitOnDelegateListener(dynamicEnsembleProvider); + ensembleTracker = new EnsembleTracker(client); + ensembleTracker.getListenable().addListener(waitOnDelegateListener); + ensembleTracker.start(); + //Wait for the initial event. + waitOnDelegateListener.waitForEvent(); } @AfterMethod public void tearDown() throws IOException { + ensembleTracker.close(); + client.close(); cluster.close(); } @Test public void testSyncIncremental() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); - client.start(); - client.blockUntilConnected(); - try { - Stat stat = new Stat(); - byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); - Assert.assertNotNull(bytes); - QuorumVerifier qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - - //Remove Servers - bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - //Add Servers - bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - } finally { - client.close(); - } + Stat stat = new Stat(); + byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); + Assert.assertNotNull(bytes); + QuorumVerifier qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + + //Remove Servers + bytes = client.reconfig().leaving("1").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig().leaving("2").storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + + //Add Servers + bytes = client.reconfig().joining("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig().joining("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); } @Test public void testAsyncIncremental() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); - client.start(); - client.blockUntilConnected(); - try { - final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>(); - final BackgroundCallback callback = new BackgroundCallback() { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - bytes.set(event.getData()); - ((CountDownLatch)event.getContext()).countDown(); + final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>(); + final BackgroundCallback callback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + bytes.set(event.getData()); + //We only need the latch on getConfig. + if (event.getContext() != null) { + ((CountDownLatch) event.getContext()).countDown(); } + } - }; - - CountDownLatch latch = new CountDownLatch(1); - client.getConfig().inBackground(callback, latch).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - Assert.assertNotNull(bytes.get()); - QuorumVerifier qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - - - //Remove Servers - latch = new CountDownLatch(1); - client.reconfig().leaving("1").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - latch = new CountDownLatch(1); - client.reconfig().leaving("2").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - //Add Servers - latch = new CountDownLatch(1); - client.reconfig().joining("server.1=" + server1).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - latch = new CountDownLatch(1); - client.reconfig().joining("server.2=" + server2).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - } finally { - client.close(); - } + }; + + CountDownLatch latch = new CountDownLatch(1); + client.getConfig().inBackground(callback, latch).forEnsemble(); + latch.await(5, TimeUnit.SECONDS); + Assert.assertNotNull(bytes.get()); + QuorumVerifier qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + + + //Remove Servers + client.reconfig().leaving("1").inBackground(callback).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + client.reconfig().leaving("2").inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + //Add Servers + client.reconfig().joining("server.2=" + server2).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + + client.reconfig().joining("server.1=" + server1).inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); } @Test public void testSyncNonIncremental() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); - client.start(); - client.blockUntilConnected(); - try { - Stat stat = new Stat(); - byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); - Assert.assertNotNull(bytes); - QuorumVerifier qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - String server3 = getServerString(qv, cluster, 3L); - String server4 = getServerString(qv, cluster, 4L); - String server5 = getServerString(qv, cluster, 5L); - - //Remove Servers - bytes = client.reconfig() - .withMembers("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - bytes = client.reconfig() - .withMembers("server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - //Add Servers - bytes = client.reconfig() - .withMembers("server.1=" + server1, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 4); - bytes = client.reconfig() - .withMembers("server.1=" + server1, - "server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); - qv = getQuorumVerifier(bytes); - Assert.assertEquals(qv.getAllMembers().size(), 5); - } finally { - client.close(); - } + Stat stat = new Stat(); + byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble(); + Assert.assertNotNull(bytes); + QuorumVerifier qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + String server3 = getServerString(qv, cluster, 3L); + String server4 = getServerString(qv, cluster, 4L); + String server5 = getServerString(qv, cluster, 5L); + + //Remove Servers + bytes = client.reconfig() + .withMembers("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig() + .withMembers("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + + //Add Servers + bytes = client.reconfig() + .withMembers("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + + bytes = client.reconfig() + .withMembers("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(qv.getAllMembers().size(), 5); + + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); } @Test public void testAsyncNonIncremental() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); - client.start(); - client.blockUntilConnected(); - try { - final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>(); - final BackgroundCallback callback = new BackgroundCallback() { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - bytes.set(event.getData()); - ((CountDownLatch)event.getContext()).countDown(); - } + final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>(); + final BackgroundCallback callback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + bytes.set(event.getData()); + ((CountDownLatch) event.getContext()).countDown(); + } - }; - - CountDownLatch latch = new CountDownLatch(1); - client.getConfig().inBackground(callback, latch).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - Assert.assertNotNull(bytes.get()); - QuorumVerifier qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - String server1 = getServerString(qv, cluster, 1L); - String server2 = getServerString(qv, cluster, 2L); - String server3 = getServerString(qv, cluster, 3L); - String server4 = getServerString(qv, cluster, 4L); - String server5 = getServerString(qv, cluster, 5L); - - //Remove Servers - latch = new CountDownLatch(1); - client.reconfig() - .withMembers("server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - latch = new CountDownLatch(1); - client.reconfig() - .withMembers("server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 3); - - //Add Servers - latch = new CountDownLatch(1); - client.reconfig() - .withMembers("server.1=" + server1, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 4); - latch = new CountDownLatch(1); - client.reconfig() - .withMembers("server.1=" + server1, - "server.2=" + server2, - "server.3=" + server3, - "server.4=" + server4, - "server.5=" + server5) - .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); - latch.await(5, TimeUnit.SECONDS); - qv = getQuorumVerifier(bytes.get()); - Assert.assertEquals(qv.getAllMembers().size(), 5); - } finally { - client.close(); - } + }; + + CountDownLatch latch = new CountDownLatch(1); + client.getConfig().inBackground(callback, latch).forEnsemble(); + latch.await(5, TimeUnit.SECONDS); + Assert.assertNotNull(bytes.get()); + QuorumVerifier qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + String server3 = getServerString(qv, cluster, 3L); + String server4 = getServerString(qv, cluster, 4L); + String server5 = getServerString(qv, cluster, 5L); + + //Remove Servers + client.reconfig() + .withMembers("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + client.reconfig() + .withMembers("server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 3); + + //Add Servers + client.reconfig() + .withMembers("server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 4); + + client.reconfig() + .withMembers("server.1=" + server1, + "server.2=" + server2, + "server.3=" + server3, + "server.4=" + server4, + "server.5=" + server5) + .inBackground(callback, latch).fromConfig(qv.getVersion()).forEnsemble(); + waitOnDelegateListener.waitForEvent(); + Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(qv.getAllMembers().size(), 5); } @@ -302,4 +345,44 @@ public class TestReconfiguration { return str + ";" + getInstance(cluster, (int) id).getConnectString(); } } + + static String getConnectionString(TestingCluster cluster, long... ids) throws Exception { + StringBuilder sb = new StringBuilder(); + Map<Long, InstanceSpec> specs = new HashMap<Long, InstanceSpec>(); + for (InstanceSpec spec : cluster.getInstances()) { + specs.put(new Long(spec.getServerId()), spec); + } + for (long id : ids) { + if (sb.length() != 0) { + sb.append(","); + } + sb.append(specs.get(id).getConnectString()); + } + return sb.toString(); + } + + //Simple EnsembleListener that can wait until the delegate handles the event. + private static class WaitOnDelegateListener implements EnsembleListener { + private CountDownLatch latch = new CountDownLatch(1); + + private final EnsembleListener delegate; + + private WaitOnDelegateListener(EnsembleListener delegate) { + this.delegate = delegate; + } + + @Override + public void connectionStringUpdated(String connectionString) { + delegate.connectionStringUpdated(connectionString); + latch.countDown(); + } + + public void waitForEvent() throws InterruptedException, TimeoutException { + if (latch.await(5, TimeUnit.SECONDS)) { + latch = new CountDownLatch(1); + } else { + throw new TimeoutException("Failed to receive event in time."); + } + } + }; } \ No newline at end of file
