This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/curator.git
commit 61d281721f06ba85cf3b764c332da904353fb2b0 Author: chevaris <[email protected]> AuthorDate: Wed May 6 09:54:55 2020 +0200 JIRA:CURATOR-568 - Adding ensembleTracker(boolean) and withEnsembleTracker() methods to CuratorFrameworkFactory.builder() that allows enabling/disabling ensemble tracking --- .../curator/framework/CuratorFrameworkFactory.java | 25 ++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 2 +- .../framework/imps/TestReconfiguration.java | 56 +++++++++++++++++++++- 3 files changed, 80 insertions(+), 3 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 7980e1e..37db325 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -66,6 +66,7 @@ public class CuratorFrameworkFactory private static final DefaultACLProvider DEFAULT_ACL_PROVIDER = new DefaultACLProvider(); private static final long DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3); private static final int DEFAULT_CLOSE_WAIT_MS = (int)TimeUnit.SECONDS.toMillis(1); + private static final boolean DEFAULT_WITH_ENSEMBLE_TRACKER = true; /** * Return a new builder that builds a CuratorFramework @@ -129,6 +130,7 @@ public class CuratorFrameworkFactory public static class Builder { private EnsembleProvider ensembleProvider; + private boolean withEnsembleTracker = DEFAULT_WITH_ENSEMBLE_TRACKER; private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS; private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS; private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS; @@ -243,6 +245,29 @@ public class CuratorFrameworkFactory } /** + * Allows to configure if the ensemble configuration changes will be watched. 
+ * The default value is {@code true}.<br> + * + * IMPORTANT: Use this method in combination with {@link #ensembleProvider(EnsembleProvider)} to provide + * an instance that returns {@code false} on {@link EnsembleProvider#updateServerListEnabled()} in order + * to fully achieve that ensemble server list changes are ignored<br> + * + * @param withEnsembleTracker use {@code false} if you want to avoid following ensemble configuration changes + * @return this + */ + public Builder ensembleTracker(boolean withEnsembleTracker) { + this.withEnsembleTracker = withEnsembleTracker; + return this; + } + + /** + * @return {@code true} if ensemble configuration changes MUST be watched + */ + public boolean withEnsembleTracker() { + return withEnsembleTracker; + } + + /** + * Sets the data to use when {@link PathAndBytesable#forPath(String)} is used. * This is useful for debugging purposes. For example, you could set this to be the IP of the * client. diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index d4d8241..d80c20b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -153,7 +153,7 @@ public class CuratorFrameworkImpl implements CuratorFramework failedRemoveWatcherManager = new FailedRemoveWatchManager(this); namespaceFacadeCache = new NamespaceFacadeCache(this); - ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider()); + ensembleTracker = builder.withEnsembleTracker() ? 
new EnsembleTracker(this, builder.getEnsembleProvider()) : null; runSafeService = makeRunSafeService(builder); } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index e3327e0..500e728 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; @@ -54,6 +55,7 @@ import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class TestReconfiguration extends CuratorTestBase @@ -174,6 +176,51 @@ public class TestReconfiguration extends CuratorTestBase } @Test + public void testAddWithoutEnsembleTracker() throws Exception + { + final String initialClusterCS = cluster.getConnectString(); + try ( CuratorFramework client = newClient(cluster.getConnectString(), false)) + { + Assert.assertEquals(((CuratorFrameworkImpl) client).getEnsembleTracker(), null); + client.start(); + + QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + assertConfig(oldConfig, cluster.getInstances()); + + CountDownLatch latch = setChangeWaiter(client); + try ( TestingCluster newCluster = new TestingCluster(TestingCluster.makeSpecs(1, false)) ) + { + newCluster.start(); + + 
client.reconfig().joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble(); + + Assert.assertTrue(timing.awaitLatch(latch)); + + byte[] newConfigData = client.getConfig().forEnsemble(); + QuorumVerifier newConfig = toQuorumVerifier(newConfigData); + List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances()); + newInstances.addAll(newCluster.getInstances()); + assertConfig(newConfig, newInstances); + Assert.assertEquals(ensembleProvider.getConnectionString(), initialClusterCS); + Assert.assertNotEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString()); + Assert.assertEquals(client.getZookeeperClient().getCurrentConnectionString(), initialClusterCS); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener( + (cfClient, newState) -> { + if (newState == ConnectionState.RECONNECTED) reconnectLatch.countDown(); + } + ); + client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); + Assert.assertTrue(reconnectLatch.await(2, TimeUnit.SECONDS)); + Assert.assertEquals(client.getZookeeperClient().getCurrentConnectionString(), initialClusterCS); + Assert.assertEquals(ensembleProvider.getConnectionString(), initialClusterCS); + newConfigData = client.getConfig().forEnsemble(); + Assert.assertNotEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString()); + } + } + } + + @Test public void testAdd() throws Exception { try ( CuratorFramework client = newClient()) @@ -412,10 +459,14 @@ public class TestReconfiguration extends CuratorTestBase private CuratorFramework newClient() { - return newClient(cluster.getConnectString()); + return newClient(cluster.getConnectString(), true); + } + + private CuratorFramework newClient(String connectionString) { + return newClient(connectionString, true); } - private CuratorFramework newClient(String connectionString) + 
private CuratorFramework newClient(String connectionString, boolean withEnsembleProvider) { final AtomicReference<String> connectString = new AtomicReference<>(connectionString); ensembleProvider = new EnsembleProvider() @@ -450,6 +501,7 @@ public class TestReconfiguration extends CuratorTestBase }; return CuratorFrameworkFactory.builder() .ensembleProvider(ensembleProvider) + .ensembleTracker(withEnsembleProvider) .sessionTimeoutMs(timing.session()) .connectionTimeoutMs(timing.connection()) .authorization("digest", superUserPassword.getBytes())
