http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java deleted file mode 100644 index 98d36b6..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import java.util.Collection; -import java.util.Arrays; - -@RunWith(Parameterized.class) -public class TestHedwigHubSSL extends TestHedwigHub { - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { { true }, { false } }); - } - - public TestHedwigHubSSL(boolean isSubscriptionChannelSharingEnabled) { - super(Mode.SSL, isSubscriptionChannelSharingEnabled); - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java deleted file mode 100644 index bce41e5..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java +++ /dev/null @@ -1,304 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.integration; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.SynchronousQueue; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.server.HedwigRegionTestBase; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.integration.TestHedwigHub.TestCallback; -import org.apache.hedwig.server.integration.TestHedwigHub.TestMessageHandler; -import org.apache.hedwig.util.HedwigSocketAddress; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestHedwigRegion extends HedwigRegionTestBase { - - // SynchronousQueues to verify async calls - private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>(); - private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>(); - - private static final int TEST_RETRY_REMOTE_SUBSCRIBE_INTERVAL_VALUE = 3000; - - protected class NewRegionServerConfiguration extends RegionServerConfiguration { - - public NewRegionServerConfiguration(int serverPort, int sslServerPort, - String regionName) { - super(serverPort, sslServerPort, regionName); - } - - @Override - public int getRetryRemoteSubscribeThreadRunInterval() { - return TEST_RETRY_REMOTE_SUBSCRIBE_INTERVAL_VALUE; - } - - } - - protected class NewRegionClientConfiguration extends ClientConfiguration { - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return isSubscriptionChannelSharingEnabled; - } - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return regionHubAddresses.get(0).get(0); - } - } - - protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort, String regionName) { - return new NewRegionServerConfiguration(serverPort, sslServerPort, regionName); - } - - protected ClientConfiguration getRegionClientConfiguration() { - return new NewRegionClientConfiguration(); - } - - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { { false }, { true } }); - } - - protected boolean isSubscriptionChannelSharingEnabled; - - public TestHedwigRegion(boolean isSubscriptionChannelSharingEnabled) { - this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled; - } - - @Override - @Before - public void setUp() throws Exception { - numRegions = 3; - numServersPerRegion = 4; - super.setUp(); - } - - @Test(timeout=60000) - public void testMultiRegionSubscribeAndConsume() throws Exception { - int batchSize = 10; - // Subscribe to topics for clients in all regions - for (HedwigClient client : regionClientsMap.values()) { - for (int i = 0; i < batchSize; i++) { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), opts, new TestCallback(queue), null); - assertTrue(queue.take()); - } - } - - // Start delivery for the local subscribers in all regions - for (HedwigClient client : regionClientsMap.values()) { - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue)); - } - } - - // Now start publishing messages for the subscribed topics in one of the - // regions and verify that it gets delivered and consumed in all of the - // other ones. - Publisher publisher = regionClientsMap.values().iterator().next().getPublisher(); - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), Message.newBuilder().setBody( - ByteString.copyFromUtf8("Message" + i)).build(), new TestCallback(queue), null); - assertTrue(queue.take()); - } - // Make sure each region consumes the same set of published messages. - for (int i = 0; i < regionClientsMap.size(); i++) { - for (int j = 0; j < batchSize; j++) { - assertTrue(consumeQueue.take()); - } - } - } - - /** - * Test region shuts down when first subscription. - * - * @throws Exception - */ - @Test(timeout=60000) - public void testSubscribeAndConsumeWhenARegionDown() throws Exception { - int batchSize = 10; - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - // first shut down a region - Random r = new Random(); - int regionId = r.nextInt(numRegions); - stopRegion(regionId); - // subscribe to topics when a region shuts down - for (HedwigClient client : regionClientsMap.values()) { - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), - opts, new TestCallback(queue), null); - assertFalse(queue.take()); - } - } - - // start region gain - startRegion(regionId); - - // sub it again - for (Map.Entry<String, HedwigClient> entry : regionClientsMap.entrySet()) { - HedwigClient client = entry.getValue(); - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), - opts, new TestCallback(queue), null); - assertTrue(queue.take()); - } - } - - // Start delivery for local subscribers in all regions - for (Map.Entry<String, HedwigClient> entry : regionClientsMap.entrySet()) { - HedwigClient client = entry.getValue(); - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue)); - } - } - - // Now start publishing messages for the subscribed topics in one of the - // regions and verify that it gets delivered and consumed in all of the - // other ones. - int rid = r.nextInt(numRegions); - String regionName = REGION_PREFIX + rid; - Publisher publisher = regionClientsMap.get(regionName).getPublisher(); - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), Message.newBuilder().setBody( - ByteString.copyFromUtf8(regionName + "-Message" + i)).build(), new TestCallback(queue), null); - assertTrue(queue.take()); - } - // Make sure each region consumes the same set of published messages. - for (int i = 0; i < regionClientsMap.size(); i++) { - for (int j = 0; j < batchSize; j++) { - assertTrue(consumeQueue.take()); - } - } - } - - /** - * Test region shuts down when attaching existing subscriptions. - * - * @throws Exception - */ - @Test(timeout=60000) - public void testAttachExistingSubscriptionsWhenARegionDown() throws Exception { - int batchSize = 10; - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - // sub it remotely to make subscriptions existed - for (Map.Entry<String, HedwigClient> entry : regionClientsMap.entrySet()) { - HedwigClient client = entry.getValue(); - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), - opts, new TestCallback(queue), null); - assertTrue(queue.take()); - } - } - - // stop regions - for (int i=0; i<numRegions; i++) { - stopRegion(i); - } - // start regions again - for (int i=0; i<numRegions; i++) { - startRegion(i); - } - - // first shut down a region - Random r = new Random(); - int regionId = r.nextInt(numRegions); - stopRegion(regionId); - // subscribe to topics when a region shuts down - // it should succeed since the subscriptions existed before - for (HedwigClient client : regionClientsMap.values()) { - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), - opts, new TestCallback(queue), null); - assertTrue(queue.take()); - } - } - - // Start delivery for local subscribers in all regions - for (Map.Entry<String, HedwigClient> entry : regionClientsMap.entrySet()) { - HedwigClient client = entry.getValue(); - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue)); - } - } - - // start region again - startRegion(regionId); - // wait for retry - Thread.sleep(3 * TEST_RETRY_REMOTE_SUBSCRIBE_INTERVAL_VALUE); - - String regionName = REGION_PREFIX + regionId; - HedwigClient client = regionClientsMap.get(regionName); - for (int i = 0; i < batchSize; i++) { - client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), - opts, new TestCallback(queue), null); - assertTrue(queue.take()); - client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i), - ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue)); - } - - // Now start publishing messages for the subscribed topics in one of the - // regions and verify that it gets delivered and consumed in all of the - // other ones. - Publisher publisher = client.getPublisher(); - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), Message.newBuilder().setBody( - ByteString.copyFromUtf8(regionName + "-Message" + i)).build(), new TestCallback(queue), null); - assertTrue(queue.take()); - } - // Make sure each region consumes the same set of published messages. - for (int i = 0; i < regionClientsMap.size(); i++) { - for (int j = 0; j < batchSize; j++) { - assertTrue(consumeQueue.take()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java deleted file mode 100644 index 644500d..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.hedwig.server.meta; - -import java.util.Arrays; -import java.util.Collection; - -import org.apache.bookkeeper.metastore.InMemoryMetaStore; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.meta.ZkMetadataManagerFactory; -import org.apache.hedwig.zookeeper.ZooKeeperTestBase; - -import org.junit.After; -import org.junit.Before; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(Parameterized.class) -public abstract class MetadataManagerFactoryTestCase extends ZooKeeperTestBase { - private static final Logger LOG = LoggerFactory.getLogger(MetadataManagerFactoryTestCase.class); - - protected MetadataManagerFactory metadataManagerFactory; - protected ServerConfiguration conf; - - public MetadataManagerFactoryTestCase(String metadataManagerFactoryCls) { - super(); - conf = new ServerConfiguration(); - conf.setMetadataManagerFactoryName(metadataManagerFactoryCls); - conf.getConf().setProperty("metastore_impl_class", InMemoryMetaStore.class.getName()); - InMemoryMetaStore.reset(); - } - - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { - { ZkMetadataManagerFactory.class.getName() }, - { MsMetadataManagerFactory.class.getName() }, - }); - } - - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - metadataManagerFactory = MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - } - - @After - @Override - public void tearDown() throws Exception { - metadataManagerFactory.shutdown(); - super.tearDown(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java deleted file mode 100644 index 7e15135..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.hedwig.server.meta; - -import java.io.IOException; - -import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.zookeeper.ZooKeeperTestBase; -import org.apache.hedwig.zookeeper.ZkUtils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs.Ids; - -import org.junit.Test; -import org.junit.Assert; - -public class TestFactoryLayout extends ZooKeeperTestBase { - - @Test(timeout=60000) - public void testFactoryLayout() throws Exception { - ServerConfiguration conf = new ServerConfiguration(); - conf.setMetadataManagerFactoryName( - "org.apache.hedwig.server.meta.ZkMetadataManager"); - - FactoryLayout layout = FactoryLayout.readLayout(zk, conf); - Assert.assertTrue("Layout should be null", layout == null); - - String testName = "foobar"; - int testVersion = 0xdeadbeef; - // use layout defined in configuration also create it in zookeeper - writeFactoryLayout(conf, testName, testVersion); - - layout = FactoryLayout.readLayout(zk, conf); - Assert.assertEquals(testName, layout.getManagerMeta().getManagerImpl()); - Assert.assertEquals(testVersion, layout.getManagerMeta().getManagerVersion()); - } - - private void writeFactoryLayout(ServerConfiguration conf, String managerCls, - int managerVersion) - throws Exception { - ManagerMeta managerMeta = ManagerMeta.newBuilder() - .setManagerImpl(managerCls) - .setManagerVersion(managerVersion) - .build(); - FactoryLayout layout = new FactoryLayout(managerMeta); - layout.store(zk, conf); - } - - @Test(timeout=60000) - public void testCorruptedFactoryLayout() throws Exception { - ServerConfiguration conf = new ServerConfiguration(); - StringBuilder msb = new StringBuilder(); - String factoryLayoutPath = FactoryLayout.getFactoryLayoutPath(msb, conf); - // write corrupted manager layout - ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, "BadLayout".getBytes(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - try { - FactoryLayout.readLayout(zk, conf); - Assert.fail("Shouldn't reach here!"); - } catch (IOException ie) { - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java deleted file mode 100644 index 7e395e9..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java +++ /dev/null @@ -1,365 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.hedwig.server.meta; - -import java.util.Map; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.hedwig.StubCallback; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState; -import org.apache.hedwig.server.topics.HubInfo; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.util.HedwigSocketAddress; - -import org.junit.Test; -import org.junit.Assert; - -public class TestMetadataManager extends MetadataManagerFactoryTestCase { - - public TestMetadataManager(String metadataManagerFactoryCls) { - super(metadataManagerFactoryCls); - } - - @Test(timeout=60000) - public void testOwnerInfo() throws Exception { - TopicOwnershipManager toManager = metadataManagerFactory.newTopicOwnershipManager(); - - ByteString topic = ByteString.copyFromUtf8("testOwnerInfo"); - StubCallback<Versioned<HubInfo>> readCallback = new StubCallback<Versioned<HubInfo>>(); - StubCallback<Version> writeCallback = new StubCallback<Version>(); - StubCallback<Void> deleteCallback = new StubCallback<Void>(); - - Either<Version, PubSubException> res; - HubInfo owner = new HubInfo(new HedwigSocketAddress("127.0.0.1", 8008), 999); - - // Write non-existed owner info - toManager.writeOwnerInfo(topic, owner, Version.NEW, writeCallback, null); - res = writeCallback.queue.take(); - Assert.assertEquals(null, res.right()); - Version v1 = res.left(); - - // read owner info - toManager.readOwnerInfo(topic, readCallback, null); - Versioned<HubInfo> hubInfo = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v1.compare(hubInfo.getVersion())); - Assert.assertEquals(owner, hubInfo.getValue()); - - HubInfo newOwner = new HubInfo(new HedwigSocketAddress("127.0.0.1", 8008), 1000); - - // write exsited owner info with null version - toManager.writeOwnerInfo(topic, newOwner, Version.NEW, writeCallback, null); - res = writeCallback.queue.take(); - Assert.assertNotNull(res.right()); - Assert.assertTrue(res.right() instanceof PubSubException.TopicOwnerInfoExistsException); - - // write existed owner info with right version - toManager.writeOwnerInfo(topic, newOwner, v1, writeCallback, null); - res = writeCallback.queue.take(); - Assert.assertEquals(null, res.right()); - Version v2 = res.left(); - Assert.assertEquals(Version.Occurred.AFTER, v2.compare(v1)); - - // read owner info - toManager.readOwnerInfo(topic, readCallback, null); - hubInfo = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(hubInfo.getVersion())); - Assert.assertEquals(newOwner, hubInfo.getValue()); - - HubInfo newOwner2 = new HubInfo(new HedwigSocketAddress("127.0.0.1", 8008), 1001); - - // write existed owner info with bad version - toManager.writeOwnerInfo(topic, newOwner2, v1, - writeCallback, null); - res = writeCallback.queue.take(); - Assert.assertNotNull(res.right()); - Assert.assertTrue(res.right() instanceof PubSubException.BadVersionException); - - // read owner info - toManager.readOwnerInfo(topic, readCallback, null); - hubInfo = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(hubInfo.getVersion())); - Assert.assertEquals(newOwner, hubInfo.getValue()); - - // delete existed owner info with bad version - toManager.deleteOwnerInfo(topic, v1, deleteCallback, null); - Assert.assertTrue(deleteCallback.queue.take().right() instanceof - PubSubException.BadVersionException); - - // read owner info - toManager.readOwnerInfo(topic, readCallback, null); - hubInfo = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(hubInfo.getVersion())); - - // delete existed owner info with right version - toManager.deleteOwnerInfo(topic, v2, deleteCallback, null); - Assert.assertEquals(null, deleteCallback.queue.take().right()); - - // Empty owner info - toManager.readOwnerInfo(topic, readCallback, null); - Assert.assertEquals(null, readCallback.queue.take().left()); - - // delete non-existed owner info - toManager.deleteOwnerInfo(topic, Version.ANY, deleteCallback, null); - Assert.assertTrue(deleteCallback.queue.take().right() instanceof - PubSubException.NoTopicOwnerInfoException); - - toManager.close(); - } - - @Test(timeout=60000) - public void testPersistenceInfo() throws Exception { - TopicPersistenceManager tpManager = metadataManagerFactory.newTopicPersistenceManager(); - - ByteString topic = ByteString.copyFromUtf8("testPersistenceInfo"); - StubCallback<Versioned<LedgerRanges>> readCallback = new StubCallback<Versioned<LedgerRanges>>(); - StubCallback<Version> writeCallback = new StubCallback<Version>(); - StubCallback<Void> deleteCallback = new StubCallback<Void>(); - - // Write non-existed persistence info - tpManager.writeTopicPersistenceInfo(topic, LedgerRanges.getDefaultInstance(), - Version.NEW, writeCallback, null); - Either<Version, PubSubException> res = writeCallback.queue.take(); - Assert.assertEquals(null, res.right()); - Version v1 = res.left(); - - // read persistence info - tpManager.readTopicPersistenceInfo(topic, readCallback, null); - Versioned<LedgerRanges> ranges = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v1.compare(ranges.getVersion())); - Assert.assertEquals(LedgerRanges.getDefaultInstance(), ranges.getValue()); - - LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(1).build(); - LedgerRanges.Builder builder = LedgerRanges.newBuilder(); - builder.addRanges(lastRange); - LedgerRanges newRanges = builder.build(); - - // write existed persistence info with null version - tpManager.writeTopicPersistenceInfo(topic, newRanges, Version.NEW, - writeCallback, null); - res = writeCallback.queue.take(); - Assert.assertNotNull(res.right()); - Assert.assertTrue(res.right() instanceof PubSubException.TopicPersistenceInfoExistsException); - - // write existed persistence info with right version - tpManager.writeTopicPersistenceInfo(topic, newRanges, v1, - writeCallback, null); - res = writeCallback.queue.take(); - Assert.assertEquals(null, res.right()); - Version v2 = res.left(); - Assert.assertEquals(Version.Occurred.AFTER, v2.compare(v1)); - - // read persistence info - tpManager.readTopicPersistenceInfo(topic, readCallback, null); - ranges = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(ranges.getVersion())); - Assert.assertEquals(newRanges, ranges.getValue()); - - lastRange = LedgerRange.newBuilder().setLedgerId(2).build(); - builder = LedgerRanges.newBuilder(); - builder.addRanges(lastRange); - LedgerRanges newRanges2 = builder.build(); - - // write existed persistence info with bad version - tpManager.writeTopicPersistenceInfo(topic, newRanges2, v1, - writeCallback, null); - res = writeCallback.queue.take(); - Assert.assertNotNull(res.right()); - Assert.assertTrue(res.right() instanceof PubSubException.BadVersionException); - - // read persistence info - tpManager.readTopicPersistenceInfo(topic, readCallback, null); - ranges = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(ranges.getVersion())); - Assert.assertEquals(newRanges, ranges.getValue()); - - // delete with bad version - tpManager.deleteTopicPersistenceInfo(topic, v1, deleteCallback, null); - Assert.assertTrue(deleteCallback.queue.take().right() instanceof - PubSubException.BadVersionException); - - // read persistence info - tpManager.readTopicPersistenceInfo(topic, readCallback, null); - ranges = readCallback.queue.take().left(); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(ranges.getVersion())); - Assert.assertEquals(newRanges, ranges.getValue()); - - // delete existed persistence info with right version - tpManager.deleteTopicPersistenceInfo(topic, v2, deleteCallback, null); - Assert.assertEquals(null, deleteCallback.queue.take().right()); - - // read empty persistence info - tpManager.readTopicPersistenceInfo(topic, readCallback, null); - Assert.assertEquals(null, readCallback.queue.take().left()); - - // delete non-existed persistence info - tpManager.deleteTopicPersistenceInfo(topic, Version.ANY, deleteCallback, null); - Assert.assertTrue(deleteCallback.queue.take().right() instanceof - PubSubException.NoTopicPersistenceInfoException); - - tpManager.close(); - } - - @Test(timeout=60000) - public void testSubscriptionData() throws Exception { - SubscriptionDataManager subManager = metadataManagerFactory.newSubscriptionDataManager(); - - ByteString topic = ByteString.copyFromUtf8("testSubscriptionData"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - final StubCallback<Version> callback = new StubCallback<Version>(); - StubCallback<Versioned<SubscriptionData>> readCallback = new StubCallback<Versioned<SubscriptionData>>(); - StubCallback<Map<ByteString, Versioned<SubscriptionData>>> subsCallback - = new StubCallback<Map<ByteString, Versioned<SubscriptionData>>>(); - - subManager.readSubscriptionData(topic, subid, readCallback, null); - Either<Versioned<SubscriptionData>, PubSubException> readRes = readCallback.queue.take(); - Assert.assertEquals("Found inconsistent subscription state", null, readRes.left()); - Assert.assertEquals("Should not fail with PubSubException", null, readRes.right()); - - // read non-existed subscription state - subManager.readSubscriptions(topic, subsCallback, null); - Either<Map<ByteString, Versioned<SubscriptionData>>, PubSubException> res = subsCallback.queue.take(); - Assert.assertEquals("Found more than 0 subscribers", 0, res.left().size()); - Assert.assertEquals("Should not fail with PubSubException", null, res.right()); - - // update non-existed subscription state - if (subManager.isPartialUpdateSupported()) { - subManager.updateSubscriptionData(topic, subid, - SubscriptionData.getDefaultInstance(), Version.ANY, callback, null); - } else { - subManager.replaceSubscriptionData(topic, subid, - SubscriptionData.getDefaultInstance(), Version.ANY, callback, null); - } - Assert.assertTrue("Should fail to update a non-existed subscriber with PubSubException", - callback.queue.take().right() instanceof PubSubException.NoSubscriptionStateException); - - Callback<Void> voidCallback = new Callback<Void>() { - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - callback.operationFinished(ctx, null); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - callback.operationFailed(ctx, exception); - } - }; - - // delete non-existed subscription state - subManager.deleteSubscriptionData(topic, subid, Version.ANY, voidCallback, null); - Assert.assertTrue("Should fail to delete a non-existed subscriber with PubSubException", - callback.queue.take().right() instanceof PubSubException.NoSubscriptionStateException); - - long seqId = 10; - MessageSeqId.Builder builder = MessageSeqId.newBuilder(); - builder.setLocalComponent(seqId); - MessageSeqId msgId = builder.build(); - - SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder(SubscriptionState.getDefaultInstance()).setMsgId(msgId); - SubscriptionData data = SubscriptionData.newBuilder().setState(stateBuilder).build(); - - // create a subscription state - subManager.createSubscriptionData(topic, subid, data, callback, null); - Either<Version, PubSubException> cbResult = callback.queue.take(); - Version v1 = cbResult.left(); - Assert.assertEquals("Should not fail with PubSubException", - null, cbResult.right()); - - // read subscriptions - subManager.readSubscriptions(topic, subsCallback, null); - res = subsCallback.queue.take(); - Assert.assertEquals("Should find just 1 subscriber", 1, res.left().size()); - Assert.assertEquals("Should not fail with PubSubException", null, res.right()); - Versioned<SubscriptionData> versionedSubData = res.left().get(subid); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v1.compare(versionedSubData.getVersion())); - SubscriptionData imss = versionedSubData.getValue(); - Assert.assertEquals("Found inconsistent subscription state", - data, imss); - Assert.assertEquals("Found inconsistent last consumed seq id", - seqId, imss.getState().getMsgId().getLocalComponent()); - - // move consume seq id - seqId = 99; - builder = MessageSeqId.newBuilder(); - builder.setLocalComponent(seqId); - msgId = builder.build(); - - stateBuilder = SubscriptionState.newBuilder(data.getState()).setMsgId(msgId); - data = SubscriptionData.newBuilder().setState(stateBuilder).build(); - - // update subscription state - if (subManager.isPartialUpdateSupported()) { - subManager.updateSubscriptionData(topic, subid, data, versionedSubData.getVersion(), callback, null); - } else { - subManager.replaceSubscriptionData(topic, subid, data, versionedSubData.getVersion(), callback, null); - } - cbResult = callback.queue.take(); - Assert.assertEquals("Fail to update a subscription state", null, cbResult.right()); - Version v2 = cbResult.left(); - // read subscription state - subManager.readSubscriptionData(topic, subid, readCallback, null); - Assert.assertEquals("Found inconsistent subscription state", - data, readCallback.queue.take().left().getValue()); - - // read subscriptions again - subManager.readSubscriptions(topic, subsCallback, null); - res = subsCallback.queue.take(); - Assert.assertEquals("Should find just 1 subscriber", 1, res.left().size()); - Assert.assertEquals("Should not fail with PubSubException", null, res.right()); - versionedSubData = res.left().get(subid); - Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(versionedSubData.getVersion())); - imss = res.left().get(subid).getValue(); - Assert.assertEquals("Found inconsistent subscription state", - data, imss); - Assert.assertEquals("Found inconsistent last consumed seq id", - seqId, imss.getState().getMsgId().getLocalComponent()); - - // update or replace subscription data with bad version - if (subManager.isPartialUpdateSupported()) { - subManager.updateSubscriptionData(topic, subid, data, v1, callback, null); - } else { - subManager.replaceSubscriptionData(topic, subid, data, v1, callback, null); - } - Assert.assertTrue(callback.queue.take().right() instanceof PubSubException.BadVersionException); - - // delete with bad version - subManager.deleteSubscriptionData(topic, subid, v1, voidCallback, null); - Assert.assertTrue(callback.queue.take().right() instanceof PubSubException.BadVersionException); - subManager.deleteSubscriptionData(topic, subid, res.left().get(subid).getVersion(), voidCallback, null); - Assert.assertEquals("Fail to delete an existed subscriber", null, callback.queue.take().right()); - - // read subscription states again - subManager.readSubscriptions(topic, subsCallback, null); - res = subsCallback.queue.take(); - Assert.assertEquals("Found more than 0 subscribers", 0, res.left().size()); - Assert.assertEquals("Should not fail with PubSubException", null, res.right()); - - subManager.close(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java deleted file mode 100644 index 8b9016a..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.hedwig.server.meta; - -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import java.io.IOException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.CountDownLatch; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.zookeeper.ZooKeeperTestBase; -import org.junit.Test; -import org.junit.Assert; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestMetadataManagerFactory extends ZooKeeperTestBase { - private static final Logger LOG = LoggerFactory.getLogger(TestMetadataManagerFactory.class); - - static class TestServerConfiguration extends ServerConfiguration { - String hedwigPrefix = "/hedwig"; - - @Override - public String getZkPrefix() { - return hedwigPrefix; - } - - public void setZkPrefix(String prefix) { - this.hedwigPrefix = prefix; - } - } - - static class DummyMetadataManagerFactory extends MetadataManagerFactory { - static int VERSION = 10; - - public int getCurrentVersion() { return VERSION; } - - - public MetadataManagerFactory initialize(ServerConfiguration cfg, - ZooKeeper zk, - int version) - throws IOException { - if (version != VERSION) { - throw new IOException("unmatched manager version"); - } - // do nothing - return this; - } - - public void shutdown() {} - - public Iterator<ByteString> getTopics() { - return null; - } - - public TopicPersistenceManager newTopicPersistenceManager() { - return null; - } - - public SubscriptionDataManager newSubscriptionDataManager() { - return null; - } - - public TopicOwnershipManager newTopicOwnershipManager() { - return null; - } - - public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException { - // do nothing - } - } - - private void writeFactoryLayout(ServerConfiguration conf, - String factoryCls, - int factoryVersion) - throws Exception { - ManagerMeta meta = ManagerMeta.newBuilder() - .setManagerImpl(factoryCls) - .setManagerVersion(factoryVersion).build(); - new FactoryLayout(meta).store(zk, conf); - } - - /** - * Test bad server configuration - */ - @Test(timeout=60000) - public void testBadConf() throws Exception { - TestServerConfiguration conf = new TestServerConfiguration(); - - String root0 = "/goodconf"; - conf.setZkPrefix(root0); - - MetadataManagerFactory m = - MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - Assert.assertTrue("MetadataManagerFactory is unexpected type", - (m instanceof ZkMetadataManagerFactory)); - - // mismatching conf - conf.setMetadataManagerFactoryName(DummyMetadataManagerFactory.class.getName()); - try { - MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - Assert.fail("Shouldn't reach here"); - } catch (Exception e) { - Assert.assertTrue("Invalid exception", - e.getMessage().contains("does not match existing factory")); - } - - // invalid metadata manager - String root1 = "/badconf1"; - conf.setZkPrefix(root1); - conf.setMetadataManagerFactoryName("DoesNotExist"); - try { - MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - Assert.fail("Shouldn't reach here"); - } catch (Exception e) { - Assert.assertTrue("Invalid exception", - e.getMessage().contains("Failed to get metadata manager factory class from configuration")); - } - } - - /** - * Test bad zk configuration - */ - @Test(timeout=60000) - public void testBadZkContents() throws Exception { - TestServerConfiguration conf = new TestServerConfiguration(); - - // bad type in zookeeper - String root0 = "/badzk0"; - conf.setZkPrefix(root0); - - writeFactoryLayout(conf, "DoesNotExist", 0xdeadbeef); - try { - MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - Assert.fail("Shouldn't reach here"); - } catch (Exception e) { - Assert.assertTrue("Invalid exception", - e.getMessage().contains("No class found to instantiate metadata manager factory")); - } - - // bad version in zookeeper - String root1 = "/badzk1"; - conf.setZkPrefix(root1); - - writeFactoryLayout(conf, ZkMetadataManagerFactory.class.getName(), 0xdeadbeef); - try { - MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - Assert.fail("Shouldn't reach here"); - } catch (Exception e) { - Assert.assertTrue("Invalid exception", - e.getMessage().contains("Incompatible ZkMetadataManagerFactory version")); - } - } - - private class CreateMMThread extends Thread { - private boolean success = false; - private final String factoryCls; - private final String root; - private final CyclicBarrier barrier; - private ZooKeeper zkc; - - CreateMMThread(String root, String factoryCls, CyclicBarrier barrier) throws Exception { - this.factoryCls = factoryCls; - this.barrier = barrier; - this.root = root; - final CountDownLatch latch = new CountDownLatch(1); - zkc = new ZooKeeper(hostPort, 10000, new Watcher() { - public void process(WatchedEvent event) { - latch.countDown(); - } - }); - latch.await(); - } - - public void run() { - TestServerConfiguration conf = new TestServerConfiguration(); - conf.setZkPrefix(root); - conf.setMetadataManagerFactoryName(factoryCls); - - try { - barrier.await(); - MetadataManagerFactory.newMetadataManagerFactory(conf, zkc); - success = true; - } catch (Exception e) { - LOG.error("Failed to create metadata manager factory", e); - } - } - - public boolean isSuccessful() { - return success; - } - - public void close() throws Exception { - zkc.close(); - } - } - - // test concurrent - @Test(timeout=60000) - public void testConcurrent1() throws Exception { - /// everyone creates the same - int numThreads = 50; - - // bad version in zookeeper - String root0 = "/lmroot0"; - - CyclicBarrier barrier = new CyclicBarrier(numThreads+1); - List<CreateMMThread> threads = new ArrayList<CreateMMThread>(numThreads); - for (int i = 0; i < numThreads; i++) { - CreateMMThread t = new CreateMMThread(root0, ZkMetadataManagerFactory.class.getName(), barrier); - t.start(); - threads.add(t); - } - - barrier.await(); - - boolean success = true; - for (CreateMMThread t : threads) { - t.join(); - t.close(); - success = t.isSuccessful() && success; - } - Assert.assertTrue("Not all metadata manager factories created", success); - } - - @Test(timeout=60000) - public void testConcurrent2() throws Exception { - /// odd create different - int numThreadsEach = 25; - - // bad version in zookeeper - String root0 = "/lmroot0"; - - CyclicBarrier barrier = new CyclicBarrier(numThreadsEach*2+1); - List<CreateMMThread> threadsA = new ArrayList<CreateMMThread>(numThreadsEach); - for (int i = 0; i < numThreadsEach; i++) { - CreateMMThread t = new CreateMMThread(root0, ZkMetadataManagerFactory.class.getName(), barrier); - t.start(); - threadsA.add(t); - } - List<CreateMMThread> threadsB = new ArrayList<CreateMMThread>(numThreadsEach); - for (int i = 0; i < numThreadsEach; i++) { - CreateMMThread t = new CreateMMThread(root0, DummyMetadataManagerFactory.class.getName(), barrier); - t.start(); - threadsB.add(t); - } - - barrier.await(); - - int numSuccess = 0; - int numFails = 0; - for (CreateMMThread t : threadsA) { - t.join(); - t.close(); - if (t.isSuccessful()) { - numSuccess++; - } else { - numFails++; - } - } - - for (CreateMMThread t : threadsB) { - t.join(); - t.close(); - if (t.isSuccessful()) { - numSuccess++; - } else { - numFails++; - } - } - Assert.assertEquals("Incorrect number of successes", numThreadsEach, numSuccess); - Assert.assertEquals("Incorrect number of failures", numThreadsEach, numFails); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java b/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java deleted file mode 100644 index 241f45b..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.netty; - -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.junit.Test; - -import org.apache.bookkeeper.test.PortManager; -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.server.PubSubServerStandAloneTestBase; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.topics.AbstractTopicManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.server.LoggingExceptionHandler; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.hedwig.zookeeper.SafeAsyncZKCallback; - -import static org.junit.Assert.*; - -public class TestPubSubServer extends PubSubServerStandAloneTestBase { - - @Test(timeout=60000) - public void testSecondServer() throws Exception { - PubSubServer server1 = new PubSubServer(new StandAloneServerConfiguration() { - @Override - public int getServerPort() { - return super.getServerPort() + 1; - } - }, new ClientConfiguration(), new LoggingExceptionHandler()); - server1.start(); - server1.shutdown(); - } - - class RecordingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { - SynchronousQueue<Throwable> queue; - - public RecordingUncaughtExceptionHandler(SynchronousQueue<Throwable> queue) { - this.queue = queue; - } - - @Override - public void uncaughtException(Thread t, Throwable e) { - queue.add(e); - } - - } - - private interface TopicManagerInstantiator { - public TopicManager instantiateTopicManager() throws IOException; - } - - PubSubServer startServer(final UncaughtExceptionHandler uncaughtExceptionHandler, final int port, - final TopicManagerInstantiator instantiator) throws Exception { - PubSubServer server = new PubSubServer(new StandAloneServerConfiguration() { - @Override - public int getServerPort() { - return port; - } - - }, new ClientConfiguration(), uncaughtExceptionHandler) { - - @Override - protected TopicManager instantiateTopicManager() throws IOException { - return instantiator.instantiateTopicManager(); - } - }; - server.start(); - return server; - - } - - public void runPublishRequest(final int port) throws Exception { - Publisher publisher = new HedwigClient(new ClientConfiguration() { - @Override - public InetSocketAddress getDefaultServerHost() { - return new InetSocketAddress("localhost", port); - } - }).getPublisher(); - - publisher.asyncPublish(ByteString.copyFromUtf8("blah"), Message.newBuilder().setBody( - ByteString.copyFromUtf8("blah")).build(), new Callback<Void>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - assertTrue(false); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - assertTrue(false); - } - - }, null); - } - - @Test(timeout=60000) - public void testUncaughtExceptionInNettyThread() throws Exception { - - SynchronousQueue<Throwable> queue = new SynchronousQueue<Throwable>(); - RecordingUncaughtExceptionHandler uncaughtExceptionHandler = new RecordingUncaughtExceptionHandler(queue); - final int port = PortManager.nextFreePort(); - - PubSubServer server = startServer(uncaughtExceptionHandler, port, new TopicManagerInstantiator() { - - @Override - public TopicManager instantiateTopicManager() throws IOException { - return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) { - @Override - protected void realGetOwner(ByteString topic, boolean shouldClaim, - Callback<HedwigSocketAddress> cb, Object ctx) { - throw new RuntimeException("this exception should be uncaught"); - } - - @Override - protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) { - } - }; - } - }); - - runPublishRequest(port); - assertEquals(RuntimeException.class, queue.take().getClass()); - server.shutdown(); - } - - @Test(timeout=60000) - public void testUncaughtExceptionInZKThread() throws Exception { - - SynchronousQueue<Throwable> queue = new SynchronousQueue<Throwable>(); - RecordingUncaughtExceptionHandler uncaughtExceptionHandler = new RecordingUncaughtExceptionHandler(queue); - final int port = PortManager.nextFreePort(); - final String hostPort = "127.0.0.1:" + PortManager.nextFreePort(); - - PubSubServer server = startServer(uncaughtExceptionHandler, port, new TopicManagerInstantiator() { - - @Override - public TopicManager instantiateTopicManager() throws IOException { - return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) { - - @Override - protected void realGetOwner(ByteString topic, boolean shouldClaim, - Callback<HedwigSocketAddress> cb, Object ctx) { - ZooKeeper zookeeper; - try { - zookeeper = new ZooKeeper(hostPort, 60000, new Watcher() { - @Override - public void process(WatchedEvent event) { - // TODO Auto-generated method stub - - } - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - - zookeeper.getData("/fake", false, new SafeAsyncZKCallback.DataCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, byte[] data, - org.apache.zookeeper.data.Stat stat) { - throw new RuntimeException("This should go to the uncaught exception handler"); - } - - }, null); - } - - @Override - protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) { - } - }; - } - }); - - runPublishRequest(port); - assertEquals(RuntimeException.class, queue.take().getClass()); - server.shutdown(); - } - - @Test(timeout=60000) - public void testInvalidServerConfiguration() throws Exception { - boolean success = false; - ServerConfiguration conf = new ServerConfiguration() { - @Override - public boolean isInterRegionSSLEnabled() { - return conf.getBoolean(INTER_REGION_SSL_ENABLED, true); - } - - @Override - public List<String> getRegions() { - List<String> regionsList = new LinkedList<String>(); - regionsList.add("regionHost1:4080:9876"); - regionsList.add("regionHost2:4080"); - regionsList.add("regionHost3:4080:9876"); - return regionsList; - } - }; - try { - conf.validate(); - } - catch (ConfigurationException e) { - logger.error("Invalid configuration: ", e); - success = true; - } - assertTrue(success); - } - - @Test(timeout=60000) - public void testValidServerConfiguration() throws Exception { - boolean success = true; - ServerConfiguration conf = new ServerConfiguration() { - @Override - public boolean isInterRegionSSLEnabled() { - return conf.getBoolean(INTER_REGION_SSL_ENABLED, true); - } - - @Override - public List<String> getRegions() { - List<String> regionsList = new LinkedList<String>(); - regionsList.add("regionHost1:4080:9876"); - regionsList.add("regionHost2:4080:2938"); - regionsList.add("regionHost3:4080:9876"); - return regionsList; - } - }; - try { - conf.validate(); - } - catch (ConfigurationException e) { - logger.error("Invalid configuration: ", e); - success = false; - } - assertTrue(success); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java b/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java deleted file mode 100644 index 08f5ad8..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.netty; - -import static org.junit.Assert.assertEquals; - -import org.apache.hedwig.server.netty.ServerStats.OpStats; -import org.junit.Test; - -/** Tests that Statistics updation in hedwig Server */ -public class TestServerStats { - - /** - * Tests that updatLatency should not fail with - * ArrayIndexOutOfBoundException when latency time coming as negative. - */ - @Test(timeout=60000) - public void testUpdateLatencyShouldNotFailWithAIOBEWithNegativeLatency() - throws Exception { - OpStats opStat = new OpStats(); - opStat.updateLatency(-10); - assertEquals("Should not update any latency metrics", 0, - opStat.numSuccessOps); - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java b/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java deleted file mode 100644 index 91cf2fe..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.netty; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.LinkedList; -import java.util.List; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelConfig; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.DefaultChannelFuture; -import org.jboss.netty.channel.SucceededChannelFuture; - -public class WriteRecordingChannel implements Channel { - - public boolean closed = false; - ChannelFuture closingFuture = new DefaultChannelFuture(this, false); - List<Object> messagesWritten = new LinkedList<Object>(); - - public List<Object> getMessagesWritten() { - return messagesWritten; - } - - public void clearMessages() { - messagesWritten.clear(); - } - - @Override - public ChannelFuture bind(SocketAddress localAddress) { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelFuture close() { - closed = true; - closingFuture.setSuccess(); - return new SucceededChannelFuture(this); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress) { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelFuture disconnect() { - return close(); - } - - @Override - public ChannelFuture getCloseFuture() { - return closingFuture; - } - - @Override - public ChannelConfig getConfig() { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelFactory getFactory() { - throw new RuntimeException("Not intended"); - } - - @Override - public Integer getId() { - throw new RuntimeException("Not intended"); - } - - @Override - public int getInterestOps() { - throw new RuntimeException("Not intended"); - } - - @Override - public SocketAddress getLocalAddress() { - return new InetSocketAddress("localhost", 1234); - } - - @Override - public Channel getParent() { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelPipeline getPipeline() { - throw new RuntimeException("Not intended"); - } - - @Override - public SocketAddress getRemoteAddress() { - return new InetSocketAddress("www.yahoo.com", 80); - } - - @Override - public boolean isBound() { - throw new RuntimeException("Not intended"); - } - - @Override - public boolean isConnected() { - return closed == false; - } - - @Override - public boolean isOpen() { - throw new RuntimeException("Not intended"); - } - - @Override - public boolean isReadable() { - throw new RuntimeException("Not intended"); - } - - @Override - public boolean isWritable() { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelFuture setInterestOps(int interestOps) { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelFuture setReadable(boolean readable) { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelFuture unbind() { - throw new RuntimeException("Not intended"); - } - - @Override - public ChannelFuture write(Object message) { - messagesWritten.add(message); - return new SucceededChannelFuture(this); - } - - @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - throw new RuntimeException("Not intended"); - } - - @Override - public int compareTo(Channel o) { - throw new RuntimeException("Not intended"); - } - - @Override - public void setAttachment(Object attachment) {} - - @Override - public Object getAttachment() { throw new RuntimeException("Not intended"); } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java deleted file mode 100644 index b71d037..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java +++ /dev/null @@ -1,267 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.io.File; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; - -import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; -import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; -import org.apache.bookkeeper.test.PortManager; - -import org.apache.bookkeeper.bookie.Bookie; -import org.apache.bookkeeper.bookie.BookieException; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.proto.BookieServer; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import org.apache.hedwig.util.FileUtils; -import org.apache.hedwig.zookeeper.ZooKeeperTestBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a base class for any tests that require a BookKeeper client/server - * setup. - * - */ -public class BookKeeperTestBase extends ZooKeeperTestBase { - private static final Logger LOG = LoggerFactory.getLogger(BookKeeperTestBase.class); - - class TestBookie extends Bookie { - final long readDelay; - - public TestBookie(ServerConfiguration conf, long readDelay) - throws IOException, KeeperException, InterruptedException, BookieException { - super(conf); - this.readDelay = readDelay; - } - - @Override - public ByteBuffer readEntry(long ledgerId, long entryId) - throws IOException, NoLedgerException { - if (readDelay > 0) { - try { - Thread.sleep(readDelay); - } catch (InterruptedException ie) { - } - } - return super.readEntry(ledgerId, entryId); - } - } - - class TestBookieServer extends BookieServer { - public TestBookieServer(ServerConfiguration conf) - throws IOException, - KeeperException, InterruptedException, BookieException, - UnavailableException, CompatibilityException { - super(conf); - } - - protected Bookie newBookie(ServerConfiguration conf) - throws IOException, KeeperException, InterruptedException, BookieException { - return new TestBookie(conf, readDelay); - } - } - - // BookKeeper Server variables - private List<BookieServer> bookiesList; - private List<ServerConfiguration> bkConfsList; - - // String constants used for creating the bookie server files. - private static final String PREFIX = "bookie"; - private static final String SUFFIX = "test"; - - // readDelay - protected long readDelay; - - // Variable to decide how many bookie servers to set up. - private final int numBookies; - // BookKeeper client instance - protected BookKeeper bk; - - protected ServerConfiguration baseConf = newServerConfiguration(); - protected ClientConfiguration baseClientConf = new ClientConfiguration(); - - // Constructor - public BookKeeperTestBase(int numBookies) { - this(numBookies, 0L); - } - - public BookKeeperTestBase(int numBookies, long readDelay) { - this.numBookies = numBookies; - this.readDelay = readDelay; - } - - public BookKeeperTestBase() { - // By default, use 3 bookies. - this(3); - } - - // Getter for the ZooKeeper client instance that the parent class sets up. - protected ZooKeeper getZooKeeperClient() { - return zk; - } - - // Give junit a fake test so that its happy - @Test(timeout=60000) - public void testNothing() throws Exception { - - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - // Initialize the zk client with values - try { - zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (KeeperException e) { - LOG.error("Error setting up", e); - } catch (InterruptedException e) { - LOG.error("Error setting up", e); - } - - // Create Bookie Servers - bookiesList = new LinkedList<BookieServer>(); - bkConfsList = new LinkedList<ServerConfiguration>(); - - for (int i = 0; i < numBookies; i++) { - startUpNewBookieServer(); - } - - // Create the BookKeeper client - bk = new BookKeeper(hostPort); - } - - public String getZkHostPort() { - return hostPort; - } - - @Override - @After - public void tearDown() throws Exception { - // Shutdown all of the bookie servers - for (BookieServer bs : bookiesList) { - bs.shutdown(); - } - // Close the BookKeeper client - bk.close(); - super.tearDown(); - } - - public void stopAllBookieServers() throws Exception { - for (BookieServer bs : bookiesList) { - bs.shutdown(); - } - bookiesList.clear(); - } - - public void startAllBookieServers() throws Exception { - for (ServerConfiguration conf : bkConfsList) { - bookiesList.add(startBookie(conf)); - } - } - - public void suspendAllBookieServers() throws Exception { - for (BookieServer bs : bookiesList) { - bs.suspendProcessing(); - } - } - - public void resumeAllBookieServers() throws Exception { - for (BookieServer bs : bookiesList) { - bs.resumeProcessing(); - } - } - - public void tearDownOneBookieServer() throws Exception { - Random r = new Random(); - int bi = r.nextInt(bookiesList.size()); - BookieServer bs = bookiesList.get(bi); - bs.shutdown(); - bookiesList.remove(bi); - bkConfsList.remove(bi); - } - - public void startUpNewBookieServer() throws Exception { - int port = PortManager.nextFreePort(); - File tmpDir = FileUtils.createTempDirectory( - PREFIX + port, SUFFIX); - ServerConfiguration conf = newServerConfiguration( - port, hostPort, tmpDir, new File[] { tmpDir }); - bookiesList.add(startBookie(conf)); - bkConfsList.add(conf); - } - - /** - * Helper method to startup a bookie server using a configuration object - * - * @param conf - * Server Configuration Object - * - */ - private BookieServer startBookie(ServerConfiguration conf) throws Exception { - BookieServer server = new TestBookieServer(conf); - server.start(); - - int port = conf.getBookiePort(); - while(zk.exists("/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) { - Thread.sleep(500); - } - - return server; - } - - // construct the basic server configuration for bookkeeper testing - private static ServerConfiguration newServerConfiguration() { - ServerConfiguration conf = new ServerConfiguration(); - conf.setJournalFlushWhenQueueEmpty(true); - conf.setJournalAdaptiveGroupWrites(false); - return conf; - } - - protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) { - ServerConfiguration conf = new ServerConfiguration(baseConf); - conf.setAllowLoopback(true); - conf.setBookiePort(port); - conf.setZkServers(zkServers); - conf.setJournalDirName(journalDir.getPath()); - String[] ledgerDirNames = new String[ledgerDirs.length]; - for (int i=0; i<ledgerDirs.length; i++) { - ledgerDirNames[i] = ledgerDirs[i].getPath(); - } - conf.setLedgerDirNames(ledgerDirNames); - return conf; - } - -}