Repository: aurora Updated Branches: refs/heads/master 06ddaadbc -> 356eeac97
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/zookeeper/PartitionerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/PartitionerTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/PartitionerTest.java deleted file mode 100644 index fbc2cdb..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/PartitionerTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import com.google.common.testing.TearDown; - -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.junit.Test; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * @author John Sirois - */ -public class PartitionerTest extends BaseZooKeeperTest { - private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private static final String PARTITION_NAMESPACE = "/twitter/puffin/hosebird"; - - @Test - public void testHeterogeneousPartitionGroup() throws Exception { - ZooKeeperClient zkClient = createZkClient(); - ZooKeeperUtils.ensurePath(zkClient, ACL, PARTITION_NAMESPACE + "/not-a-partition-node"); - Partitioner partitioner = new Partitioner(zkClient, ACL, PARTITION_NAMESPACE); - join(partitioner); - - assertEquals("Expected Partitioner to be tolerant of foreign nodes", - 1, partitioner.getGroupSize()); - } - - private static class InstrumentedPartitioner extends Partitioner { - private final AtomicInteger myViewOfGroupSize = new AtomicInteger(); - - public InstrumentedPartitioner(ZooKeeperClient zkClient) throws IOException { - super(zkClient, ACL, PARTITION_NAMESPACE); - } - - @Override - Group.GroupChangeListener createGroupChangeListener(Group.Membership membership) { - final Group.GroupChangeListener listener = super.createGroupChangeListener(membership); - return new Group.GroupChangeListener() { - @Override public void onGroupChange(Iterable<String> memberIds) { - listener.onGroupChange(memberIds); - synchronized (myViewOfGroupSize) { - myViewOfGroupSize.set(getGroupSize()); - myViewOfGroupSize.notify(); - } - } - }; - } - - public void observeGroupSize(int expectedSize) throws InterruptedException { - while (expectedSize != myViewOfGroupSize.get()) { - synchronized (myViewOfGroupSize) { - myViewOfGroupSize.wait(); - } - } - } - } - - @Test - public void testJoin() throws Exception { - // Test that the 1st member of the partition group owns the whole space. - InstrumentedPartitioner firstPartitioner = new InstrumentedPartitioner(createZkClient()); - Partitioner.Partition firstPartition = join(firstPartitioner); - - assertTrue(firstPartition.isMember(0L)); - assertTrue(firstPartition.isMember(1L)); - assertTrue(firstPartition.isMember(2L)); - - // Test that when additional members join partitions are added and existing partitions shrink. - InstrumentedPartitioner secondPartitioner = new InstrumentedPartitioner(createZkClient()); - Partitioner.Partition secondPartition = join(secondPartitioner); - - firstPartitioner.observeGroupSize(2); - - assertTrue(firstPartition.isMember(0L)); - assertFalse(secondPartition.isMember(0L)); - - assertFalse(firstPartition.isMember(1L)); - assertTrue(secondPartition.isMember(1L)); - - assertTrue(firstPartition.isMember(2L)); - assertFalse(secondPartition.isMember(2L)); - - InstrumentedPartitioner thirdPartitioner = new InstrumentedPartitioner(createZkClient()); - Partitioner.Partition thirdPartition = join(thirdPartitioner); - - firstPartitioner.observeGroupSize(3); - secondPartitioner.observeGroupSize(3); - - assertTrue(firstPartition.isMember(0L)); - assertFalse(secondPartition.isMember(0L)); - assertFalse(thirdPartition.isMember(0L)); - - assertFalse(firstPartition.isMember(1L)); - assertTrue(secondPartition.isMember(1L)); - assertFalse(thirdPartition.isMember(1L)); - - assertFalse(firstPartition.isMember(2L)); - assertFalse(secondPartition.isMember(2L)); - assertTrue(thirdPartition.isMember(2L)); - - assertTrue(firstPartition.isMember(3L)); - assertFalse(secondPartition.isMember(3L)); - assertFalse(thirdPartition.isMember(3L)); - - // Test that members leaving the partition group results in the partitions being merged. - firstPartition.cancel(); - - secondPartitioner.observeGroupSize(2); - thirdPartitioner.observeGroupSize(2); - - assertTrue(secondPartition.isMember(0L)); - assertFalse(thirdPartition.isMember(0L)); - - assertFalse(secondPartition.isMember(1L)); - assertTrue(thirdPartition.isMember(1L)); - - assertTrue(secondPartition.isMember(2L)); - assertFalse(thirdPartition.isMember(2L)); - - thirdPartition.cancel(); - - secondPartitioner.observeGroupSize(1); - - assertTrue(secondPartition.isMember(0L)); - assertTrue(secondPartition.isMember(1L)); - assertTrue(secondPartition.isMember(2L)); - } - - private Partitioner.Partition join(Partitioner partitioner) throws Group.JoinException, InterruptedException { - final Partitioner.Partition partition = partitioner.join(); - addTearDown(new TearDown() { - @Override public void tearDown() throws Group.JoinException { - partition.cancel(); - } - }); - return partition; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java index 1dec34e..4db578c 100644 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java @@ -15,16 +15,10 @@ package org.apache.aurora.common.zookeeper; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.lang.Override; import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; import java.util.logging.Logger; import com.google.common.base.Function; @@ -32,39 +26,27 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.testing.TearDown; -import com.google.gson.GsonBuilder; - -import org.apache.thrift.protocol.TProtocol; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.easymock.IMocksControl; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; import org.apache.aurora.common.base.Command; import org.apache.aurora.common.io.Codec; -import org.apache.aurora.common.io.JsonCodec; import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.TResourceExhaustedException; -import org.apache.aurora.common.thrift.Thrift; -import org.apache.aurora.common.thrift.ThriftFactory; -import org.apache.aurora.common.thrift.ThriftFactory.ThriftFactoryException; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createControl; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -256,25 +238,16 @@ public class ServerSetImplTest extends BaseZooKeeperTest { assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); } - @Ignore("TODO(zmanji): Fix to work with thrift 0.9.0") @Test - public void testJsonCodecCompatibility() throws IOException { + public void testJsonCompatibility() throws IOException { ServiceInstance instance = new ServiceInstance( new Endpoint("foo", 1000), ImmutableMap.of("http", new Endpoint("foo", 8080)), Status.ALIVE).setShard(42); - ByteArrayOutputStream legacy = new ByteArrayOutputStream(); - JsonCodec.create( - ServiceInstance.class, - new GsonBuilder().setExclusionStrategies(JsonCodec.getThriftExclusionStrategy()) - .create()).serialize(instance, legacy); - ByteArrayOutputStream results = new ByteArrayOutputStream(); ServerSetImpl.createJsonCodec().serialize(instance, results); - assertEquals(legacy.toString(), results.toString()); - results = new ByteArrayOutputStream(); ServerSetImpl.createJsonCodec().serialize(instance, results); assertEquals( @@ -285,48 +258,6 @@ public class ServerSetImplTest extends BaseZooKeeperTest { results.toString()); } - //TODO(Jake Mannix) move this test method to ServerSetConnectionPoolTest, which should be renamed - // to DynamicBackendConnectionPoolTest, and refactor assertChangeFired* methods to be used both - // here and there - @Test - public void testThriftWithServerSet() throws Exception { - final AtomicReference<Socket> clientConnection = new AtomicReference<Socket>(); - final CountDownLatch connected = new CountDownLatch(1); - final ServerSocket server = new ServerSocket(0); - Thread service = new Thread(new Runnable() { - @Override public void run() { - try { - clientConnection.set(server.accept()); - } catch (IOException e) { - LOG.log(Level.WARNING, "Problem accepting a connection to thrift server", e); - } finally { - connected.countDown(); - } - } - }); - service.setDaemon(true); - service.start(); - - ServerSetImpl serverSetImpl = new ServerSetImpl(createZkClient(), SERVICE); - serverSetImpl.watch(serverSetMonitor); - assertChangeFiredEmpty(); - InetSocketAddress localSocket = new InetSocketAddress(server.getLocalPort()); - serverSetImpl.join(localSocket, Maps.<String, InetSocketAddress>newHashMap()); - assertChangeFired(ImmutableMap.<InetSocketAddress, Status>of(localSocket, Status.ALIVE)); - - Service.Iface svc = createThriftClient(serverSetImpl); - try { - String value = svc.getString(); - LOG.info("Got value: " + value + " from server"); - assertEquals(Service.Iface.DONE, value); - } catch (TResourceExhaustedException e) { - fail("ServerSet is not empty, should not throw exception here"); - } finally { - connected.await(); - server.close(); - } - } - @Test public void testUnwatchOnException() throws Exception { IMocksControl control = createControl(); @@ -356,38 +287,10 @@ public class ServerSetImplTest extends BaseZooKeeperTest { control.verify(); } - private Service.Iface createThriftClient(DynamicHostSet<ServiceInstance> serverSet) - throws ThriftFactoryException { - - final Thrift<Service.Iface> thrift = ThriftFactory.create(Service.Iface.class).build(serverSet); - addTearDown(new TearDown() { - @Override public void tearDown() { - thrift.close(); - } - }); - return thrift.create(); - } - private static Map<String, InetSocketAddress> makePortMap(String name, int port) { return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port)); } - public static class Service { - public static interface Iface { - public static final String DONE = "done"; - public String getString() throws TResourceExhaustedException; - } - - public static class Client implements Iface { - public Client(TProtocol protocol) { - assertNotNull(protocol); - } - @Override public String getString() { - return DONE; - } - } - } - private ServerSet.EndpointStatus join(ServerSet serverSet, String host) throws JoinException, InterruptedException { http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/zookeeper/StaticServerSetTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/StaticServerSetTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/StaticServerSetTest.java deleted file mode 100644 index 9c98951..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/StaticServerSetTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import org.junit.Before; -import org.junit.Test; - -import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; -import org.apache.aurora.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; - -public class StaticServerSetTest extends EasyMockTest { - - private static final ServiceInstance BACKEND_1 = new ServiceInstance( - new Endpoint("host_1", 12345), - ImmutableMap.of("http", new Endpoint("host_1", 80)), - Status.ALIVE); - private static final ServiceInstance BACKEND_2 = new ServiceInstance( - new Endpoint("host_2", 12346), - ImmutableMap.of("http", new Endpoint("host_1", 80)), - Status.ALIVE); - - private HostChangeMonitor<ServiceInstance> monitor; - - @Before - public void setUp() { - monitor = createMock(new Clazz<HostChangeMonitor<ServiceInstance>>() { }); - } - - @Test - public void testMonitor() throws Exception { - ImmutableSet<ServiceInstance> hosts = ImmutableSet.of(BACKEND_1, BACKEND_2); - monitor.onChange(hosts); - - control.replay(); - - ServerSet serverSet = new StaticServerSet(hosts); - serverSet.monitor(monitor); - } - - @Test - public void testMonitorEmpty() throws Exception { - ImmutableSet<ServiceInstance> hosts = ImmutableSet.of(); - monitor.onChange(hosts); - - control.replay(); - - ServerSet serverSet = new StaticServerSet(hosts); - serverSet.monitor(monitor); - } - - @Test - public void testJoin() throws Exception { - // Ensure join/update calls don't break. - ImmutableSet<ServiceInstance> hosts = ImmutableSet.of(); - - control.replay(); - - ServerSet serverSet = new StaticServerSet(hosts); - ServerSet.EndpointStatus status = serverSet.join( - InetSocketAddress.createUnresolved("host", 1000), - ImmutableMap.<String, InetSocketAddress>of(), - Status.ALIVE); - status.update(Status.DEAD); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperMapTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperMapTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperMapTest.java deleted file mode 100644 index 993ab64..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperMapTest.java +++ /dev/null @@ -1,418 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.AbstractMap.SimpleEntry; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.junit.Before; -import org.junit.Test; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.collections.Pair; -import org.apache.aurora.common.zookeeper.ZooKeeperMap.Listener; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class ZooKeeperMapTest extends BaseZooKeeperTest { - - private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private static final Function<byte[], String> BYTES_TO_STRING = - new Function<byte[], String>() { - @Override - public String apply(byte[] from) { - return new String(from); - }}; - - private static class TestListener implements ZooKeeperMap.Listener<String> { - private final BlockingQueue<Pair<String, String>> queue = - new LinkedBlockingQueue<Pair<String, String>>(); - - public Pair<String, String> waitForUpdate() throws InterruptedException { - return queue.take(); - } - - @Override - public void nodeChanged(String name, String value) { - queue.offer(Pair.of(name, value)); - } - - @Override - public void nodeRemoved(String name) { - queue.offer(Pair.of(name, (String) null)); - } - } - - private ZooKeeperClient zkClient; - private BlockingQueue<Pair<String, String>> entryChanges; - - @Before - public void mySetUp() throws Exception { - zkClient = createZkClient(); - entryChanges = new LinkedBlockingQueue<Pair<String, String>>(); - } - - @Test(expected = KeeperException.NoNodeException.class) - public void testMissingPath() throws Exception { - makeMap("/twitter/doesnt/exist"); - } - - @Test(expected = KeeperException.class) - public void testZooKeeperUnavailableAtConstruction() throws Exception { - final String parentPath = "/twitter/path"; - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - - shutdownNetwork(); // Make zk unavailable. - - makeUninitializedMap(parentPath); - } - - @Test(expected = KeeperException.class) - public void testZooKeeperUnavailableAtInit() throws Exception { - final String parentPath = "/twitter/path"; - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - ZooKeeperMap<String> zkMap = makeUninitializedMap(parentPath); - - shutdownNetwork(); // Make zk unavailable. - - zkMap.init(); - } - - @Test - public void testInitialization() throws Exception { - final String parentPath = "/twitter/path"; - final String node = "node"; - final String nodePath = parentPath + "/" + node; - final String data = "abcdefg"; - - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - ZooKeeperMap<String> zkMap = makeUninitializedMap(parentPath); - - // Map should be empty before initialization - assertTrue(zkMap.isEmpty()); - - zkMap.init(); - - // Now that we've initialized, the data should be synchronously reflected. - assertFalse(zkMap.isEmpty()); - assertEquals(1, zkMap.size()); - assertEquals(data, zkMap.get(node)); - } - - @Test - public void testEmptyStaticMap() throws Exception { - final String parentPath = "/twitter/path"; - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - Map<String, String> zkMap = makeMap(parentPath); - - assertEquals(0, zkMap.size()); - assertTrue(zkMap.isEmpty()); - } - - @Test - public void testStaticMapWithValues() throws Exception { - final String parentPath = "/twitter/path"; - final String node1 = "node1"; - final String node2 = "node2"; - final String nodePath1 = parentPath + "/" + node1; - final String nodePath2 = parentPath + "/" + node2; - final String data1 = "hello World!"; - final String data2 = "evrver232&$"; - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath1, data1.getBytes(), ACL, CreateMode.PERSISTENT); - zkClient.get().create(nodePath2, data2.getBytes(), ACL, CreateMode.PERSISTENT); - - Map<String, String> zkMap = makeMap(parentPath); - - // Test all java.util.Map operations that are implemented. - assertTrue(zkMap.containsKey(node1)); - assertTrue(zkMap.containsKey(node2)); - assertTrue(zkMap.containsValue(data1)); - assertTrue(zkMap.containsValue(data2)); - assertEquals(ImmutableSet.of(new SimpleEntry<String, String>(node1, data1), - new SimpleEntry<String, String>(node2, data2)), zkMap.entrySet()); - assertEquals(data1, zkMap.get(node1)); - assertEquals(data2, zkMap.get(node2)); - assertFalse(zkMap.isEmpty()); - assertEquals(ImmutableSet.of(node1, node2), - zkMap.keySet()); - assertEquals(2, zkMap.size()); - } - - @Test - public void testChangingChildren() throws Exception { - final String parentPath = "/twitter/path"; - final String node1 = "node1"; - final String node2 = "node2"; - final String nodePath1 = parentPath + "/" + node1; - final String nodePath2 = parentPath + "/" + node2; - final String data1 = "wefwe"; - final String data2 = "rtgrtg"; - - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath1, data1.getBytes(), ACL, CreateMode.PERSISTENT); - - Map<String, String> zkMap = makeMap(parentPath); - assertEquals(1, zkMap.size()); - assertEquals(data1, zkMap.get(node1)); - assertEquals(null, zkMap.get(node2)); - - // Make sure the map is updated when a child is added. - zkClient.get().create(nodePath2, data2.getBytes(), ACL, CreateMode.PERSISTENT); - waitForEntryChange(node2, data2); - assertEquals(2, zkMap.size()); - assertEquals(data1, zkMap.get(node1)); - assertEquals(data2, zkMap.get(node2)); - - // Make sure the map is updated when a child is deleted. - zkClient.get().delete(nodePath1, -1); - waitForEntryChange(node1, null); - assertEquals(1, zkMap.size()); - assertEquals(null, zkMap.get(node1)); - assertEquals(data2, zkMap.get(node2)); - } - - @Test - public void testChangingChildValues() throws Exception { - final String parentPath = "/twitter/path"; - final String node = "node"; - final String nodePath = parentPath + "/" + node; - - final String data1 = ""; - final String data2 = "abc"; - final String data3 = "lalala"; - - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath, data1.getBytes(), ACL, CreateMode.PERSISTENT); - - TestListener testListener = new TestListener(); - Map<String, String> zkMap = makeMap(parentPath, testListener); - - assertEquals(Pair.of(node, data1), testListener.waitForUpdate()); - - assertEquals(1, zkMap.size()); - assertEquals(data1, zkMap.get(node)); - - zkClient.get().setData(nodePath, data2.getBytes(), -1); - waitForEntryChange(node, data2); - assertEquals(1, zkMap.size()); - - assertEquals(Pair.of(node, data2), testListener.waitForUpdate()); - - zkClient.get().setData(nodePath, data3.getBytes(), -1); - waitForEntryChange(node, data3); - assertEquals(1, zkMap.size()); - - assertEquals(Pair.of(node, data3), testListener.waitForUpdate()); - } - - @Test - public void testRemoveParentNode() throws Exception { - final String parentPath = "/twitter/path"; - final String node = "node"; - final String nodePath = parentPath + "/" + node; - final String data = "testdata"; - - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - - TestListener testListener = new TestListener(); - Map<String, String> zkMap = makeMap(parentPath, testListener); - assertEquals(1, zkMap.size()); - assertEquals(data, zkMap.get(node)); - - assertEquals(Pair.of(node, data), testListener.waitForUpdate()); - - zkClient.get().delete(nodePath, -1); - zkClient.get().delete(parentPath, -1); - - assertEquals(Pair.of(node, null), testListener.waitForUpdate()); - - waitForEntryChange(node, null); - assertEquals(0, zkMap.size()); - assertTrue(zkMap.isEmpty()); - - // Recreate our node, make sure the map observes it. - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - waitForEntryChange(node, data); - } - - @Test - public void testSessionExpireLogic() throws Exception { - final String parentPath = "/twitter/path"; - final String node1 = "node1"; - final String nodePath1 = parentPath + "/" + node1; - final String data1 = "testdata"; - - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath1, data1.getBytes(), ACL, CreateMode.PERSISTENT); - - Map<String, String> zkMap = makeMap(parentPath); - assertEquals(1, zkMap.size()); - assertEquals(data1, zkMap.get(node1)); - - expireSession(zkClient); - assertEquals(1, zkMap.size()); - assertEquals(data1, zkMap.get(node1)); - - final String node2 = "node2"; - final String nodePath2 = parentPath + "/" + node2; - final String data2 = "testdata2"; - zkClient = createZkClient(); - zkClient.get().create(nodePath2, data2.getBytes(), ACL, CreateMode.PERSISTENT); - - waitForEntryChange(node2, data2); - assertEquals(2, zkMap.size()); - assertEquals(data2, zkMap.get(node2)); - } - - @Test - public void testStaticCreate() throws Exception { - String parentPath = "/twitter/path"; - String node = "node"; - String nodePath = parentPath + "/" + node; - String data = "DaTa"; - - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - - Map<String, String> zkMap = ZooKeeperMap.create(zkClient, parentPath, BYTES_TO_STRING); - assertEquals(1, zkMap.size()); - assertEquals(data, zkMap.get(node)); - } - - private static void checkUnsupported(Command test) { - try { - test.execute(); - fail("Expected UnsupportedOperationException to be thrown."); - } catch (UnsupportedOperationException e) { - // expected - } - } - - @Test - public void testReadOnly() throws Exception { - String parentPath = "/twitter/path"; - final String node = "node"; - String nodePath = parentPath + "/" + node; - String data = "DaTa"; - - ZooKeeperUtils.ensurePath(zkClient, ACL, parentPath); - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - - final Map<String, String> zkMap = ZooKeeperMap.create(zkClient, parentPath, BYTES_TO_STRING); - checkUnsupported(new Command() { - @Override public void execute() { - zkMap.clear(); - } - }); - checkUnsupported(new Command() { - @Override public void execute() { - zkMap.remove(node); - } - }); - checkUnsupported(new Command() { - @Override public void execute() { - zkMap.put("othernode", "othervalue"); - } - }); - checkUnsupported(new Command() { - @Override public void execute() { - zkMap.putAll(ImmutableMap.of("othernode", "othervalue")); - } - }); - checkUnsupported(new Command() { - @Override public void execute() { - zkMap.keySet().iterator().remove(); - } - }); - checkUnsupported(new Command() { - @Override public void execute() { - zkMap.values().iterator().remove(); - } - }); - checkUnsupported(new Command() { - @Override public void execute() { - zkMap.entrySet().iterator().remove(); - } - }); - - // Ensure contents didn't change - assertEquals(1, zkMap.size()); - assertEquals(data, zkMap.get(node)); - } - - private void waitForEntryChange(String key, String value) throws Exception { - Pair<String, String> expectedEntry = Pair.of(key, value); - while (true) { - Pair<String, String> nextEntry = entryChanges.take(); - if (expectedEntry.equals(nextEntry)) { - return; - } - } - } - - private Map<String, String> makeMap(String path) throws Exception { - return makeMap(path, ZooKeeperMap.<String>noopListener()); - } - - private Map<String, String> makeMap(String path, ZooKeeperMap.Listener<String> listener) - throws Exception { - - ZooKeeperMap<String> zkMap = makeUninitializedMap(path, listener); - zkMap.init(); - return zkMap; - } - - private ZooKeeperMap<String> makeUninitializedMap(String path) throws Exception { - return makeUninitializedMap(path, ZooKeeperMap.<String>noopListener()); - } - - private ZooKeeperMap<String> makeUninitializedMap(String path, Listener<String> listener) - throws Exception { - - return new ZooKeeperMap<String>(zkClient, path, BYTES_TO_STRING, listener) { - @Override void putEntry(String key, String value) { - super.putEntry(key, value); - recordEntryChange(key); - } - - @Override void removeEntry(String key) { - super.removeEntry(key); - recordEntryChange(key); - } - - private void recordEntryChange(String key) { - entryChanges.offer(Pair.of(key, get(key))); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperNodeTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperNodeTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperNodeTest.java deleted file mode 100644 index fd1bb22..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperNodeTest.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.easymock.Capture; -import org.junit.Before; -import org.junit.Test; - -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Closures; -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.zookeeper.ZooKeeperNode.NodeDeserializer; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; - -import static org.easymock.EasyMock.aryEq; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; - -public class ZooKeeperNodeTest { - public static class LightWeightTests extends EasyMockTest { - private ZooKeeperClient zooKeeperClient; - private ZooKeeper zk; - private NodeDeserializer<String> deserializer; - private Closure<String> dataUpdateListener; - private ZooKeeperNode<String> node; - - @Before - public void setUp() { - zooKeeperClient = createMock(ZooKeeperClient.class); - zk = createMock(ZooKeeper.class); - deserializer = createMock(new Clazz<NodeDeserializer<String>>() { }); - dataUpdateListener = createMock(new Clazz<Closure<String>>() { }); - node = new ZooKeeperNode<String>(zooKeeperClient, "/foo", deserializer, dataUpdateListener); - } - - @Test - public void testWatchersReused() throws Exception { - // 1st init with initial no node exception - expect(zooKeeperClient.registerExpirationHandler(isA(Command.class))).andReturn(null); - expect(zooKeeperClient.get()).andReturn(zk); - Capture<Watcher> dataWatcher1 = createCapture(); - expect(zk.getData(eq("/foo"), capture(dataWatcher1), isA(Stat.class))) - .andThrow(new NoNodeException()); // Force an existence watch to be set - dataUpdateListener.execute(null); - - expect(zooKeeperClient.get()).andReturn(zk); - Capture<Watcher> existenceWatcher1 = createCapture(); - expect(zk.exists(eq("/foo"), capture(existenceWatcher1))).andReturn(new Stat()); - - expect(zooKeeperClient.get()).andReturn(zk); - Capture<Watcher> dataWatcher2 = createCapture(); - expect(zk.getData(eq("/foo"), capture(dataWatcher2), isA(Stat.class))) - .andReturn("bob".getBytes()); - expect(deserializer.deserialize(aryEq("bob".getBytes()), isA(Stat.class))).andReturn("fred"); - dataUpdateListener.execute("fred"); - - // 2nd init with initial no node exception - expect(zooKeeperClient.registerExpirationHandler(isA(Command.class))).andReturn(null); - expect(zooKeeperClient.get()).andReturn(zk); - Capture<Watcher> dataWatcher3 = createCapture(); - expect(zk.getData(eq("/foo"), capture(dataWatcher3), isA(Stat.class))) - .andThrow(new NoNodeException()); // Force an existence watch to be set - dataUpdateListener.execute(null); - - expect(zooKeeperClient.get()).andReturn(zk); - Capture<Watcher> existenceWatcher2 = createCapture(); - expect(zk.exists(eq("/foo"), capture(existenceWatcher2))).andReturn(new Stat()); - - expect(zooKeeperClient.get()).andReturn(zk); - Capture<Watcher> dataWatcher4 = createCapture(); - expect(zk.getData(eq("/foo"), capture(dataWatcher4), isA(Stat.class))) - .andReturn("bip".getBytes()); - expect(deserializer.deserialize(aryEq("bip".getBytes()), isA(Stat.class))).andReturn("frog"); - dataUpdateListener.execute("frog"); - - control.replay(); - - node.init(); - node.init(); - - assertSame(dataWatcher1.getValue(), dataWatcher2.getValue()); - assertSame(dataWatcher2.getValue(), dataWatcher3.getValue()); - assertSame(dataWatcher3.getValue(), dataWatcher4.getValue()); - - assertSame(existenceWatcher1.getValue(), existenceWatcher2.getValue()); - } - } - - public static class HeavyWeightTests extends BaseZooKeeperTest { - - private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; - - private ZooKeeperClient zkClient; - - private static class Listener<T> implements Closure<T> { - // We use AtomicReference as a wrapper since LinkedBlockingQueue does not allow null values. - private final BlockingQueue<AtomicReference<T>> queue = - new LinkedBlockingQueue<AtomicReference<T>>(); - - public void execute(T item) { - queue.offer(new AtomicReference<T>(item)); - } - - public T waitForUpdate() throws InterruptedException { - return queue.take().get(); - } - } - - private String nodePath; - - @Before - public void mySetUp() throws Exception { - zkClient = createZkClient(); - ZooKeeperUtils.ensurePath(zkClient, ACL, "/twitter"); - nodePath = "/twitter/node"; - } - - @Test - public void testZooKeeperUnavailableAtConstruction() throws Exception { - shutdownNetwork(); // Make zk unavailable. - - // Should be fine. - makeUninitializedNode(nodePath, Closures.<String>noop()); - } - - @Test(expected = KeeperException.class) - public void testZooKeeperUnavailableAtInit() throws Exception { - ZooKeeperNode zkNode = makeUninitializedNode(nodePath, Closures.<String>noop()); - - shutdownNetwork(); // Make zk unavailable. - - zkNode.init(); - } - - @Test - public void testInitialization() throws Exception { - String data = "abcdefg"; - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - ZooKeeperNode zkNode = makeUninitializedNode(nodePath, Closures.<String>noop()); - - // get() should return null before initialization - assertEquals(null, zkNode.get()); - - zkNode.init(); - - // Now that init has been called, the data should be synchronously reflected. - assertEquals(data, zkNode.get()); - } - - @Test - public void testInitialEmptyNode() throws Exception { - Listener<String> listener = new Listener<String>(); - ZooKeeperNode<String> zkNode = makeUninitializedNode(nodePath, listener); - - assertEquals(null, zkNode.get()); - zkNode.init(); - assertEquals(null, listener.waitForUpdate()); - - String data = "abcdefg"; - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - assertEquals(data, listener.waitForUpdate()); - } - - @Test - public void testChangingData() throws Exception { - String data1 = "test_data"; - zkClient.get().create(nodePath, data1.getBytes(), ACL, CreateMode.PERSISTENT); - Listener<String> listener = new Listener<String>(); - TestDeserializer deserializer = new TestDeserializer(); - makeNode(deserializer, nodePath, listener); - - assertEquals(data1, listener.waitForUpdate()); - assertNotNull(deserializer.getStat()); - assertEquals(0, deserializer.getStat().getVersion()); - String data2 = "BLAH"; - zkClient.get().setData(nodePath, data2.getBytes(), -1); - assertEquals(data2, listener.waitForUpdate()); - assertEquals(1, deserializer.getStat().getVersion()); - } - - @Test - public void testRemoveNode() throws Exception { - String data = "testdata"; - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - Listener<String> listener = new Listener<String>(); - TestDeserializer deserializer = new TestDeserializer(); - makeNode(deserializer, nodePath, listener); - - assertEquals(data, listener.waitForUpdate()); - assertNotNull(deserializer.getStat()); - assertEquals(0, deserializer.getStat().getVersion()); - - zkClient.get().delete(nodePath, -1); - assertEquals(null, listener.waitForUpdate()); - assertEquals(0, deserializer.getStat().getVersion()); - - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - assertEquals(data, listener.waitForUpdate()); - assertEquals(0, deserializer.getStat().getVersion()); - } - - @Test - public void testSessionExpireLogic() throws Exception { - String data1 = "testdata"; - zkClient.get().create(nodePath, data1.getBytes(), ACL, CreateMode.PERSISTENT); - Listener<String> listener = new Listener<String>(); - TestDeserializer deserializer = new TestDeserializer(); - makeNode(deserializer, nodePath, listener); - - assertEquals(data1, listener.waitForUpdate()); - assertNotNull(deserializer.getStat()); - assertEquals(0, deserializer.getStat().getVersion()); - - expireSession(zkClient); - assertEquals(data1, listener.waitForUpdate()); - - String data2 = "avewf"; - zkClient = createZkClient(); - zkClient.get().setData(nodePath, data2.getBytes(), -1); - assertEquals(data2, listener.waitForUpdate()); - assertEquals(1, deserializer.getStat().getVersion()); - } - - @Test - public void testStaticCreate() throws Exception { - String data = "stuff"; - zkClient.get().create(nodePath, data.getBytes(), ACL, CreateMode.PERSISTENT); - ZooKeeperNode<String> zkNode = ZooKeeperNode.create(zkClient, nodePath, new TestDeserializer()); - assertEquals(data, zkNode.get()); - } - - private ZooKeeperNode<String> makeNode(TestDeserializer deserializer, String path, - Closure<String> listener) throws Exception { - ZooKeeperNode<String> zkNode = makeUninitializedNode(deserializer, path, listener); - zkNode.init(); - return zkNode; - } - - private ZooKeeperNode<String> makeUninitializedNode(String path, Closure<String> listener) - throws Exception { - return makeUninitializedNode(new TestDeserializer(), path, listener); - } - - private ZooKeeperNode<String> makeUninitializedNode( - ZooKeeperNode.NodeDeserializer<String> deserializer, String path, Closure<String> listener) - throws Exception { - // we test deserializertionWithPair primarily because it is deserializertionally a proper - // superset of deserializertionWithByteArray - return new ZooKeeperNode<String>(zkClient, path, deserializer, listener); - } - - // helper to test Stat population and retrieval - private static final class TestDeserializer implements ZooKeeperNode.NodeDeserializer<String> { - private Stat stat = null; - - @Override - public String deserialize(byte[] data, Stat stat) { - this.stat = stat; - return new String(data); - } - - Stat getStat() { - return stat; - } - } - } -}
