Repository: aurora Updated Branches: refs/heads/master f402899bc -> f729fd4d2
Extract a SingletonService interface. This makes space for introducing an Apache Curator implementation. Bugs closed: AURORA-1468 Reviewed at https://reviews.apache.org/r/45723/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f729fd4d Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f729fd4d Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f729fd4d Branch: refs/heads/master Commit: f729fd4d2a57e6a83904d680b59429aa5daf9a78 Parents: f402899 Author: John Sirois <[email protected]> Authored: Mon Apr 4 20:42:17 2016 -0600 Committer: John Sirois <[email protected]> Committed: Mon Apr 4 20:42:17 2016 -0600 ---------------------------------------------------------------------- .../common/zookeeper/SingletonService.java | 133 +++------- .../common/zookeeper/SingletonServiceImpl.java | 122 ++++++++++ .../zookeeper/SingletonServiceImplTest.java | 243 ++++++++++++++++++ .../common/zookeeper/SingletonServiceTest.java | 244 ------------------- .../aurora/scheduler/SchedulerLifecycle.java | 14 +- .../aurora/scheduler/app/SchedulerMain.java | 7 +- .../scheduler/app/ServiceDiscoveryModule.java | 5 +- .../scheduler/SchedulerLifecycleTest.java | 2 +- 8 files changed, 415 insertions(+), 355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java index 20accd2..3561d07 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java @@ -15,56 +15,38 @@ package org.apache.aurora.common.zookeeper; import java.net.InetSocketAddress; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.zookeeper.Candidate.Leader; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.zookeeper.data.ACL; /** - * A service that uses master election to only allow a single instance of the server to join - * the {@link ServerSet} at a time. + * A service that uses master election to only allow a single service instance to be active amongst + * a set of potential servers at a time. */ -public class SingletonService { - @VisibleForTesting - static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_"; +public interface SingletonService { /** - * Creates a candidate that can be combined with an existing server set to form a singleton - * service using {@link #SingletonService(ServerSet, Candidate)}. - * - * @param zkClient The ZooKeeper client to use. - * @param servicePath The path where service nodes live. - * @param acl The acl to apply to newly created candidate nodes and serverset nodes. - * @return A candidate that can be housed with a standard server set under a single zk path. + * Indicates an error attempting to lead a group of servers. */ - public static Candidate createSingletonCandidate( - ZooKeeperClient zkClient, - String servicePath, - Iterable<ACL> acl) { - - return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX)); + class LeadException extends Exception { + public LeadException(String message, Throwable cause) { + super(message, cause); + } } - private final ServerSet serverSet; - private final Candidate candidate; + /** + * Indicates an error attempting to advertise leadership of a group of servers. + */ + class AdvertiseException extends Exception { + public AdvertiseException(String message, Throwable cause) { + super(message, cause); + } + } /** - * Creates a new singleton service that uses the supplied candidate to vie for leadership and then - * advertises itself in the given server set once elected. - * - * @param serverSet The server set to advertise in on election. - * @param candidate The candidacy to use to vie for election. + * Indicates an error attempting to leave a group of servers, abdicating leadership of the group. */ - public SingletonService(ServerSet serverSet, Candidate candidate) { - this.serverSet = Preconditions.checkNotNull(serverSet); - this.candidate = Preconditions.checkNotNull(candidate); + class LeaveException extends Exception { + public LeaveException(String message, Throwable cause) { + super(message, cause); + } } /** @@ -73,55 +55,20 @@ public class SingletonService { * @param endpoint The primary endpoint to register as a leader candidate in the service. * @param additionalEndpoints Additional endpoints that are available on the host. * @param listener Handler to call when the candidate is elected or defeated. - * @throws Group.WatchException If there was a problem watching the ZooKeeper group. - * @throws Group.JoinException If there was a problem joining the ZooKeeper group. + * @throws LeadException If there was a problem joining or watching the ZooKeeper group. * @throws InterruptedException If the thread watching/joining the group was interrupted. */ - public void lead(final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final LeadershipListener listener) - throws Group.WatchException, Group.JoinException, InterruptedException { - - Preconditions.checkNotNull(listener); - - candidate.offerLeadership(new Leader() { - private ServerSet.EndpointStatus endpointStatus = null; - @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) { - listener.onLeading(new LeaderControl() { - ServerSet.EndpointStatus endpointStatus = null; - final AtomicBoolean left = new AtomicBoolean(false); - - // Methods are synchronized to prevent simultaneous invocations. - @Override public synchronized void advertise() - throws JoinException, InterruptedException { - - Preconditions.checkState(!left.get(), "Cannot advertise after leaving."); - Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once."); - endpointStatus = serverSet.join(endpoint, additionalEndpoints); - } - - @Override public synchronized void leave() throws ServerSet.UpdateException, JoinException { - Preconditions.checkState(left.compareAndSet(false, true), - "Cannot leave more than once."); - if (endpointStatus != null) { - endpointStatus.leave(); - } - abdicate.execute(); - } - }); - } - - @Override public void onDefeated() { - listener.onDefeated(endpointStatus); - } - }); - } + void lead( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + LeadershipListener listener) + throws LeadException, InterruptedException; /** * A listener to be notified of changes in the leadership status. * Implementers should be careful to avoid blocking operations in these callbacks. */ - public interface LeadershipListener { + interface LeadershipListener { /** * Notifies the listener that is is current leader. @@ -131,35 +78,33 @@ public class SingletonService { void onLeading(LeaderControl control); /** - * Notifies the listener that it is no longer leader. The leader should take this opportunity - * to remove its advertisement gracefully. - * - * @param status A handle on the endpoint status for the advertised leader. + * Notifies the listener that it is no longer leader. */ - void onDefeated(@Nullable ServerSet.EndpointStatus status); + void onDefeated(); } /** * A controller for the state of the leader. This will be provided to the leader upon election, - * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and - * terminate leadership at will. + * which allows the leader to decide when to advertise as leader of the server set and terminate + * leadership at will. */ - public interface LeaderControl { + interface LeaderControl { /** * Advertises the leader's server presence to clients. * - * @throws JoinException If there was an error advertising. + * @throws AdvertiseException If there was an error advertising the singleton leader to clients + * of the server set. * @throws InterruptedException If interrupted while advertising. */ - void advertise() throws JoinException, InterruptedException; + void advertise() throws AdvertiseException, InterruptedException; /** * Leaves candidacy for leadership, removing advertised server presence if applicable. * - * @throws ServerSet.UpdateException If the leader's status could not be updated. - * @throws JoinException If there was an error abdicating from leader election. + * @throws LeaveException If the leader's status could not be updated or there was an error + * abdicating server set leadership. */ - void leave() throws ServerSet.UpdateException, JoinException; + void leave() throws LeaveException; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java new file mode 100644 index 0000000..d9978a9 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java @@ -0,0 +1,122 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.zookeeper.Candidate.Leader; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.zookeeper.data.ACL; + +public class SingletonServiceImpl implements SingletonService { + @VisibleForTesting + static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_"; + + /** + * Creates a candidate that can be combined with an existing server set to form a singleton + * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}. + * + * @param zkClient The ZooKeeper client to use. + * @param servicePath The path where service nodes live. + * @param acl The acl to apply to newly created candidate nodes and serverset nodes. + * @return A candidate that can be housed with a standard server set under a single zk path. + */ + public static Candidate createSingletonCandidate( + ZooKeeperClient zkClient, + String servicePath, + Iterable<ACL> acl) { + + return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX)); + } + + private final ServerSet serverSet; + private final Candidate candidate; + + /** + * Creates a new singleton service that uses the supplied candidate to vie for leadership and then + * advertises itself in the given server set once elected. + * + * @param serverSet The server set to advertise in on election. + * @param candidate The candidacy to use to vie for election. + */ + public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) { + this.serverSet = Preconditions.checkNotNull(serverSet); + this.candidate = Preconditions.checkNotNull(candidate); + } + + @Override + public void lead(final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final LeadershipListener listener) + throws LeadException, InterruptedException { + + Preconditions.checkNotNull(listener); + + try { + candidate.offerLeadership(new Leader() { + @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) { + listener.onLeading(new LeaderControl() { + ServerSet.EndpointStatus endpointStatus = null; + final AtomicBoolean left = new AtomicBoolean(false); + + // Methods are synchronized to prevent simultaneous invocations. + @Override public synchronized void advertise() + throws AdvertiseException, InterruptedException { + + Preconditions.checkState(!left.get(), "Cannot advertise after leaving."); + Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once."); + try { + endpointStatus = serverSet.join(endpoint, additionalEndpoints); + } catch (JoinException e) { + throw new AdvertiseException("Problem advertising endpoint " + endpoint, e); + } + } + + @Override public synchronized void leave() throws LeaveException { + Preconditions.checkState(left.compareAndSet(false, true), + "Cannot leave more than once."); + if (endpointStatus != null) { + try { + endpointStatus.leave(); + } catch (ServerSet.UpdateException e) { + throw new LeaveException("Problem updating endpoint status for abdicating leader " + + "at endpoint " + endpoint, e); + } + } + try { + abdicate.execute(); + } catch (JoinException e) { + throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e); + } + } + }); + } + + @Override public void onDefeated() { + listener.onDefeated(); + } + }); + } catch (JoinException e) { + throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e); + } catch (Group.WatchException e) { + throw new LeadException("Problem getting initial membership list for leadership group.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java new file mode 100644 index 0000000..82df845 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java @@ -0,0 +1,243 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.zookeeper.Candidate.Leader; +import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; +import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; +import org.easymock.Capture; +import org.easymock.IExpectationSetters; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.fail; + +public class SingletonServiceImplTest extends BaseZooKeeperTest { + private static final int PORT_A = 1234; + private static final int PORT_B = 8080; + private static final InetSocketAddress PRIMARY_ENDPOINT = + InetSocketAddress.createUnresolved("foo", PORT_A); + private static final Map<String, InetSocketAddress> AUX_ENDPOINTS = + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)); + + private IMocksControl control; + private SingletonServiceImpl.LeadershipListener listener; + private ServerSet serverSet; + private ServerSet.EndpointStatus endpointStatus; + private Candidate candidate; + private ExceptionalCommand<Group.JoinException> abdicate; + + private SingletonService service; + + @Before + @SuppressWarnings("unchecked") + public void mySetUp() throws IOException { + control = createControl(); + addTearDown(control::verify); + listener = control.createMock(SingletonServiceImpl.LeadershipListener.class); + serverSet = control.createMock(ServerSet.class); + candidate = control.createMock(Candidate.class); + endpointStatus = control.createMock(ServerSet.EndpointStatus.class); + abdicate = control.createMock(ExceptionalCommand.class); + + service = new SingletonServiceImpl(serverSet, candidate); + } + + private void newLeader( + final String hostName, + Capture<Leader> leader, + LeadershipListener listener) throws Exception { + + service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A), + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)), + listener); + + // This actually elects the leader. + leader.getValue().onElected(abdicate); + } + + private void newLeader(String hostName, Capture<Leader> leader) throws Exception { + newLeader(hostName, leader, listener); + } + + private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception { + return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS)); + } + + @Test + public void testLeadAdvertise() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().leave(); + } + + @Test + public void teatLeadLeaveNoAdvertise() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + abdicate.execute(); + + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().leave(); + } + + @Test + public void testLeadJoinFailure() throws Exception { + Capture<Leader> leaderCapture = new Capture<Leader>(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception())); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + + try { + controlCapture.getValue().advertise(); + fail("Join should have failed."); + } catch (SingletonService.AdvertiseException e) { + // Expected. + } + + controlCapture.getValue().leave(); + } + + @Test(expected = IllegalStateException.class) + public void testMultipleAdvertise() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().advertise(); + } + + @Test(expected = IllegalStateException.class) + public void testMultipleLeave() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().leave(); + controlCapture.getValue().leave(); + } + + @Test(expected = IllegalStateException.class) + public void testAdvertiseAfterLeave() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().leave(); + controlCapture.getValue().advertise(); + } + + @Test + public void testLeadMulti() throws Exception { + List<Capture<Leader>> leaderCaptures = Lists.newArrayList(); + List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList(); + + for (int i = 0; i < 5; i++) { + Capture<Leader> leaderCapture = new Capture<Leader>(); + leaderCaptures.add(leaderCapture); + Capture<LeaderControl> controlCapture = createCapture(); + leaderControlCaptures.add(controlCapture); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + listener.onLeading(capture(controlCapture)); + InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A); + Map<String, InetSocketAddress> aux = + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B)); + expect(serverSet.join(primary, aux)).andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + } + + control.replay(); + + for (int i = 0; i < 5; i++) { + final String leaderName = "foo" + i; + newLeader(leaderName, leaderCaptures.get(i)); + leaderControlCaptures.get(i).getValue().advertise(); + leaderControlCaptures.get(i).getValue().leave(); + } + } + + @Test + public void testLeaderLeaves() throws Exception { + control.replay(); + shutdownNetwork(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java deleted file mode 100644 index 454ae22..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.zookeeper.Candidate.Leader; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; -import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; -import org.easymock.Capture; -import org.easymock.IExpectationSetters; -import org.easymock.IMocksControl; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.fail; - -public class SingletonServiceTest extends BaseZooKeeperTest { - private static final int PORT_A = 1234; - private static final int PORT_B = 8080; - private static final InetSocketAddress PRIMARY_ENDPOINT = - InetSocketAddress.createUnresolved("foo", PORT_A); - private static final Map<String, InetSocketAddress> AUX_ENDPOINTS = - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)); - - private IMocksControl control; - private SingletonService.LeadershipListener listener; - private ServerSet serverSet; - private ServerSet.EndpointStatus endpointStatus; - private Candidate candidate; - private ExceptionalCommand<Group.JoinException> abdicate; - - private SingletonService service; - - @Before - @SuppressWarnings("unchecked") - public void mySetUp() throws IOException { - control = createControl(); - addTearDown(control::verify); - listener = control.createMock(SingletonService.LeadershipListener.class); - serverSet = control.createMock(ServerSet.class); - candidate = control.createMock(Candidate.class); - endpointStatus = control.createMock(ServerSet.EndpointStatus.class); - abdicate = control.createMock(ExceptionalCommand.class); - - service = new SingletonService(serverSet, candidate); - } - - private void newLeader( - final String hostName, - Capture<Leader> leader, - LeadershipListener listener) throws Exception { - - service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A), - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)), - listener); - - // This actually elects the leader. - leader.getValue().onElected(abdicate); - } - - private void newLeader(String hostName, Capture<Leader> leader) throws Exception { - newLeader(hostName, leader, listener); - } - - private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception { - return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS)); - } - - @Test - public void testLeadAdvertise() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().leave(); - } - - @Test - public void teatLeadLeaveNoAdvertise() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - abdicate.execute(); - - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().leave(); - } - - @Test - public void testLeadJoinFailure() throws Exception { - Capture<Leader> leaderCapture = new Capture<Leader>(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception())); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - - try { - controlCapture.getValue().advertise(); - fail("Join should have failed."); - } catch (JoinException e) { - // Expected. - } - - controlCapture.getValue().leave(); - } - - @Test(expected = IllegalStateException.class) - public void testMultipleAdvertise() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().advertise(); - } - - @Test(expected = IllegalStateException.class) - public void testMultipleLeave() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().leave(); - controlCapture.getValue().leave(); - } - - @Test(expected = IllegalStateException.class) - public void testAdvertiseAfterLeave() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().leave(); - controlCapture.getValue().advertise(); - } - - @Test - public void testLeadMulti() throws Exception { - List<Capture<Leader>> leaderCaptures = Lists.newArrayList(); - List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList(); - - for (int i = 0; i < 5; i++) { - Capture<Leader> leaderCapture = new Capture<Leader>(); - leaderCaptures.add(leaderCapture); - Capture<LeaderControl> controlCapture = createCapture(); - leaderControlCaptures.add(controlCapture); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - listener.onLeading(capture(controlCapture)); - InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A); - Map<String, InetSocketAddress> aux = - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B)); - expect(serverSet.join(primary, aux)).andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - } - - control.replay(); - - for (int i = 0; i < 5; i++) { - final String leaderName = "foo" + i; - newLeader(leaderName, leaderCaptures.get(i)); - leaderControlCaptures.get(i).getValue().advertise(); - leaderControlCaptures.get(i).getValue().leave(); - } - } - - @Test - public void testLeaderLeaves() throws Exception { - control.replay(); - shutdownNetwork(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java index b15540c..debe899 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Qualifier; @@ -48,8 +47,7 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.StateMachine; import org.apache.aurora.common.util.StateMachine.Transition; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; @@ -83,7 +81,7 @@ import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipList * {@link java.lang.IllegalStateException}. * <p> * At any point in the lifecycle, the scheduler will respond to - * {@link LeadershipListener#onDefeated(ServerSet.EndpointStatus) + * {@link LeadershipListener#onDefeated() * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}. * A clean shutdown will also be initiated if control actions fail during normal state transitions. */ @@ -274,7 +272,7 @@ public class SchedulerLifecycle implements EventSubscriber { schedulerActiveServiceManager.startAsync().awaitHealthy(); try { leaderControl.get().advertise(); - } catch (JoinException | InterruptedException e) { + } catch (SingletonService.AdvertiseException | InterruptedException e) { LOG.error("Failed to advertise leader, shutting down."); throw Throwables.propagate(e); } @@ -297,10 +295,8 @@ public class SchedulerLifecycle implements EventSubscriber { if (control != null) { try { control.leave(); - } catch (JoinException e) { + } catch (SingletonService.LeaveException e) { LOG.warn("Failed to leave leadership: " + e, e); - } catch (ServerSet.UpdateException e) { - LOG.warn("Failed to leave server set: " + e, e); } } @@ -395,7 +391,7 @@ public class SchedulerLifecycle implements EventSubscriber { } @Override - public void onDefeated(@Nullable ServerSet.EndpointStatus status) { + public void onDefeated() { LOG.error("Lost leadership, committing suicide."); stateMachine.transition(State.DEAD); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 60b66e8..11f6ad1 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -42,7 +42,6 @@ import org.apache.aurora.common.args.constraints.NotEmpty; import org.apache.aurora.common.args.constraints.NotNull; import org.apache.aurora.common.inject.Bindings; import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.common.zookeeper.Group; import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; import org.apache.aurora.gen.ServerInfo; @@ -120,10 +119,8 @@ public class SchedulerMain { httpSocketAddress, ImmutableMap.of("http", httpSocketAddress), leaderListener); - } catch (Group.WatchException e) { - throw new IllegalStateException("Failed to watch group and lead service.", e); - } catch (Group.JoinException e) { - throw new IllegalStateException("Failed to join scheduler service group.", e); + } catch (SingletonService.LeadException e) { + throw new IllegalStateException("Failed to lead service.", e); } catch (InterruptedException e) { throw new IllegalStateException("Interrupted while joining scheduler service group.", e); } http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java index 97977fd..240164f 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java @@ -25,6 +25,7 @@ import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.zookeeper.ServerSet; import org.apache.aurora.common.zookeeper.ServerSetImpl; import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.SingletonServiceImpl; import org.apache.aurora.common.zookeeper.ZooKeeperClient; import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials; import org.apache.aurora.common.zookeeper.ZooKeeperUtils; @@ -85,8 +86,8 @@ public class ServiceDiscoveryModule extends AbstractModule { ServerSet serverSet, List<ACL> zookeeperAcls) { - return new SingletonService( + return new SingletonServiceImpl( serverSet, - SingletonService.createSingletonCandidate(client, serverSetPath, zookeeperAcls)); + SingletonServiceImpl.createSingletonCandidate(client, serverSetPath, zookeeperAcls)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java index e225ae5..051c520 100644 --- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java @@ -179,7 +179,7 @@ public class SchedulerLifecycleTest extends EasyMockTest { LeadershipListener leaderListener = schedulerLifecycle.prepare(); leaderListener.onLeading(leaderControl); - leaderListener.onDefeated(null); + leaderListener.onDefeated(); } @Test
