Repository: aurora Updated Branches: refs/heads/master f559e9306 -> 69cba786e
http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java index 2166123..a37808c 100644 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; import com.google.gson.JsonIOException; -import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; @@ -42,35 +41,44 @@ import static org.junit.Assert.fail; @PrepareForTest(Gson.class) public class JsonCodecTest { - private static final Codec<ServiceInstance> STANDARD_JSON_CODEC = new JsonCodec(); + private static byte[] serializeServiceInstance(ServiceInstance serviceInstance) + throws IOException { + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + JsonCodec.INSTANCE.serialize(serviceInstance, output); + return output.toByteArray(); + } + + private static ServiceInstance deserializeServiceInstance(byte[] data) throws IOException { + return JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data)); + } @Test public void testJsonCodecRoundtrip() throws Exception { - Codec<ServiceInstance> codec = STANDARD_JSON_CODEC; ServiceInstance instance1 = new ServiceInstance( new Endpoint("foo", 1000), ImmutableMap.of("http", new Endpoint("foo", 8080)), Status.ALIVE) .setShard(0); - byte[] data = ServerSets.serializeServiceInstance(instance1, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + byte[] data = serializeServiceInstance(instance1); + assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort()); + assertTrue(deserializeServiceInstance(data).isSetShard()); ServiceInstance instance2 = new ServiceInstance( new Endpoint("foo", 1000), ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), Status.ALIVE); - data = ServerSets.serializeServiceInstance(instance2, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + data = serializeServiceInstance(instance2); + assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort()); + assertFalse(deserializeServiceInstance(data).isSetShard()); ServiceInstance instance3 = new ServiceInstance( new Endpoint("foo", 1000), ImmutableMap.<String, Endpoint>of(), Status.ALIVE); - data = ServerSets.serializeServiceInstance(instance3, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + data = serializeServiceInstance(instance3); + assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort()); + assertFalse(deserializeServiceInstance(data).isSetShard()); } @Test @@ -81,7 +89,7 @@ public class JsonCodecTest { Status.ALIVE).setShard(42); ByteArrayOutputStream results = new ByteArrayOutputStream(); - STANDARD_JSON_CODEC.serialize(instance, results); + JsonCodec.INSTANCE.serialize(instance, results); assertEquals( "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}," + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}}," @@ -115,7 +123,7 @@ public class JsonCodecTest { public void testDeserializeMinimal() throws IOException { String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}"; ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8)); - ServiceInstance actual = STANDARD_JSON_CODEC.deserialize(source); + ServiceInstance actual = JsonCodec.INSTANCE.deserialize(source); ServiceInstance expected = new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE); assertEquals(expected, actual); @@ -142,7 +150,7 @@ public class JsonCodecTest { private void assertInvalidDeserialize(byte[] data) { try { - STANDARD_JSON_CODEC.deserialize(new ByteArrayInputStream(data)); + JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data)); fail(); } catch (IOException e) { // Expected. http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java deleted file mode 100644 index f0c0cb4..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.easymock.IMocksControl; -import org.junit.Before; -import org.junit.Test; - -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * - * TODO(William Farner): Change this to remove thrift dependency. - */ -public class ServerSetImplTest extends BaseZooKeeperClientTest { - private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private static final String SERVICE = "/twitter/services/puffin_hosebird"; - - private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer; - private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor; - - @Before - public void mySetUp() throws IOException { - serverSetBuffer = new LinkedBlockingQueue<>(); - serverSetMonitor = serverSetBuffer::offer; - } - - private ServerSetImpl createServerSet() throws IOException { - return new ServerSetImpl(createZkClient(), ACL, SERVICE); - } - - @Test - public void testLifecycle() throws Exception { - ServerSetImpl client = createServerSet(); - client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - ServerSetImpl server = createServerSet(); - ServerSet.EndpointStatus status = server.join( - InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080)); - - ServiceInstance serviceInstance = new ServiceInstance( - new Endpoint("foo", 1234), - ImmutableMap.of("http-admin", new Endpoint("foo", 8080)), - Status.ALIVE); - - assertChangeFired(serviceInstance); - - status.leave(); - assertChangeFiredEmpty(); - assertTrue(serverSetBuffer.isEmpty()); - } - - @Test - public void testMembershipChanges() throws Exception { - ServerSetImpl client = createServerSet(); - client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - ServerSetImpl server = createServerSet(); - - ServerSet.EndpointStatus foo = join(server, "foo"); - assertChangeFired("foo"); - - expireSession(client.getZkClient()); - - ServerSet.EndpointStatus bar = join(server, "bar"); - - // We should've auto re-monitored membership, but not been notifed of "foo" since this was not a - // change, just "foo", "bar" since this was an addition. - assertChangeFired("foo", "bar"); - - foo.leave(); - assertChangeFired("bar"); - - ServerSet.EndpointStatus baz = join(server, "baz"); - assertChangeFired("bar", "baz"); - - baz.leave(); - assertChangeFired("bar"); - - bar.leave(); - assertChangeFiredEmpty(); - - assertTrue(serverSetBuffer.isEmpty()); - } - - @Test - public void testStopMonitoring() throws Exception { - ServerSetImpl client = createServerSet(); - Command stopMonitoring = client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - ServerSetImpl server = createServerSet(); - - ServerSet.EndpointStatus foo = join(server, "foo"); - assertChangeFired("foo"); - ServerSet.EndpointStatus bar = join(server, "bar"); - assertChangeFired("foo", "bar"); - - stopMonitoring.execute(); - - // No new updates should be received since monitoring has stopped. - foo.leave(); - assertTrue(serverSetBuffer.isEmpty()); - - // Expiration event. - assertTrue(serverSetBuffer.isEmpty()); - } - - @Test - public void testOrdering() throws Exception { - ServerSetImpl client = createServerSet(); - client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080); - Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081); - Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082); - - ServerSetImpl server1 = createServerSet(); - ServerSetImpl server2 = createServerSet(); - ServerSetImpl server3 = createServerSet(); - - ServiceInstance instance1 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), - Status.ALIVE); - ServiceInstance instance2 = new ServiceInstance( - new Endpoint("foo", 1001), - ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)), - Status.ALIVE); - ServiceInstance instance3 = new ServiceInstance( - new Endpoint("foo", 1002), - ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)), - Status.ALIVE); - - server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports); - assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take())); - - ServerSet.EndpointStatus status2 = server2.join( - InetSocketAddress.createUnresolved("foo", 1001), - server2Ports); - assertEquals(ImmutableList.of(instance1, instance2), - ImmutableList.copyOf(serverSetBuffer.take())); - - server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports); - assertEquals(ImmutableList.of(instance1, instance2, instance3), - ImmutableList.copyOf(serverSetBuffer.take())); - - status2.leave(); - assertEquals(ImmutableList.of(instance1, instance3), - ImmutableList.copyOf(serverSetBuffer.take())); - } - - @Test - public void testUnwatchOnException() throws Exception { - IMocksControl control = createControl(); - - ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class); - Watcher onExpirationWatcher = control.createMock(Watcher.class); - - expect(zkClient.registerExpirationHandler(anyObject(Command.class))) - .andReturn(onExpirationWatcher); - - expect(zkClient.get()).andThrow(new InterruptedException()); // See interrupted() note below. - expect(zkClient.unregister(onExpirationWatcher)).andReturn(true); - control.replay(); - - Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla"); - ServerSetImpl serverset = new ServerSetImpl(zkClient, group); - - try { - serverset.watch(hostSet -> {}); - fail("Expected MonitorException"); - } catch (DynamicHostSet.MonitorException e) { - // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is. - // That call both returns the current interrupted status as well as clearing it. The clearing - // is crucial depending on the order tests are run in this class. If this test runs before - // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail - // executing `ZooKeeperClient.get` which internally blocks on s sync-point that takes part in - // the interruption mechanism and so immediately throws `InterruptedException` based on the - // un-cleared interrupted bit. - assertTrue(Thread.interrupted()); - } - control.verify(); - } - - private static Map<String, InetSocketAddress> makePortMap(String name, int port) { - return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port)); - } - - private ServerSet.EndpointStatus join(ServerSet serverSet, String host) - throws JoinException, InterruptedException { - - return serverSet.join( - InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of()); - } - - private void assertChangeFired(String... serviceHosts) - throws InterruptedException { - - assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts), - serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42), - ImmutableMap.<String, Endpoint>of(), Status.ALIVE)))); - } - - protected void assertChangeFiredEmpty() throws InterruptedException { - assertChangeFired(ImmutableSet.<ServiceInstance>of()); - } - - protected void assertChangeFired(ServiceInstance... serviceInstances) - throws InterruptedException { - assertChangeFired(ImmutableSet.copyOf(serviceInstances)); - } - - protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances) - throws InterruptedException { - assertEquals(serviceInstances, serverSetBuffer.take()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java deleted file mode 100644 index 0e67191..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; - -import com.google.common.collect.ImmutableMap; - -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class ServerSetsTest { - @Test - public void testSimpleSerialization() throws Exception { - InetSocketAddress endpoint = new InetSocketAddress(12345); - Map<String, Endpoint > additionalEndpoints = ImmutableMap.of(); - Status status = Status.ALIVE; - - byte[] data = ServerSets.serializeServiceInstance( - endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC); - - ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC); - - assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort()); - assertEquals(additionalEndpoints, instance.getAdditionalEndpoints()); - assertEquals(Status.ALIVE, instance.getStatus()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java deleted file mode 100644 index 5f6cdd8..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.zookeeper.Candidate.Leader; -import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; -import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.easymock.Capture; -import org.easymock.IExpectationSetters; -import org.easymock.IMocksControl; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.fail; - -public class SingletonServiceImplTest extends BaseZooKeeperClientTest { - private static final int PORT_A = 1234; - private static final int PORT_B = 8080; - private static final InetSocketAddress PRIMARY_ENDPOINT = - InetSocketAddress.createUnresolved("foo", PORT_A); - private static final Map<String, InetSocketAddress> AUX_ENDPOINTS = - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)); - - private IMocksControl control; - private SingletonServiceImpl.LeadershipListener listener; - private ServerSet serverSet; - private ServerSet.EndpointStatus endpointStatus; - private Candidate candidate; - private ExceptionalCommand<Group.JoinException> abdicate; - - private SingletonService service; - - @Before - @SuppressWarnings("unchecked") - public void mySetUp() throws IOException { - control = createControl(); - addTearDown(control::verify); - listener = control.createMock(SingletonServiceImpl.LeadershipListener.class); - serverSet = control.createMock(ServerSet.class); - candidate = control.createMock(Candidate.class); - endpointStatus = control.createMock(ServerSet.EndpointStatus.class); - abdicate = control.createMock(ExceptionalCommand.class); - - service = new SingletonServiceImpl(serverSet, candidate); - } - - private void newLeader( - final String hostName, - Capture<Leader> leader, - LeadershipListener listener) throws Exception { - - service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A), - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)), - listener); - - // This actually elects the leader. - leader.getValue().onElected(abdicate); - } - - private void newLeader(String hostName, Capture<Leader> leader) throws Exception { - newLeader(hostName, leader, listener); - } - - private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception { - return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS)); - } - - @Test - public void testLeadAdvertise() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().leave(); - } - - @Test - public void teatLeadLeaveNoAdvertise() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - abdicate.execute(); - - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().leave(); - } - - @Test - public void testLeadJoinFailure() throws Exception { - Capture<Leader> leaderCapture = new Capture<Leader>(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception())); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - - try { - controlCapture.getValue().advertise(); - fail("Join should have failed."); - } catch (SingletonService.AdvertiseException e) { - // Expected. - } - - controlCapture.getValue().leave(); - } - - @Test(expected = IllegalStateException.class) - public void testMultipleAdvertise() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().advertise(); - } - - @Test(expected = IllegalStateException.class) - public void testMultipleLeave() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().leave(); - controlCapture.getValue().leave(); - } - - @Test(expected = IllegalStateException.class) - public void testAdvertiseAfterLeave() throws Exception { - Capture<Leader> leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture<LeaderControl> controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().leave(); - controlCapture.getValue().advertise(); - } - - @Test - public void testLeadMulti() throws Exception { - List<Capture<Leader>> leaderCaptures = Lists.newArrayList(); - List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList(); - - for (int i = 0; i < 5; i++) { - Capture<Leader> leaderCapture = new Capture<Leader>(); - leaderCaptures.add(leaderCapture); - Capture<LeaderControl> controlCapture = createCapture(); - leaderControlCaptures.add(controlCapture); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - listener.onLeading(capture(controlCapture)); - InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A); - Map<String, InetSocketAddress> aux = - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B)); - expect(serverSet.join(primary, aux)).andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - } - - control.replay(); - - for (int i = 0; i < 5; i++) { - final String leaderName = "foo" + i; - newLeader(leaderName, leaderCaptures.get(i)); - leaderControlCaptures.get(i).getValue().advertise(); - leaderControlCaptures.get(i).getValue().leave(); - } - } - - @Test - public void testLeaderLeaves() throws Exception { - control.replay(); - shutdownNetwork(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/docs/reference/scheduler-configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md index 31be714..e6b19f0 100644 --- a/docs/reference/scheduler-configuration.md +++ b/docs/reference/scheduler-configuration.md @@ -242,7 +242,5 @@ Optional flags: Launches an embedded zookeeper server for local testing causing -zk_endpoints to be ignored if specified. -zk_session_timeout (default (4, secs)) The ZooKeeper session timeout. --zk_use_curator (default true) - DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter commons/zookeeper (the legacy library) is used. ------------------------------------------------------------------------- ``` http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java deleted file mode 100644 index 339f63b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.net.InetSocketAddress; -import java.util.List; - -import javax.inject.Singleton; - -import com.google.inject.Exposed; -import com.google.inject.PrivateModule; -import com.google.inject.Provides; - -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.zookeeper.ServerSetImpl; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.common.zookeeper.SingletonServiceImpl; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; -import org.apache.aurora.common.zookeeper.ZooKeeperUtils; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; -import org.apache.zookeeper.data.ACL; - -import static java.util.Objects.requireNonNull; - -/** - * Binding module for utilities to advertise the network presence of the scheduler. - * - * Uses a fork of Twitter commons/zookeeper. - */ -class CommonsServiceDiscoveryModule extends PrivateModule { - - private final String discoveryPath; - private final ZooKeeperConfig zooKeeperConfig; - - CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { - this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath); - this.zooKeeperConfig = requireNonNull(zooKeeperConfig); - } - - @Override - protected void configure() { - requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); - requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); - - bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class); - expose(ServiceGroupMonitor.class); - } - - @Provides - @Singleton - ZooKeeperClient provideZooKeeperClient( - @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) { - - return new ZooKeeperClient( - zooKeeperConfig.getSessionTimeout(), - zooKeeperConfig.getCredentials(), - zooKeeperConfig.getChrootPath(), - zooKeeperCluster); - } - - @Provides - @Singleton - ServerSetImpl provideServerSet( - ZooKeeperClient client, - @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) { - - return new ServerSetImpl(client, zooKeeperAcls, discoveryPath); - } - - @Provides - @Singleton - DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) { - // Used for a type re-binding of the server set. - return serverSet; - } - - // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. - @Provides - @Singleton - @Exposed - SingletonService provideSingletonService( - ZooKeeperClient client, - ServerSetImpl serverSet, - @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) { - - return new SingletonServiceImpl( - serverSet, - SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java deleted file mode 100644 index 9161455..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; - -import javax.inject.Inject; - -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; - -import static java.util.Objects.requireNonNull; - -class CommonsServiceGroupMonitor implements ServiceGroupMonitor { - private Optional<Command> closeCommand = Optional.empty(); - private final DynamicHostSet<ServiceInstance> serverSet; - private final AtomicReference<ImmutableSet<ServiceInstance>> services = - new AtomicReference<>(ImmutableSet.of()); - - @Inject - CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) { - this.serverSet = requireNonNull(serverSet); - } - - @Override - public void start() throws MonitorException { - try { - closeCommand = Optional.of(serverSet.watch(services::set)); - } catch (DynamicHostSet.MonitorException e) { - throw new MonitorException("Unable to watch scheduler host set.", e); - } - } - - @Override - public void close() { - closeCommand.ifPresent(Command::execute); - } - - @Override - public ImmutableSet<ServiceInstance> get() { - return services.get(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java index 999a542..6ccfef4 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java @@ -34,7 +34,7 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.JsonCodec; import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.RetryPolicy; @@ -68,7 +68,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule { requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); - bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC); + bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(JsonCodec.INSTANCE); } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java index e8aafe4..c3a524f 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java @@ -29,21 +29,12 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.zookeeper.Credentials; import org.apache.aurora.common.zookeeper.ZooKeeperUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A factory that creates a {@link ZooKeeperConfig} instance based on command line argument * values. */ public final class FlaggedZooKeeperConfig { - private static final Logger LOG = LoggerFactory.getLogger(FlaggedZooKeeperConfig.class); - - @CmdLine(name = "zk_use_curator", - help = "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter " - + "commons/zookeeper (the legacy library) is used.") - private static final Arg<Boolean> USE_CURATOR = Arg.create(true); - @CmdLine(name = "zk_in_proc", help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints " + "to be ignored if specified.") @@ -74,11 +65,7 @@ public final class FlaggedZooKeeperConfig { * @return Configuration instance. */ public static ZooKeeperConfig create() { - if (USE_CURATOR.hasAppliedValue()) { - LOG.warn("The -zk_use_curator flag is deprecated and will be removed in a future release."); - } return new ZooKeeperConfig( - USE_CURATOR.get(), ZK_ENDPOINTS.get(), Optional.fromNullable(CHROOT_PATH.get()), IN_PROCESS.get(), http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java index 3d228da..07bce96 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java @@ -26,7 +26,6 @@ import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; import com.google.inject.Inject; -import com.google.inject.Module; import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.binder.LinkedBindingBuilder; @@ -47,7 +46,7 @@ import static java.util.Objects.requireNonNull; */ public class ServiceDiscoveryModule extends AbstractModule { - private static final Logger LOG = LoggerFactory.getLogger(CommonsServiceDiscoveryModule.class); + private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class); private final ZooKeeperConfig zooKeeperConfig; private final String discoveryPath; @@ -83,15 +82,7 @@ public class ServiceDiscoveryModule extends AbstractModule { clusterBinder.toInstance(zooKeeperConfig.getServers()); } - install(discoveryModule()); - } - - private Module discoveryModule() { - if (zooKeeperConfig.isUseCurator()) { - return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig); - } else { - return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig); - } + install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig)); } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java index 3f32a62..e1dc57e 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java @@ -35,13 +35,11 @@ public class ZooKeeperConfig { /** * Creates a new client configuration with defaults for the session timeout and credentials. * - * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used. * @param servers ZooKeeper server addresses. * @return A new configuration. */ - public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) { + public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) { return new ZooKeeperConfig( - useCurator, servers, Optional.absent(), // chrootPath false, @@ -49,7 +47,6 @@ public class ZooKeeperConfig { Optional.absent()); // credentials } - private final boolean useCurator; private final Iterable<InetSocketAddress> servers; private final boolean inProcess; private final Amount<Integer, Time> sessionTimeout; @@ -66,14 +63,12 @@ public class ZooKeeperConfig { * @param credentials ZooKeeper authentication credentials. */ ZooKeeperConfig( - boolean useCurator, Iterable<InetSocketAddress> servers, Optional<String> chrootPath, boolean inProcess, Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials) { - this.useCurator = useCurator; this.servers = MorePreconditions.checkNotBlank(servers); this.chrootPath = requireNonNull(chrootPath); this.inProcess = inProcess; @@ -90,7 +85,6 @@ public class ZooKeeperConfig { */ public ZooKeeperConfig withCredentials(Credentials newCredentials) { return new ZooKeeperConfig( - useCurator, servers, chrootPath, inProcess, @@ -98,10 +92,6 @@ public class ZooKeeperConfig { Optional.of(newCredentials)); } - boolean isUseCurator() { - return useCurator; - } - public Iterable<InetSocketAddress> getServers() { return servers; } http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 29a3b4a..1eabb89 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -17,6 +17,7 @@ import java.io.File; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -44,8 +45,6 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.ServerSetImpl; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.MaintenanceMode; @@ -107,7 +106,6 @@ import static org.easymock.EasyMock.createControl; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class SchedulerIT extends BaseZooKeeperClientTest { @@ -145,7 +143,6 @@ public class SchedulerIT extends BaseZooKeeperClientTest { private Stream logStream; private StreamMatcher streamMatcher; private EntrySerializer entrySerializer; - private ZooKeeperClient zkClient; private File backupDir; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -173,11 +170,9 @@ public class SchedulerIT extends BaseZooKeeperClientTest { entrySerializer = new EntrySerializer.EntrySerializerImpl( LogStorageModule.MAX_LOG_ENTRY_SIZE.get(), Hashing.md5()); - - zkClient = createZkClient(); } - private void startScheduler() throws Exception { + private Callable<Void> startScheduler() throws Exception { // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using // AppLauncher. Module testModule = new AbstractModule() { @@ -202,10 +197,8 @@ public class SchedulerIT extends BaseZooKeeperClientTest { }; ZooKeeperConfig zkClientConfig = ZooKeeperConfig.create( - true, // useCurator ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort()))) .withCredentials(Credentials.digestCredentials("mesos", "mesos")); - SchedulerMain main = SchedulerMain.class.newInstance(); Injector injector = Guice.createInjector( ImmutableList.<Module>builder() .add(SchedulerMain.getUniversalModule()) @@ -215,8 +208,8 @@ public class SchedulerIT extends BaseZooKeeperClientTest { .add(testModule) .build() ); + SchedulerMain main = new SchedulerMain(); injector.injectMembers(main); - Lifecycle lifecycle = injector.getInstance(Lifecycle.class); executor.submit(() -> { try { @@ -226,28 +219,32 @@ public class SchedulerIT extends BaseZooKeeperClientTest { executor.shutdownNow(); } }); + + Lifecycle lifecycle = injector.getInstance(Lifecycle.class); addTearDown(() -> { lifecycle.shutdown(); MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); }); + injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class)) .awaitHealthy(); - } - private void awaitSchedulerReady() throws Exception { + ServiceGroupMonitor schedulerMonitor = injector.getInstance(ServiceGroupMonitor.class); + CountDownLatch schedulerReady = new CountDownLatch(1); executor.submit(() -> { - ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH); - final CountDownLatch schedulerReady = new CountDownLatch(1); - schedulerService.watch(hostSet -> { - if (!hostSet.isEmpty()) { - schedulerReady.countDown(); + while (schedulerMonitor.get().isEmpty()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }); - // A timeout is used because certain types of assertion errors (mocks) will not surface - // until the main test thread exits this body of code. - assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES)); + } + schedulerReady.countDown(); + }); + return () -> { + schedulerReady.await(); return null; - }).get(); + }; } private final AtomicInteger curPosition = new AtomicInteger(); @@ -332,14 +329,14 @@ public class SchedulerIT extends BaseZooKeeperClientTest { expect(driver.stop(true)).andReturn(Status.DRIVER_STOPPED).anyTimes(); control.replay(); - startScheduler(); + Callable<Void> awaitSchedulerReady = startScheduler(); driverStarted.await(); scheduler.getValue().registered(driver, FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(), MasterInfo.getDefaultInstance()); - awaitSchedulerReady(); + awaitSchedulerReady.call(); assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue()); assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue()); http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java deleted file mode 100644 index d90192b..0000000 --- a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.net.InetSocketAddress; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.inject.AbstractModule; -import com.google.inject.Binder; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Module; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.TearDownTestCase; -import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.common.zookeeper.ZooKeeperUtils; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; - -abstract class AbstractDiscoveryModuleTest extends TearDownTestCase { - - @Test - public void testBindingContract() { - ZooKeeperConfig zooKeeperConfig = - new ZooKeeperConfig( - isCurator(), - ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)), - Optional.of("/chroot"), - false, // inProcess - Amount.of(1, Time.DAYS), - Optional.of(Credentials.digestCredentials("test", "user"))); - - Injector injector = - Guice.createInjector( - new AbstractModule() { - @Override - protected void configure() { - bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY) - .toInstance( - ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42))); - bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY) - .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE); - - bindExtraRequirements(binder()); - } - }, - createModule("/discovery/path", zooKeeperConfig)); - - assertNotNull(injector.getBinding(SingletonService.class).getProvider().get()); - assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get()); - } - - void bindExtraRequirements(Binder binder) { - // Noop. - } - - abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig); - - abstract boolean isCurator(); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java index a2b4125..eb8c114 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java @@ -21,11 +21,10 @@ import java.util.function.Predicate; import com.google.common.collect.ImmutableMap; -import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.JsonCodec; import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.framework.CuratorFramework; @@ -38,7 +37,6 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { static final String GROUP_PATH = "/group/root"; static final String MEMBER_TOKEN = "member_"; - static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC; static final int PRIMARY_PORT = 42; private CuratorFramework client; @@ -55,7 +53,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { groupCache.getListenable().addListener((c, event) -> groupEvents.put(event)); Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN); - groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC); + groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, JsonCodec.INSTANCE); } final CuratorFramework startNewClient() { @@ -101,7 +99,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { final byte[] serialize(ServiceInstance serviceInstance) throws IOException { ByteArrayOutputStream sink = new ByteArrayOutputStream(); - CODEC.serialize(serviceInstance, sink); + JsonCodec.INSTANCE.serialize(serviceInstance, sink); return sink.toByteArray(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java deleted file mode 100644 index 7a4c4dd..0000000 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import com.google.inject.Module; - -public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest { - - @Override - Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { - return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig); - } - - @Override - boolean isCurator() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java deleted file mode 100644 index 42a2224..0000000 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; -import org.easymock.Capture; -import org.junit.Before; -import org.junit.Test; - -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class CommonsServiceGroupMonitorTest extends EasyMockTest { - - private DynamicHostSet<ServiceInstance> serverSet; - private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture; - private Command stopCommand; - - @Before - public void setUp() throws Exception { - serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); - hostChangeMonitorCapture = createCapture(); - stopCommand = createMock(Command.class); - } - - private void expectSuccessfulWatch() throws Exception { - expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand); - } - - private void expectFailedWatch() throws Exception { - DynamicHostSet.MonitorException watchError = - new DynamicHostSet.MonitorException( - "Problem watching service group", - new RuntimeException()); - expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError); - } - - @Test - public void testNominalLifecycle() throws Exception { - expectSuccessfulWatch(); - - stopCommand.execute(); - expectLastCall(); - - control.replay(); - - CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); - groupMonitor.start(); - groupMonitor.close(); - } - - @Test - public void testExceptionalLifecycle() throws Exception { - expectFailedWatch(); - control.replay(); - - CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); - try { - groupMonitor.start(); - fail(); - } catch (ServiceGroupMonitor.MonitorException e) { - // expected - } - - // Close on a non-started monitor should be allowed. - groupMonitor.close(); - } - - @Test - public void testNoHosts() throws Exception { - expectSuccessfulWatch(); - control.replay(); - - CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); - assertEquals(ImmutableSet.of(), groupMonitor.get()); - - groupMonitor.start(); - assertEquals(ImmutableSet.of(), groupMonitor.get()); - - hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of()); - assertEquals(ImmutableSet.of(), groupMonitor.get()); - } - - @Test - public void testHostUpdates() throws Exception { - expectSuccessfulWatch(); - control.replay(); - - CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); - groupMonitor.start(); - - ImmutableSet<ServiceInstance> twoHosts = - ImmutableSet.of(serviceInstance("one"), serviceInstance("two")); - hostChangeMonitorCapture.getValue().onChange(twoHosts); - assertEquals(twoHosts, groupMonitor.get()); - - ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one")); - hostChangeMonitorCapture.getValue().onChange(oneHost); - assertEquals(oneHost, groupMonitor.get()); - - ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three")); - hostChangeMonitorCapture.getValue().onChange(anotherHost); - assertEquals(anotherHost, groupMonitor.get()); - - ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of(); - hostChangeMonitorCapture.getValue().onChange(noHosts); - assertEquals(noHosts, groupMonitor.get()); - } - - private ServiceInstance serviceInstance(String hostName) { - return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java index f1a02e4..ea4570a 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java @@ -13,37 +13,31 @@ */ package org.apache.aurora.scheduler.discovery; +import java.net.InetSocketAddress; + +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.Module; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; import org.apache.aurora.common.application.ShutdownRegistry; import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.testing.TearDownTestCase; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.framework.api.ACLProvider; import org.apache.zookeeper.data.ACL; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; -public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest { - - @Override - void bindExtraRequirements(Binder binder) { - ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); - binder.bind(ShutdownRegistry.class).toInstance(shutdownRegistry); - addTearDown(shutdownRegistry::execute); - } - - @Override - Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { - return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig); - } - - @Override - boolean isCurator() { - return false; - } +public class CuratorDiscoveryModuleTest extends TearDownTestCase { @Test public void testSingleACLProvider() { @@ -64,4 +58,36 @@ public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest { public void testSingleACLProviderEmpty() { new CuratorServiceDiscoveryModule.SingleACLProvider(ImmutableList.of()); } + + @Test + public void testBindingContract() { + ZooKeeperConfig zooKeeperConfig = + new ZooKeeperConfig( + ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)), + Optional.of("/chroot"), + false, // inProcess + Amount.of(1, Time.DAYS), + Optional.of(Credentials.digestCredentials("test", "user"))); + + Injector injector = + Guice.createInjector( + new AbstractModule() { + @Override + protected void configure() { + bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY) + .toInstance( + ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42))); + bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY) + .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE); + + ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); + binder().bind(ShutdownRegistry.class).toInstance(shutdownRegistry); + addTearDown(shutdownRegistry::execute); + } + }, + new CuratorServiceDiscoveryModule("/discovery/path", zooKeeperConfig)); + + assertNotNull(injector.getBinding(SingletonService.class).getProvider().get()); + assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get()); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java index 6ea49b0..a860ede 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.aurora.common.zookeeper.JsonCodec; import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -57,7 +58,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { throws Exception { CuratorSingletonService singletonService = - new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC); + new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, JsonCodec.INSTANCE); InetSocketAddress leaderEndpoint = InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT); singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener); } http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java index a065505..d45dbb5 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java @@ -37,7 +37,6 @@ public class ZooKeeperConfigTest { @Test(expected = IllegalArgumentException.class) public void testEmptyServers() { new ZooKeeperConfig( - false, ImmutableList.of(), Optional.absent(), false, @@ -49,7 +48,6 @@ public class ZooKeeperConfigTest { public void testWithCredentials() { ZooKeeperConfig config = new ZooKeeperConfig( - false, SERVERS, Optional.absent(), false, @@ -70,9 +68,8 @@ public class ZooKeeperConfigTest { @Test public void testCreateFactory() { - ZooKeeperConfig config = ZooKeeperConfig.create(true, SERVERS); + ZooKeeperConfig config = ZooKeeperConfig.create(SERVERS); - assertTrue(config.isUseCurator()); assertEquals(SERVERS, ImmutableList.copyOf(config.getServers())); assertFalse(config.getChrootPath().isPresent()); assertFalse(config.isInProcess());
