Repository: aurora Updated Branches: refs/heads/master 75129b694 -> 1e2a9e160
Make leader elections resilient to ZK disconnections. As documented in AURORA-1840, the Curator `LeaderLatch` recipe abdicates leadership if the ZK connection is lost or if there is a timeout. This is incompatible with the commons-based implementation, which would only abdicate leadership if the ZK session timed out. This change replaces the `LeaderLatch` recipe with the `LeaderSelector` recipe and a custom listener that gives up leadership only when the ZK session is lost (`ConnectionState.LOST`); a transient connection loss (`SUSPENDED`) no longer causes loss of leadership. Bugs closed: AURORA-1669 Reviewed at https://reviews.apache.org/r/54288/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1e2a9e16 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1e2a9e16 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1e2a9e16 Branch: refs/heads/master Commit: 1e2a9e160a41c3d916ff1a152b1d00e5b1ee380d Parents: 75129b6 Author: Zameer Manji <[email protected]> Authored: Mon Jan 23 14:38:56 2017 -0800 Committer: Zameer Manji <[email protected]> Committed: Mon Jan 23 14:38:56 2017 -0800 ---------------------------------------------------------------------- .../zookeeper/testing/ZooKeeperTestServer.java | 2 +- .../discovery/CuratorSingletonService.java | 56 ++++++++++++++---- .../discovery/BaseCuratorDiscoveryTest.java | 5 ++ .../discovery/CuratorSingletonServiceTest.java | 60 ++++++++++++++++++-- 4 files changed, 107 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java index 50acaeb..29204cd 100644 --- 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java @@ -82,7 +82,7 @@ public class ZooKeeperTestServer { /** * Starts zookeeper back up on the last used port. */ - final void restartNetwork() throws IOException, InterruptedException { + public final void restartNetwork() throws IOException, InterruptedException { checkEphemeralPortAssigned(); Preconditions.checkState(connectionFactory == null); startNetwork(); http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java index c378172..2847c41 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java @@ -17,6 +17,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.collect.Maps; @@ -28,18 +30,25 @@ import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderLatch; -import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import 
org.apache.curator.framework.recipes.leader.CancelLeadershipException; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.recipes.nodes.PersistentNode; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; class CuratorSingletonService implements SingletonService { + private static final Logger LOG = LoggerFactory.getLogger(CuratorSingletonService.class); + // This is the complement of the CuratorServiceGroupMonitor, it allows advertisement of a leader // in a service group. private static class Advertiser { @@ -151,11 +160,22 @@ class CuratorSingletonService implements SingletonService { requireNonNull(additionalEndpoints); requireNonNull(listener); - LeaderLatch leaderLatch = new LeaderLatch(client, groupPath, endpoint.getHostName()); Closer closer = Closer.create(); - leaderLatch.addListener(new LeaderLatchListener() { + + CountDownLatch giveUpLeadership = new CountDownLatch(1); + + // We do not use the suggested `LeaderSelectorListenerAdapter` or the LeaderLatch class + // because we want to have precise control over state changes. By default the listener and the + // latch class treat `SUSPENDED` (connection loss) as fatal and a reason to lose leadership. + // To make the scheduler resilient to connection blips and long GC pauses, we only treat + // `LOST` (session loss) as fatal. 
+ + ExecutorService executor = + AsyncUtil.singleThreadLoggingScheduledExecutor("LeaderSelector-%d", LOG); + + LeaderSelectorListener leaderSelectorListener = new LeaderSelectorListener() { @Override - public void isLeader() { + public void takeLeadership(CuratorFramework curatorFramework) throws Exception { listener.onLeading(new LeaderControl() { @Override public void advertise() throws AdvertiseException, InterruptedException { @@ -165,27 +185,43 @@ class CuratorSingletonService implements SingletonService { @Override public void leave() throws LeaveException { try { + giveUpLeadership.countDown(); closer.close(); } catch (IOException e) { throw new LeaveException("Failed to abdicate leadership of group at " + groupPath, e); } } }); + + // The contract is to block as long as we want leadership. The leader never gives up + // leadership voluntarily, only when asked to shutdown so we block until our shutdown + // callback has been executed or we have lost our ZK connection. + giveUpLeadership.await(); } @Override - public void notLeader() { - listener.onDefeated(); + public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState) { + if (newState == ConnectionState.LOST) { + giveUpLeadership.countDown(); + listener.onDefeated(); + throw new CancelLeadershipException(); + } + } - }); + }; + + LeaderSelector leaderSelector = + new LeaderSelector(client, groupPath, executor, leaderSelectorListener); + + leaderSelector.setId(endpoint.getHostName()); try { - leaderLatch.start(); + leaderSelector.start(); } catch (Exception e) { // NB: We failed to lead; so we never could have advertised and there is no need to close the // closer. 
throw new LeadException("Failed to begin awaiting leadership of group " + groupPath, e); } - closer.register(leaderLatch); + closer.register(leaderSelector); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java index a2b4125..226b068 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java @@ -73,6 +73,11 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { getServer().expireClientSession(curator.getZookeeperClient().getZooKeeper().getSessionId()); } + final void causeDisconnection() throws Exception { + getServer().stop(); + getServer().restartNetwork(); + } + final CuratorFramework getClient() { return client; } http://git-wip-us.apache.org/repos/asf/aurora/blob/1e2a9e16/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java index 6ea49b0..946a78e 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java @@ -15,6 +15,8 @@ package org.apache.aurora.scheduler.discovery; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.ImmutableMap; 
import com.google.common.collect.ImmutableSet; @@ -23,10 +25,11 @@ import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.easymock.Capture; -import org.easymock.IAnswer; import org.easymock.IMocksControl; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createControl; @@ -39,6 +42,13 @@ import static org.junit.Assert.assertTrue; public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { private IMocksControl control; + // This test has a lot of blocking, this ensures we don't deadlock. + private final Timeout timeout = new Timeout(1, TimeUnit.MINUTES); + + @Rule + public Timeout getTimeout() { + return timeout; + } @Before public void setUpSingletonService() throws Exception { @@ -80,7 +90,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { // Wait to become leader. expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); - assertTrue(capture.hasCaptured()); + awaitCapture(capture); // Leadership nodes should not be seen as service group nodes. assertEquals(ImmutableSet.of(), getGroupMonitor().get()); @@ -111,7 +121,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { // Have host1 become leader. 
newLeader(getClient(), "host1", host1Listener); expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); - assertTrue(host1OnLeadingCapture.hasCaptured()); + awaitCapture(host1OnLeadingCapture); host1OnLeadingCapture.getValue().advertise(); expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); @@ -140,7 +150,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { CountDownLatch host1Defeated = new CountDownLatch(1); host1Listener.onDefeated(); - expectLastCall().andAnswer((IAnswer<Void>) () -> { + expectLastCall().andAnswer(() -> { host1Defeated.countDown(); return null; }); @@ -157,7 +167,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { // Have host1 become leader. newLeader(getClient(), "host1", host1Listener); expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); - assertTrue(host1OnLeadingCapture.hasCaptured()); + awaitCapture(host1OnLeadingCapture); host1OnLeadingCapture.getValue().advertise(); expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); @@ -186,6 +196,46 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { host1Defeated.await(); } + @Test + public void testZKDisconnection() throws Exception { + CountDownLatch leading = new CountDownLatch(1); + AtomicBoolean leader = new AtomicBoolean(false); + CountDownLatch defeated = new CountDownLatch(1); + + // The listener is executed in an internal thread of Curator, where it executes the leader + // listener callbacks. The exceptions there are not propagated out, so we have our own + // listener to validate behaviour. 
+ SingletonService.LeadershipListener listener = new SingletonService.LeadershipListener() { + @Override + public void onLeading(SingletonService.LeaderControl leaderControl) { + leader.set(true); + leading.countDown(); + } + + @Override + public void onDefeated() { + leader.set(false); + defeated.countDown(); + } + }; + + control.replay(); + + startGroupMonitor(); + + CuratorFramework client = getClient(); + newLeader(client, "host1", listener); + leading.await(); + + causeDisconnection(); + assertTrue(leader.get()); + + expireSession(client); + defeated.await(); + + assertFalse(leader.get()); + } + private void awaitCapture(Capture<?> capture) throws InterruptedException { while (!capture.hasCaptured()) { Thread.sleep(1L);
