Repository: curator Updated Branches: refs/heads/CURATOR-248 [created] 94dff8a5a
Initial error policy with two implementations. Also, applied it to LeaderSelector as a test Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/45df7ba7 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/45df7ba7 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/45df7ba7 Branch: refs/heads/CURATOR-248 Commit: 45df7ba71f14a5f9751061a7dff956312bfdd421 Parents: f9af0ce Author: randgalt <[email protected]> Authored: Mon Aug 24 12:24:06 2015 -0500 Committer: randgalt <[email protected]> Committed: Mon Aug 24 12:24:06 2015 -0500 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 8 ++ .../framework/CuratorFrameworkFactory.java | 20 +++++ .../framework/imps/CuratorFrameworkImpl.java | 10 +++ .../curator/framework/state/ErrorPolicy.java | 18 ++++ .../framework/state/SessionErrorPolicy.java | 13 +++ .../framework/state/StandardErrorPolicy.java | 14 +++ .../leader/LeaderSelectorListenerAdapter.java | 2 +- .../recipes/leader/TestLeaderSelector.java | 90 ++++++++++++++++++++ 8 files changed, 174 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 58c5bf5..d755d28 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.api.transaction.TransactionOp; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.framework.state.ErrorPolicy; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.Watcher; @@ -297,4 +298,11 @@ public interface CuratorFramework extends Closeable * @return facade */ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework(); + + /** + * Return the configured error policy + * + * @return error policy + */ + public ErrorPolicy getErrorPolicy(); } http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index dcb2ee6..aa5181d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -31,6 +31,8 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.CuratorTempFrameworkImpl; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.imps.GzipCompressionProvider; +import org.apache.curator.framework.state.ErrorPolicy; +import org.apache.curator.framework.state.StandardErrorPolicy; import org.apache.curator.utils.DefaultZookeeperFactory; import org.apache.curator.utils.ZookeeperFactory; import org.apache.zookeeper.CreateMode; @@ -116,6 +118,7 @@ public class CuratorFrameworkFactory private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; private boolean useContainerParentsIfAvailable = true; + private ErrorPolicy errorPolicy = new StandardErrorPolicy(); /** * Apply the current values and build a new CuratorFramework @@ -343,6 +346,18 @@ public class CuratorFrameworkFactory return this; } + /** + * Set the error policy to use. The default is {@link StandardErrorPolicy} + * + * @param errorPolicy new error policy + * @return this + */ + public Builder errorPolicy(ErrorPolicy errorPolicy) + { + this.errorPolicy = errorPolicy; + return this; + } + public ACLProvider getAclProvider() { return aclProvider; @@ -398,6 +413,11 @@ public class CuratorFrameworkFactory return useContainerParentsIfAvailable; } + public ErrorPolicy getErrorPolicy() + { + return errorPolicy; + } + @Deprecated public String getAuthScheme() { http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 41bb7cd..3310daf 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -40,6 +40,7 @@ import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateManager; +import org.apache.curator.framework.state.ErrorPolicy; import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; import org.apache.curator.utils.ThreadUtils; @@ -83,6 +84,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final NamespaceFacadeCache namespaceFacadeCache; private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; + private final ErrorPolicy errorPolicy; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -124,6 +126,7 @@ public class CuratorFrameworkImpl implements CuratorFramework aclProvider = builder.getAclProvider(); state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT); useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); + errorPolicy = Preconditions.checkNotNull(builder.getErrorPolicy(), "errorPolicy cannot be null"); byte[] builderDefaultData = builder.getDefaultData(); defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0]; @@ -197,6 +200,7 @@ public class CuratorFrameworkImpl implements CuratorFramework state = parent.state; authInfos = parent.authInfos; useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable; + errorPolicy = parent.errorPolicy; } @Override @@ -241,6 +245,12 @@ public class CuratorFrameworkImpl implements CuratorFramework } @Override + public ErrorPolicy getErrorPolicy() + { + return errorPolicy; + } + + @Override public void start() { log.info("Starting"); http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java new file mode 100644 index 0000000..0e1bfb5 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java @@ -0,0 +1,18 @@ +package org.apache.curator.framework.state; + +/** + * Recipes should use the configured error policy to decide how to handle + * errors such as {@link ConnectionState} changes. + */ +public interface ErrorPolicy +{ + /** + * Returns true if the given state should cause the recipe to + * act as though the connection has been lost. i.e. locks should + * exit, etc. + * + * @param state the state + * @return true/false + */ + boolean isErrorState(ConnectionState state); +} http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java new file mode 100644 index 0000000..3f68fe4 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java @@ -0,0 +1,13 @@ +package org.apache.curator.framework.state; + +/** + * This policy treats only {@link ConnectionState#LOST} as an error + */ +public class SessionErrorPolicy implements ErrorPolicy +{ + @Override + public boolean isErrorState(ConnectionState state) + { + return state == ConnectionState.LOST; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java new file mode 100644 index 0000000..ea0c668 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java @@ -0,0 +1,14 @@ +package org.apache.curator.framework.state; + +/** + * This policy treats {@link ConnectionState#SUSPENDED} and {@link ConnectionState#LOST} + * as errors + */ +public class StandardErrorPolicy implements ErrorPolicy +{ + @Override + public boolean isErrorState(ConnectionState state) + { + return ((state == ConnectionState.SUSPENDED) || (state == ConnectionState.LOST)); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java index 7402fa7..1b0070a 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java @@ -30,7 +30,7 @@ public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorLis @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) ) + if ( client.getErrorPolicy().isErrorState(newState) ) { throw new CancelLeadershipException(); } http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java index c7f415c..ae19b3c 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java @@ -20,10 +20,13 @@ package org.apache.curator.framework.recipes.leader; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.framework.state.SessionErrorPolicy; +import org.apache.curator.framework.state.StandardErrorPolicy; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.KillSession; @@ -50,6 +53,93 @@ public class TestLeaderSelector extends BaseClassForTests private static final String PATH_NAME = "/one/two/me"; @Test + public void testErrorPolicies() throws Exception + { + Timing timing = new Timing(); + LeaderSelector selector = null; + CuratorFramework client = CuratorFrameworkFactory + .builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new StandardErrorPolicy()) + .build(); + try + { + final BlockingQueue<String> changes = Queues.newLinkedBlockingQueue(); + + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + changes.add(newState.name()); + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() + { + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + changes.add("leader"); + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + changes.add("release"); + Thread.currentThread().interrupt(); + } + } + }; + selector = new LeaderSelector(client, "/test", listener); + selector.start(); + + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader"); + server.close(); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release"); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); + + selector.close(); + client.close(); + timing.sleepABit(); + changes.clear(); + + server = new TestingServer(); + client = CuratorFrameworkFactory + .builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new SessionErrorPolicy()) + .build(); + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + selector = new LeaderSelector(client, "/test", listener); + selector.start(); + + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader"); + server.stop(); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release"); + } + finally + { + CloseableUtils.closeQuietly(selector); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testLeaderNodeDeleteOnInterrupt() throws Exception { Timing timing = new Timing();
