http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java b/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java deleted file mode 100644 index e93ecae..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java +++ /dev/null @@ -1,439 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; - -import static org.junit.Assert.*; - -public class TestMultiplexing extends HedwigHubTestBase { - - private static final int DEFAULT_MSG_WINDOW_SIZE = 10; - - protected class TestServerConfiguration extends HubServerConfiguration { - TestServerConfiguration(int serverPort, int sslServerPort) { - super(serverPort, sslServerPort); - } - @Override - public int getDefaultMessageWindowSize() { - return DEFAULT_MSG_WINDOW_SIZE; - } - } - - class TestMessageHandler implements MessageHandler { - - int expected; - final int numMsgsAtFirstRun; - final int numMsgsAtSecondRun; - final CountDownLatch firstLatch; - final CountDownLatch secondLatch; - final boolean receiveSecondRun; - - public TestMessageHandler(int start, int numMsgsAtFirstRun, - boolean receiveSecondRun, - int numMsgsAtSecondRun) { - expected = start; - this.numMsgsAtFirstRun = numMsgsAtFirstRun; - this.numMsgsAtSecondRun = numMsgsAtSecondRun; - this.receiveSecondRun = receiveSecondRun; - firstLatch = new CountDownLatch(1); - secondLatch = new CountDownLatch(1); - } - - @Override - public synchronized void deliver(ByteString topic, ByteString subscriberId, - Message msg, - Callback<Void> callback, Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - logger.debug("Received message {}.", value); - - if (value == expected) { - ++expected; - } else { - // error condition - logger.error("Did not receive expected value, expected {}, got {}", - expected, value); - expected = 0; - firstLatch.countDown(); - secondLatch.countDown(); - } - if (numMsgsAtFirstRun + 1 == expected) { - firstLatch.countDown(); - } - if (receiveSecondRun) { - if (numMsgsAtFirstRun + numMsgsAtSecondRun + 1 == expected) { - secondLatch.countDown(); - } - } else { - if (numMsgsAtFirstRun + 1 < expected) { - secondLatch.countDown(); - } - } - callback.operationFinished(context, null); - subscriber.consume(topic, subscriberId, msg.getMsgId()); - } catch (Throwable t) { - logger.error("Received bad message.", t); - firstLatch.countDown(); - secondLatch.countDown(); - } - } - - public void checkFirstRun() throws Exception { - assertTrue("Timed out waiting for messages " + (numMsgsAtFirstRun + 1), - firstLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected messages with " + (numMsgsAtFirstRun + 1), - numMsgsAtFirstRun + 1, expected); - } - - public void checkSecondRun() throws Exception { - if (receiveSecondRun) { - assertTrue("Timed out waiting for messages " - + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1), - secondLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected messages with " - + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1), - numMsgsAtFirstRun + numMsgsAtSecondRun + 1, expected); - } else { - assertFalse("Receive more messages than " + numMsgsAtFirstRun, - secondLatch.await(3, TimeUnit.SECONDS)); - assertEquals("Should be expected messages with ony " + (numMsgsAtFirstRun + 1), - numMsgsAtFirstRun + 1, expected); - } - } - } - - class ThrottleMessageHandler implements MessageHandler { - - int expected; - final int numMsgs; - final int numMsgsThrottle; - final CountDownLatch throttleLatch; - final CountDownLatch nonThrottleLatch; - final boolean enableThrottle; - - public ThrottleMessageHandler(int start, int numMsgs, - boolean enableThrottle, - int numMsgsThrottle) { - expected = start; - this.numMsgs = numMsgs; - this.enableThrottle = enableThrottle; - this.numMsgsThrottle = numMsgsThrottle; - throttleLatch = new CountDownLatch(1); - nonThrottleLatch = new CountDownLatch(1); - } - - @Override - public synchronized void deliver(ByteString topic, ByteString subscriberId, - Message msg, - Callback<Void> callback, Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - logger.debug("Received message {}.", value); - - if (value == expected) { - ++expected; - } else { - // error condition - logger.error("Did not receive expected value, expected {}, got {}", - expected, value); - expected = 0; - throttleLatch.countDown(); - nonThrottleLatch.countDown(); - } - if (expected == numMsgsThrottle + 2) { - throttleLatch.countDown(); - } - if (expected == numMsgs + 1) { - nonThrottleLatch.countDown(); - } - callback.operationFinished(context, null); - if (enableThrottle) { - if (expected > numMsgsThrottle + 1) { - subscriber.consume(topic, subscriberId, msg.getMsgId()); - } - } else { - subscriber.consume(topic, subscriberId, msg.getMsgId()); - } - } catch (Throwable t) { - logger.error("Received bad message.", t); - throttleLatch.countDown(); - nonThrottleLatch.countDown(); - } - } - - public void checkThrottle() throws Exception { - if (enableThrottle) { - assertFalse("Received more messages than throttle value " + numMsgsThrottle, - throttleLatch.await(3, TimeUnit.SECONDS)); - assertEquals("Should be expected messages with only " + (numMsgsThrottle + 1), - numMsgsThrottle + 1, expected); - } else { - assertTrue("Should not be throttled.", throttleLatch.await(10, TimeUnit.SECONDS)); - assertTrue("Timed out waiting for messages " + (numMsgs + 1), - nonThrottleLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected messages with " + (numMsgs + 1), - numMsgs + 1, expected); - } - } - - public void checkAfterThrottle() throws Exception { - if (enableThrottle) { - assertTrue("Timed out waiting for messages " + (numMsgs + 1), - nonThrottleLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected messages with " + (numMsgs + 1), - numMsgs + 1, expected); - } - } - } - - HedwigClient client; - Publisher publisher; - Subscriber subscriber; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - client = new HedwigClient(new HubClientConfiguration() { - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return true; - } - @Override - public boolean isAutoSendConsumeMessageEnabled() { - return false; - } - }); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - @Override - @After - public void tearDown() throws Exception { - client.close(); - super.tearDown(); - } - - @Override - protected ServerConfiguration getServerConfiguration(int port, int sslPort) { - return new TestServerConfiguration(port, sslPort); - } - - @Test(timeout=60000) - public void testStopDelivery() throws Exception { - ByteString topic1 = ByteString.copyFromUtf8("testStopDelivery-1"); - ByteString topic2 = ByteString.copyFromUtf8("testStopDelivery-2"); - ByteString subid1 = ByteString.copyFromUtf8("mysubid-1"); - ByteString subid2 = ByteString.copyFromUtf8("mysubid-2"); - - final int X = 20; - - TestMessageHandler csHandler11 = - new TestMessageHandler(1, X, true, X); - TestMessageHandler csHandler12 = - new TestMessageHandler(1, X, false, 0); - TestMessageHandler csHandler21 = - new TestMessageHandler(1, X, false, 0); - TestMessageHandler csHandler22 = - new TestMessageHandler(1, X, true, X); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - - subscriber.subscribe(topic1, subid1, opts); - subscriber.subscribe(topic1, subid2, opts); - subscriber.subscribe(topic2, subid1, opts); - subscriber.subscribe(topic2, subid2, opts); - - // start deliveries - subscriber.startDelivery(topic1, subid1, csHandler11); - subscriber.startDelivery(topic1, subid2, csHandler12); - subscriber.startDelivery(topic2, subid1, csHandler21); - subscriber.startDelivery(topic2, subid2, csHandler22); - - // first publish - for (int i = 1; i<=X; i++) { - publisher.publish(topic1, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - publisher.publish(topic2, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - - csHandler11.checkFirstRun(); - csHandler12.checkFirstRun(); - csHandler21.checkFirstRun(); - csHandler22.checkFirstRun(); - - // stop delivery for <topic1, subscriber2> and <topic2, subscriber1> - subscriber.stopDelivery(topic1, subid2); - subscriber.stopDelivery(topic2, subid1); - - // second publish - for (int i = X+1; i<=2*X; i++) { - publisher.publish(topic1, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - publisher.publish(topic2, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - - csHandler11.checkSecondRun(); - csHandler22.checkSecondRun(); - csHandler12.checkSecondRun(); - csHandler21.checkSecondRun(); - } - - @Test(timeout=60000) - public void testCloseSubscription() throws Exception { - ByteString topic1 = ByteString.copyFromUtf8("testCloseSubscription-1"); - ByteString topic2 = ByteString.copyFromUtf8("testCloseSubscription-2"); - ByteString subid1 = ByteString.copyFromUtf8("mysubid-1"); - ByteString subid2 = ByteString.copyFromUtf8("mysubid-2"); - - final int X = 20; - - TestMessageHandler csHandler11 = - new TestMessageHandler(1, X, true, X); - TestMessageHandler csHandler12 = - new TestMessageHandler(1, X, false, 0); - TestMessageHandler csHandler21 = - new TestMessageHandler(1, X, false, 0); - TestMessageHandler csHandler22 = - new TestMessageHandler(1, X, true, X); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - subscriber.subscribe(topic1, subid1, opts); - subscriber.subscribe(topic1, subid2, opts); - subscriber.subscribe(topic2, subid1, opts); - subscriber.subscribe(topic2, subid2, opts); - - // start deliveries - subscriber.startDelivery(topic1, subid1, csHandler11); - subscriber.startDelivery(topic1, subid2, csHandler12); - subscriber.startDelivery(topic2, subid1, csHandler21); - subscriber.startDelivery(topic2, subid2, csHandler22); - - // first publish - for (int i = 1; i<=X; i++) { - publisher.publish(topic1, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - publisher.publish(topic2, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - - csHandler11.checkFirstRun(); - csHandler12.checkFirstRun(); - csHandler21.checkFirstRun(); - csHandler22.checkFirstRun(); - - // close subscription for <topic1, subscriber2> and <topic2, subscriber1> - subscriber.closeSubscription(topic1, subid2); - subscriber.closeSubscription(topic2, subid1); - - // second publish - for (int i = X+1; i<=2*X; i++) { - publisher.publish(topic1, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - publisher.publish(topic2, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - - csHandler11.checkSecondRun(); - csHandler22.checkSecondRun(); - csHandler12.checkSecondRun(); - csHandler21.checkSecondRun(); - } - - @Test(timeout=60000) - public void testThrottle() throws Exception { - ByteString topic1 = ByteString.copyFromUtf8("testThrottle-1"); - ByteString topic2 = ByteString.copyFromUtf8("testThrottle-2"); - ByteString subid1 = ByteString.copyFromUtf8("mysubid-1"); - ByteString subid2 = ByteString.copyFromUtf8("mysubid-2"); - - final int X = DEFAULT_MSG_WINDOW_SIZE; - - ThrottleMessageHandler csHandler11 = - new ThrottleMessageHandler(1, 3*X, false, X); - ThrottleMessageHandler csHandler12 = - new ThrottleMessageHandler(1, 3*X, true, X); - ThrottleMessageHandler csHandler21 = - new ThrottleMessageHandler(1, 3*X, true, X); - ThrottleMessageHandler csHandler22 = - new ThrottleMessageHandler(1, 3*X, false, X); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - subscriber.subscribe(topic1, subid1, opts); - subscriber.subscribe(topic1, subid2, opts); - subscriber.subscribe(topic2, subid1, opts); - subscriber.subscribe(topic2, subid2, opts); - - // start deliveries - subscriber.startDelivery(topic1, subid1, csHandler11); - subscriber.startDelivery(topic1, subid2, csHandler12); - subscriber.startDelivery(topic2, subid1, csHandler21); - subscriber.startDelivery(topic2, subid2, csHandler22); - - // publish - for (int i = 1; i<=3*X; i++) { - publisher.publish(topic1, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - publisher.publish(topic2, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - - csHandler11.checkThrottle(); - csHandler12.checkThrottle(); - csHandler21.checkThrottle(); - csHandler22.checkThrottle(); - - // consume messages to not throttle them - for (int i=1; i<=X; i++) { - MessageSeqId seqId = - MessageSeqId.newBuilder().setLocalComponent(i).build(); - subscriber.consume(topic1, subid2, seqId); - subscriber.consume(topic2, subid1, seqId); - } - - csHandler11.checkAfterThrottle(); - csHandler22.checkAfterThrottle(); - csHandler12.checkAfterThrottle(); - csHandler21.checkAfterThrottle(); - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java deleted file mode 100644 index 81e0314..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server; - -import java.util.LinkedList; -import java.util.List; - -import junit.framework.TestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.netty.PubSubServer; -import org.apache.hedwig.server.persistence.BookKeeperTestBase; -import org.apache.hedwig.util.HedwigSocketAddress; - -import org.apache.bookkeeper.test.PortManager; - -/** - * This is a base class for any tests that need a Hedwig Hub(s) setup with an - * associated BookKeeper and ZooKeeper instance. - * - */ -public abstract class HedwigHubTestBase extends TestCase { - - protected static final Logger logger = LoggerFactory.getLogger(HedwigHubTestBase.class); - - // BookKeeper variables - // Default number of bookie servers to setup. Extending classes can - // override this. - protected int numBookies = 3; - protected long readDelay = 0L; - protected BookKeeperTestBase bktb; - - // PubSubServer variables - // Default number of PubSubServer hubs to setup. Extending classes can - // override this. - protected final int numServers; - protected List<PubSubServer> serversList; - protected List<HedwigSocketAddress> serverAddresses; - - protected boolean sslEnabled = true; - protected boolean standalone = false; - - protected static final String HOST = "localhost"; - - public HedwigHubTestBase() { - this(1); - } - - protected HedwigHubTestBase(int numServers) { - this.numServers = numServers; - - init(); - } - - public HedwigHubTestBase(String name, int numServers) { - this.numServers = numServers; - init(); - } - - private void init() { - - serverAddresses = new LinkedList<HedwigSocketAddress>(); - for (int i = 0; i < numServers; i++) { - serverAddresses.add(new HedwigSocketAddress(HOST, - PortManager.nextFreePort(), PortManager.nextFreePort())); - } - } - - // Default child class of the ServerConfiguration to be used here. - // Extending classes can define their own (possibly extending from this) and - // override the getServerConfiguration method below to return their own - // configuration. - protected class HubServerConfiguration extends ServerConfiguration { - private final int serverPort, sslServerPort; - - public HubServerConfiguration(int serverPort, int sslServerPort) { - this.serverPort = serverPort; - this.sslServerPort = sslServerPort; - } - - @Override - public boolean isStandalone() { - return standalone; - } - - @Override - public int getServerPort() { - return serverPort; - } - - @Override - public int getSSLServerPort() { - return sslServerPort; - } - - @Override - public String getZkHost() { - return null != bktb ? bktb.getZkHostPort() : null; - } - - @Override - public boolean isSSLEnabled() { - return sslEnabled; - } - - @Override - public String getCertName() { - return isSSLEnabled() ? "/server.p12" : null; - } - - @Override - public String getPassword() { - return isSSLEnabled() ? "eUySvp2phM2Wk" : null; - } - } - - public class HubClientConfiguration extends ClientConfiguration { - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return serverAddresses.get(0); - } - } - - // Method to get a ServerConfiguration for the PubSubServers created using - // the specified ports. Extending child classes can override this. This - // default implementation will return the HubServerConfiguration object - // defined above. - protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) { - return new HubServerConfiguration(serverPort, sslServerPort); - } - - protected void startHubServers() throws Exception { - // Now create the PubSubServer Hubs - serversList = new LinkedList<PubSubServer>(); - - for (int i = 0; i < numServers; i++) { - ServerConfiguration conf = getServerConfiguration(serverAddresses.get(i).getPort(), - sslEnabled ? serverAddresses.get(i).getSSLPort() : -1); - PubSubServer s = new PubSubServer(conf, new ClientConfiguration(), new LoggingExceptionHandler()); - serversList.add(s); - s.start(); - } - } - - protected void stopHubServers() throws Exception { - // Shutdown all of the PubSubServers - for (PubSubServer server : serversList) { - server.shutdown(); - } - serversList.clear(); - } - - @Before - protected void setUp() throws Exception { - logger.info("STARTING " + getClass()); - if (! standalone) { - bktb = new BookKeeperTestBase(numBookies, readDelay); - bktb.setUp(); - } - startHubServers(); - logger.info("HedwigHub test setup finished"); - } - - @After - protected void tearDown() throws Exception { - logger.info("tearDown starting"); - stopHubServers(); - if (null != bktb) bktb.tearDown(); - logger.info("FINISHED " + getClass()); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java deleted file mode 100644 index 0a574b6..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java +++ /dev/null @@ -1,282 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.netty.PubSubServer; -import org.apache.hedwig.server.persistence.BookKeeperTestBase; -import org.apache.hedwig.util.HedwigSocketAddress; - -import org.apache.bookkeeper.test.PortManager; - -/** - * This is a base class for any tests that need a Hedwig Region(s) setup with a - * number of Hedwig hubs per region, an associated HedwigClient per region and - * the required BookKeeper and ZooKeeper instances. - * - */ -public abstract class HedwigRegionTestBase { - - protected static final Logger logger = LoggerFactory.getLogger(HedwigRegionTestBase.class); - - // BookKeeper variables - // Default number of bookie servers to setup. Extending classes - // can override this. We should be able to reuse the same BookKeeper - // ensemble among all of the regions, at least for unit testing purposes. - protected int numBookies = 3; - protected BookKeeperTestBase bktb; - - // Hedwig Region variables - // Default number of Hedwig Regions to setup. Extending classes can - // override this. - protected int numRegions = 2; - protected int numServersPerRegion = 1; - - // Map with keys being Region names and values being the list of Hedwig - // Hubs (PubSubServers) for that particular region. - protected Map<String, List<PubSubServer>> regionServersMap; - // Map with keys being Region names and values being the Hedwig Client - // instance. - protected Map<String, HedwigClient> regionClientsMap; - - protected Map<String, Integer> regionNameToIndexMap; - protected Map<Integer, List<HedwigSocketAddress>> regionHubAddresses; - - // String constant used as the prefix for the region names. - protected static final String REGION_PREFIX = "region"; - - // Default child class of the ServerConfiguration to be used here. - // Extending classes can define their own (possibly extending from this) and - // override the getServerConfiguration method below to return their own - // configuration. - protected class RegionServerConfiguration extends ServerConfiguration { - private final int serverPort, sslServerPort; - private final String regionName; - - public RegionServerConfiguration(int serverPort, int sslServerPort, String regionName) { - this.serverPort = serverPort; - this.sslServerPort = sslServerPort; - this.regionName = regionName; - conf.setProperty(REGION, regionName); - setRegionList(); - } - - protected void setRegionList() { - List<String> myRegionList = new LinkedList<String>(); - for (int i = 0; i < numRegions; i++) { - int curDefaultServerPort = regionHubAddresses.get(i).get(0).getPort(); - int curDefaultSSLServerPort = regionHubAddresses.get(i).get(0).getSSLPort(); - // Add this region default server port if it is for a region - // other than its own. - if (regionNameToIndexMap.get(regionName) != i) { - myRegionList.add("localhost:" + curDefaultServerPort + ":" + curDefaultSSLServerPort); - } - } - - regionList = myRegionList; - } - - @Override - public int getServerPort() { - return serverPort; - } - - @Override - public int getSSLServerPort() { - return sslServerPort; - } - - @Override - public String getZkHost() { - return bktb.getZkHostPort(); - } - - @Override - public String getMyRegion() { - return regionName; - } - - @Override - public boolean isSSLEnabled() { - return true; - } - - @Override - public boolean isInterRegionSSLEnabled() { - return true; - } - - @Override - public String getCertName() { - return "/server.p12"; - } - - @Override - public String getPassword() { - return "eUySvp2phM2Wk"; - } - } - - // Method to get a ServerConfiguration for the PubSubServers created using - // the specified ports and region name. Extending child classes can override - // this. This default implementation will return the - // RegionServerConfiguration object defined above. - protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort, String regionName) { - return new RegionServerConfiguration(serverPort, sslServerPort, regionName); - } - - // Default ClientConfiguration to use. This just points to the first - // Hedwig hub server in each region as the "default server host" to connect - // to. - protected class RegionClientConfiguration extends ClientConfiguration { - public RegionClientConfiguration(int serverPort, int sslServerPort) { - myDefaultServerAddress = new HedwigSocketAddress("localhost:" + serverPort + ":" + sslServerPort); - } - // Below you can override any of the default ClientConfiguration - // parameters if needed. - } - - // Method to get a ClientConfiguration for the HedwigClients created. - // Inputs are the default Hedwig hub server's ports to point to. - protected ClientConfiguration getClientConfiguration(int serverPort, int sslServerPort) { - return new RegionClientConfiguration(serverPort, sslServerPort); - } - - // Method to get a ClientConfiguration for the Cross Region Hedwig Client. - protected ClientConfiguration getRegionClientConfiguration() { - return new ClientConfiguration() { - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return regionHubAddresses.get(0).get(0); - } - }; - } - - @Before - public void setUp() throws Exception { - logger.info("STARTING " + getClass()); - bktb = new BookKeeperTestBase(numBookies); - bktb.setUp(); - - // Create the Hedwig PubSubServer Hubs for all of the regions - regionServersMap = new HashMap<String, List<PubSubServer>>(numRegions, 1.0f); - regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f); - - regionHubAddresses = new HashMap<Integer, List<HedwigSocketAddress>>(numRegions, 1.0f); - for (int i = 0; i < numRegions; i++) { - List<HedwigSocketAddress> addresses = new LinkedList<HedwigSocketAddress>(); - for (int j = 0; j < numServersPerRegion; j++) { - HedwigSocketAddress a = new HedwigSocketAddress("localhost", - PortManager.nextFreePort(), PortManager.nextFreePort()); - addresses.add(a); - } - regionHubAddresses.put(i, addresses); - } - regionNameToIndexMap = new HashMap<String, Integer>(); - - for (int i = 0; i < numRegions; i++) { - startRegion(i); - } - logger.info("HedwigRegion test setup finished"); - } - - @After - public void tearDown() throws Exception { - logger.info("tearDown starting"); - // Stop all of the HedwigClients for all regions - for (HedwigClient client : regionClientsMap.values()) { - client.close(); - } - regionClientsMap.clear(); - // Shutdown all of the PubSubServers in all regions - for (List<PubSubServer> serversList : regionServersMap.values()) { - for (PubSubServer server : serversList) { - server.shutdown(); - } - } - logger.info("Finished shutting down all of the hub servers!"); - regionServersMap.clear(); - // Shutdown the BookKeeper and ZooKeeper stuff - bktb.tearDown(); - logger.info("FINISHED " + getClass()); - } - - protected void stopRegion(int regionIdx) throws Exception { - String regionName = REGION_PREFIX + regionIdx; - if (logger.isDebugEnabled()) { - logger.debug("Stop region : " + regionName); - } - HedwigClient regionClient = regionClientsMap.remove(regionName); - if (null != regionClient) { - regionClient.close(); - } - List<PubSubServer> serversList = regionServersMap.remove(regionName); - if (null == serversList) { - return; - } - for (PubSubServer server : serversList) { - server.shutdown(); - } - logger.info("Finished shutting down all of the hub servers in region " + regionName); - } - - protected void startRegion(int i) throws Exception { - String regionName = REGION_PREFIX + i; - regionNameToIndexMap.put(regionName, i); - - if (logger.isDebugEnabled()) { - logger.debug("Start region : " + regionName); - } - - List<PubSubServer> serversList = new LinkedList<PubSubServer>(); - // For the current region, create the necessary amount of hub - // servers. We will basically increment through the port numbers - // starting from the initial ones defined. - for (int j = 0; j < numServersPerRegion; j++) { - HedwigSocketAddress a = regionHubAddresses.get(i).get(j); - PubSubServer s = new PubSubServer( - getServerConfiguration(a.getPort(), - a.getSSLPort(), - regionName), - getRegionClientConfiguration(), - new LoggingExceptionHandler()); - serversList.add(s); - s.start(); - } - // Store this list of servers created for the current region - regionServersMap.put(regionName, serversList); - // Create a Hedwig Client that points to the first Hub server - // created in the loop above for the current region. - HedwigClient regionClient = new HedwigClient( - getClientConfiguration(regionHubAddresses.get(i).get(0).getPort(), - regionHubAddresses.get(i).get(0).getSSLPort())); - regionClientsMap.put(regionName, regionClient); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java deleted file mode 100644 index 5ea0990..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Exception handler that simply logs the exception and - * does nothing more. To be used in tests instead of TerminateJVMExceptionHandler - */ -public class LoggingExceptionHandler implements Thread.UncaughtExceptionHandler { - private static final Logger logger = LoggerFactory.getLogger(LoggingExceptionHandler.class); - - @Override - public void uncaughtException(Thread t, Throwable e) { - logger.error("Uncaught exception in thread " + t.getName(), e); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java deleted file mode 100644 index 02d1f46..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; - -import org.apache.bookkeeper.test.PortManager; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.netty.PubSubServer; -import org.apache.hedwig.util.HedwigSocketAddress; - -/** - * This is a base class for any tests that need a StandAlone PubSubServer setup. - */ -public abstract class PubSubServerStandAloneTestBase { - - protected static final Logger logger = LoggerFactory.getLogger(PubSubServerStandAloneTestBase.class); - - protected class StandAloneServerConfiguration extends ServerConfiguration { - final int port = PortManager.nextFreePort(); - final int sslPort = PortManager.nextFreePort(); - - @Override - public boolean isStandalone() { - return true; - } - - @Override - public int getServerPort() { - return port; - } - - @Override - public int getSSLServerPort() { - return sslPort; - } - } - - public ServerConfiguration getStandAloneServerConfiguration() { - return new StandAloneServerConfiguration(); - } - - protected PubSubServer server; - protected ServerConfiguration conf; - protected HedwigSocketAddress defaultAddress; - - @Before - public void setUp() throws Exception { - logger.info("STARTING " + getClass()); - conf = getStandAloneServerConfiguration(); - startHubServer(conf); - logger.info("Standalone PubSubServer test setup finished"); - } - - - @After - public void tearDown() throws Exception { - logger.info("tearDown starting"); - tearDownHubServer(); - logger.info("FINISHED " + getClass()); - } - - protected HedwigSocketAddress getDefaultHedwigAddress() { - return defaultAddress; - } - - protected void startHubServer(ServerConfiguration conf) throws Exception { - defaultAddress = new HedwigSocketAddress("localhost", conf.getServerPort(), - conf.getSSLServerPort()); - server = new PubSubServer(conf, new ClientConfiguration(), new LoggingExceptionHandler()); - server.start(); - } - - protected void tearDownHubServer() throws Exception { - server.shutdown(); - } - -}
