http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java deleted file mode 100644 index 46c0c17..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java +++ /dev/null @@ -1,375 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.delivery; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.filter.ClientMessageFilter; -import org.apache.hedwig.filter.MessageFilterBase; -import org.apache.hedwig.filter.ServerMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.protobuf.ByteString; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestThrottlingDelivery extends HedwigHubTestBase { - - private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10; - private static final String OPT_MOD = "MOD"; - - static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter { - - int mod; - - @Override - public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId, - SubscriptionPreferences preferences) { - Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences); - ByteString modValue = userOptions.get(OPT_MOD); - if (null == modValue) { - mod = 0; - } else { - mod = Integer.valueOf(modValue.toStringUtf8()); - } - return this; - } - - @Override - public boolean testMessage(Message message) { - int value = Integer.valueOf(message.getBody().toStringUtf8()); - return 0 == value % mod; - } - - @Override - public ServerMessageFilter initialize(Configuration conf) throws ConfigurationException, IOException { - // do nothing - return this; - } - - @Override - public void uninitialize() { - // do nothing - } - - } - - - protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration { - - ThrottleDeliveryServerConfiguration(int serverPort, int sslServerPort) { - super(serverPort, sslServerPort); - } - - @Override - public int getDefaultMessageWindowSize() { - return TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE; - } - } - - protected class ThrottleDeliveryClientConfiguration extends HubClientConfiguration { - - int messageWindowSize; - - ThrottleDeliveryClientConfiguration() { - this(TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE); - } - - ThrottleDeliveryClientConfiguration(int messageWindowSize) { - this.messageWindowSize = messageWindowSize; - } - - @Override - public int getMaximumOutstandingMessages() { - return messageWindowSize; - } - - void setMessageWindowSize(int messageWindowSize) { - this.messageWindowSize = messageWindowSize; - } - - @Override - public boolean isAutoSendConsumeMessageEnabled() { - return false; - } - - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return isSubscriptionChannelSharingEnabled; - } - } - - private void publishNums(Publisher pub, ByteString topic, int start, int num, int M) throws Exception { - for (int i = 1; i <= num; i++) { - PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder().addEntries( - PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD) - .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M)))); - MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder); - Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(start + i))) - .setHeader(headerBuilder).build(); - pub.publish(topic, msg); - } - } - - private void throttleWithFilter(Publisher pub, final Subscriber sub, - ByteString topic, ByteString subid, - final int X) throws Exception { - // publish numbers with header (so only 3 messages would be delivered) - publishNums(pub, topic, 0, 3 * X, X); - - // subscribe the topic with filter - PubSubProtocol.Map userOptions = PubSubProtocol.Map - .newBuilder() - .addEntries( - PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD) - .setValue(ByteString.copyFromUtf8(String.valueOf(X)))).build(); - SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH) - .setOptions(userOptions).setMessageFilter(ModMessageFilter.class.getName()).build(); - sub.subscribe(topic, subid, opts); - - final AtomicInteger expected = new AtomicInteger(X); - final CountDownLatch latch = new CountDownLatch(1); - sub.startDelivery(topic, subid, new MessageHandler() { - @Override - public synchronized void deliver(ByteString topic, ByteString subscriberId, - Message msg, - Callback<Void> callback, Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - logger.debug("Received message {},", value); - - if (value == expected.get()) { - expected.addAndGet(X); - } else { - // error condition - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - latch.countDown(); - } - if (value == 3 * X) { - latch.countDown(); - } - callback.operationFinished(context, null); - sub.consume(topic, subscriberId, msg.getMsgId()); - } catch (Exception e) { - logger.error("Received bad message", e); - latch.countDown(); - } - } - }); - - assertTrue("Timed out waiting for messages " + 3 * X, latch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + 4 * X, 4 * X, expected.get()); - - sub.stopDelivery(topic, subid); - sub.closeSubscription(topic, subid); - } - - private void throttleX(Publisher pub, final Subscriber sub, - ByteString topic, ByteString subid, - final int X) throws Exception { - for (int i=1; i<=3*X; i++) { - pub.publish(topic, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.ATTACH).build(); - sub.subscribe(topic, subid, opts); - - final AtomicInteger expected = new AtomicInteger(1); - final CountDownLatch throttleLatch = new CountDownLatch(1); - final CountDownLatch nonThrottleLatch = new CountDownLatch(1); - sub.startDelivery(topic, subid, new MessageHandler() { - @Override - public synchronized void deliver(ByteString topic, ByteString subscriberId, - Message msg, - Callback<Void> callback, Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - logger.debug("Received message {},", value); - - if (value == expected.get()) { - expected.incrementAndGet(); - } else { - // error condition - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - throttleLatch.countDown(); - nonThrottleLatch.countDown(); - } - if (expected.get() > X+1) { - throttleLatch.countDown(); - } - if (expected.get() == (3 * X + 1)) { - nonThrottleLatch.countDown(); - } - callback.operationFinished(context, null); - if (expected.get() > X + 1) { - sub.consume(topic, subscriberId, msg.getMsgId()); - } - } catch (Exception e) { - logger.error("Received bad message", e); - throttleLatch.countDown(); - nonThrottleLatch.countDown(); - } - } - }); - assertFalse("Received more messages than throttle value " + X, - throttleLatch.await(3, TimeUnit.SECONDS)); - assertEquals("Should be expected messages with only " + (X+1), X+1, expected.get()); - - // consume messages to not throttle it - for (int i=1; i<=X; i++) { - sub.consume(topic, subid, - MessageSeqId.newBuilder().setLocalComponent(i).build()); - } - - assertTrue("Timed out waiting for messages " + (3*X + 1), - nonThrottleLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + (3*X + 1), - 3*X + 1, expected.get()); - - sub.stopDelivery(topic, subid); - sub.closeSubscription(topic, subid); - } - - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { { false }, { true } }); - } - - protected boolean isSubscriptionChannelSharingEnabled; - - public TestThrottlingDelivery(boolean isSubscriptionChannelSharingEnabled) { - super(1); - this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled; - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - } - - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); - } - - @Override - protected ServerConfiguration getServerConfiguration(int port, int sslPort) { - return new ThrottleDeliveryServerConfiguration(port, sslPort); - } - - @Test(timeout=60000) - public void testServerSideThrottle() throws Exception { - int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE; - ThrottleDeliveryClientConfiguration conf = - new ThrottleDeliveryClientConfiguration(); - HedwigClient client = new HedwigClient(conf); - Publisher pub = client.getPublisher(); - Subscriber sub = client.getSubscriber(); - - ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle"); - ByteString subid = ByteString.copyFromUtf8("serverThrottleSub"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - sub.subscribe(topic, subid, opts); - sub.closeSubscription(topic, subid); - - // throttle with hub server's setting - throttleX(pub, sub, topic, subid, DEFAULT_MESSAGE_WINDOW_SIZE); - - messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE / 2; - // throttle with a lower value than hub server's setting - SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE) - .setMessageWindowSize(messageWindowSize); - topic = ByteString.copyFromUtf8("testServerSideThrottleWithLowerValue"); - sub.subscribe(topic, subid, optionsBuilder.build()); - sub.closeSubscription(topic, subid); - throttleX(pub, sub, topic, subid, messageWindowSize); - - messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE + 5; - // throttle with a higher value than hub server's setting - optionsBuilder = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE) - .setMessageWindowSize(messageWindowSize); - topic = ByteString.copyFromUtf8("testServerSideThrottleWithHigherValue"); - sub.subscribe(topic, subid, optionsBuilder.build()); - sub.closeSubscription(topic, subid); - throttleX(pub, sub, topic, subid, messageWindowSize); - - client.close(); - } - - @Test(timeout = 60000) - public void testThrottleWithServerSideFilter() throws Exception { - int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE; - ThrottleDeliveryClientConfiguration conf = new ThrottleDeliveryClientConfiguration(); - HedwigClient client = new HedwigClient(conf); - Publisher pub = client.getPublisher(); - Subscriber sub = client.getSubscriber(); - - ByteString topic = ByteString.copyFromUtf8("testThrottleWithServerSideFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).build(); - sub.subscribe(topic, subid, opts); - sub.closeSubscription(topic, subid); - - // message gap: half of the throttle threshold - throttleWithFilter(pub, sub, topic, subid, messageWindowSize / 2); - // message gap: equals to the throttle threshold - throttleWithFilter(pub, sub, topic, subid, messageWindowSize); - // message gap: larger than the throttle threshold - throttleWithFilter(pub, sub, topic, subid, messageWindowSize + messageWindowSize / 2); - } - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java b/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java deleted file mode 100644 index 8e9b8f6..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java +++ /dev/null @@ -1,415 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.filter; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.MessageHandler; -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protoextensions.MapUtils; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; - -import org.apache.hedwig.client.api.Client; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.filter.ClientMessageFilter; -import org.apache.hedwig.filter.MessageFilterBase; -import org.apache.hedwig.filter.ServerMessageFilter; -import org.apache.hedwig.util.Callback; - -import org.apache.hedwig.server.HedwigHubTestBase; - -public class TestMessageFilter extends HedwigHubTestBase { - - // Client side variables - protected ClientConfiguration conf; - protected HedwigClient client; - protected Publisher publisher; - protected Subscriber subscriber; - - static final String OPT_MOD = "MOD"; - - static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter { - - int mod; - - @Override - public ServerMessageFilter initialize(Configuration conf) { - // do nothing - return this; - } - - @Override - public void uninitialize() { - // do nothing; - } - - @Override - public MessageFilterBase setSubscriptionPreferences(ByteString topic, - ByteString subscriberId, - SubscriptionPreferences preferences) { - Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences); - ByteString modValue = userOptions.get(OPT_MOD); - if (null == modValue) { - mod = 0; - } else { - mod = Integer.valueOf(modValue.toStringUtf8()); - } - return this; - } - - @Override - public boolean testMessage(Message msg) { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - return 0 == value % mod; - } - } - - static class HeaderMessageFilter implements ServerMessageFilter, ClientMessageFilter { - int mod; - @Override - public ServerMessageFilter initialize(Configuration conf) { - // do nothing - return this; - } - - @Override - public void uninitialize() { - // do nothing - } - - @Override - public MessageFilterBase setSubscriptionPreferences(ByteString topic, - ByteString subscriberId, - SubscriptionPreferences preferences) { - // do nothing now - return this; - } - - @Override - public boolean testMessage(Message msg) { - if (msg.hasHeader()) { - MessageHeader header = msg.getHeader(); - if (header.hasProperties()) { - Map<String, ByteString> props = MapUtils.buildMap(header.getProperties()); - ByteString value = props.get(OPT_MOD); - if (null == value) { - return false; - } - int intValue = Integer.valueOf(value.toStringUtf8()); - if (0 != intValue) { - return false; - } - return true; - } else { - return false; - } - } else { - return false; - } - } - } - - public TestMessageFilter() { - super(1); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - - conf = new HubClientConfiguration() { - @Override - public boolean isAutoSendConsumeMessageEnabled() { - return false; - } - }; - client = new HedwigClient(conf); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - @Override - @After - public void tearDown() throws Exception { - client.close(); - super.tearDown(); - } - - private void publishNums(ByteString topic, int start, int num, int M) throws Exception { - for (int i=1; i<=num; i++) { - PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder() - .addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD) - .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M)))); - MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder); - Message msg = Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf((start + i)))) - .setHeader(headerBuilder).build(); - publisher.publish(topic, msg); - } - } - - private void receiveNumModM(final ByteString topic, final ByteString subid, - final String filterClassName, final ClientMessageFilter filter, - final int start, final int num, final int M, - final boolean consume) - throws Exception { - PubSubProtocol.Map userOptions = PubSubProtocol.Map.newBuilder() - .addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD) - .setValue(ByteString.copyFromUtf8(String.valueOf(M)))).build(); - SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.ATTACH) - .setOptions(userOptions); - if (null != filterClassName) { - optionsBuilder.setMessageFilter(filterClassName); - } - subscriber.subscribe(topic, subid, optionsBuilder.build()); - - final int base = start + M - start % M; - - final AtomicInteger expected = new AtomicInteger(base); - final CountDownLatch latch = new CountDownLatch(1); - MessageHandler msgHandler = new MessageHandler() { - synchronized public void deliver(ByteString topic, ByteString subscriberId, - Message msg, Callback<Void> callback, - Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - // duplicated messages received, ignore them - if (value > start) { - if (value == expected.get()) { - expected.addAndGet(M); - } else { - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - latch.countDown(); - } - if (expected.get() == (base + num * M)) { - latch.countDown(); - } - } - callback.operationFinished(context, null); - if (consume) { - subscriber.consume(topic, subid, msg.getMsgId()); - } - } catch (Exception e) { - logger.error("Received bad message", e); - latch.countDown(); - } - } - }; - if (null != filter) { - subscriber.startDeliveryWithFilter(topic, subid, msgHandler, filter); - } else { - subscriber.startDelivery(topic, subid, msgHandler); - } - assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(), - latch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get()); - subscriber.stopDelivery(topic, subid); - subscriber.closeSubscription(topic, subid); - } - - @Test(timeout=60000) - public void testServerSideMessageFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestMessageFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - publishNums(topic, 0, 100, 2); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, true); - } - - @Test(timeout=60000) - public void testInvalidServerSideMessageFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestInvalidMessageFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions options = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH) - .setMessageFilter("Invalid_Message_Filter").build(); - try { - subscriber.subscribe(topic, subid, options); - // coun't reach here - fail("Should fail subscribe with invalid message filter"); - } catch (PubSubException pse) { - assertTrue("Should respond with INVALID_MESSAGE_FILTER", - pse.getMessage().contains("INVALID_MESSAGE_FILTER")); - } - } - - @Test(timeout=60000) - public void testChangeSubscriptionPreferences() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferences"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - - publishNums(topic, 0, 100, 2); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 33, 3, true); - - // change mod to receive numbers mod 5 - publishNums(topic, 100, 100, 5); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 100, 20, 5, true); - - // change mod to receive numbers mod 7 - publishNums(topic, 200, 100, 7); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 200, 14, 7, true); - } - - @Test(timeout=60000) - public void testChangeServerSideMessageFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestChangeMessageFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - - publishNums(topic, 0, 100, 3); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false); - receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false); - receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true); - - publishNums(topic, 200, 100, 7); - receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 200, 14, 7, true); - } - - @Test(timeout=60000) - public void testFixInvalidServerSideMessageFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestFixMessageFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - - publishNums(topic, 0, 100, 3); - try { - receiveNumModM(topic, subid, "Invalid_Message_Filter", null, 0, 33, 3, true); - // coun't reach here - fail("Should fail subscribe with invalid message filter"); - } catch (Exception pse) { - assertTrue("Should respond with INVALID_MESSAGE_FILTER", - pse.getMessage().contains("INVALID_MESSAGE_FILTER")); - } - receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true); - } - - @Test(timeout=60000) - public void testNullClientMessageFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestNullClientMessageFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - try { - subscriber.startDeliveryWithFilter(topic, subid, null, new ModMessageFilter()); - fail("Should fail start delivery with filter using null message handler."); - } catch (NullPointerException npe) { - } - - try { - subscriber.startDeliveryWithFilter(topic, subid, new MessageHandler() { - public void deliver(ByteString topic, ByteString subscriberId, - Message msg, Callback<Void> callback, Object context) { - // do nothing - } - }, null); - fail("Should fail start delivery with filter using null message filter."); - } catch (NullPointerException npe) { - } - } - - @Test(timeout=60000) - public void testClientSideMessageFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestClientMessageFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - publishNums(topic, 0, 100, 2); - receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, true); - } - - @Test(timeout=60000) - public void testChangeSubscriptionPreferencesForClientFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferencesForClientFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - - publishNums(topic, 0, 100, 2); - receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false); - receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 25, 4, false); - receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 33, 3, true); - } - - @Test(timeout=60000) - public void testChangeClientSideMessageFilter() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestChangeClientSideMessageFilter"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - - publishNums(topic, 0, 100, 3); - receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false); - receiveNumModM(topic, subid, null, new HeaderMessageFilter(), 0, 33, 3, true); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java deleted file mode 100644 index 4a5c63d..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import java.util.List; - -import org.jboss.netty.channel.Channel; -import org.junit.Before; -import org.junit.Test; - -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.netty.WriteRecordingChannel; -import org.apache.hedwig.server.topics.StubTopicManager; -import org.apache.hedwig.server.topics.TopicManager; - -import static org.junit.Assert.*; - -public class TestBaseHandler { - - MyBaseHandler handler; - StubTopicManager tm; - PubSubRequest request = PubSubRequest.getDefaultInstance(); - WriteRecordingChannel channel = new WriteRecordingChannel(); - - protected class MyBaseHandler extends BaseHandler { - - public MyBaseHandler(TopicManager tm, ServerConfiguration conf) { - super(tm, conf); - } - - PubSubRequest request; - - public PubSubRequest getRequest() { - return request; - } - - @Override - public void handleRequestAtOwner(PubSubRequest request, Channel channel) { - this.request = request; - } - - } - - @Before - public void setUp() throws Exception { - ServerConfiguration conf = new ServerConfiguration(); - tm = new StubTopicManager(conf); - handler = new MyBaseHandler(tm, conf); - request = PubSubRequest.getDefaultInstance(); - channel = new WriteRecordingChannel(); - } - - public PubSubResponse getPubSubResponse(WriteRecordingChannel channel) { - List<Object> messages = channel.getMessagesWritten(); - assertEquals(messages.size(), 1); - - Object message = messages.get(0); - assertEquals(message.getClass(), PubSubResponse.class); - - return (PubSubResponse) message; - } - - @Test(timeout=60000) - public void testHandleRequestOnRedirect() throws Exception { - tm.setShouldOwnEveryNewTopic(false); - handler.handleRequest(request, channel); - - PubSubResponse response = getPubSubResponse(channel); - assertEquals(response.getStatusCode(), StatusCode.NOT_RESPONSIBLE_FOR_TOPIC); - assertEquals(request.getTxnId(), response.getTxnId()); - assertNull(handler.getRequest()); - - } - - @Test(timeout=60000) - public void testHandleRequestOnOwner() throws Exception { - - tm.setShouldOwnEveryNewTopic(true); - handler.handleRequest(request, channel); - assertEquals(0, channel.getMessagesWritten().size()); - assertEquals(handler.getRequest(), request); - - } - - @Test(timeout=60000) - public void testHandleRequestOnError() throws Exception { - - tm.setShouldError(true); - handler.handleRequest(request, channel); - - PubSubResponse response = getPubSubResponse(channel); - assertEquals(response.getStatusCode(), StatusCode.SERVICE_DOWN); - assertEquals(request.getTxnId(), response.getTxnId()); - assertNull(handler.getRequest()); - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java deleted file mode 100644 index 93f5c2e..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -import org.junit.Before; -import org.junit.Test; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.StubCallback; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.filter.PipelineFilter; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.delivery.ChannelEndPoint; -import org.apache.hedwig.server.delivery.StubDeliveryManager; -import org.apache.hedwig.server.delivery.StubDeliveryManager.StartServingRequest; -import org.apache.hedwig.server.netty.WriteRecordingChannel; -import org.apache.hedwig.server.persistence.LocalDBPersistenceManager; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter; -import org.apache.hedwig.server.subscriptions.StubSubscriptionManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager; -import org.apache.hedwig.util.ConcurrencyUtils; - -import static org.junit.Assert.*; - -public class TestSubUnsubHandler { - - SubscribeHandler sh; - StubDeliveryManager dm; - StubSubscriptionManager sm; - SubscriptionChannelManager subChannelMgr; - ByteString topic = ByteString.copyFromUtf8("topic"); - WriteRecordingChannel channel; - - SubscribeRequest subRequestPrototype; - PubSubRequest pubSubRequestPrototype; - ByteString subscriberId; - UnsubscribeHandler ush; - - @Before - public void setUp() throws Exception { - ServerConfiguration conf = new ServerConfiguration(); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - - TopicManager tm = new TrivialOwnAllTopicManager(conf, executor); - dm = new StubDeliveryManager(); - PersistenceManager pm = LocalDBPersistenceManager.instance(); - sm = new StubSubscriptionManager(tm, pm, dm, conf, executor); - subChannelMgr = new SubscriptionChannelManager(); - sh = new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr); - channel = new WriteRecordingChannel(); - - subscriberId = ByteString.copyFromUtf8("subId"); - - subRequestPrototype = SubscribeRequest.newBuilder().setSubscriberId(subscriberId).build(); - pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType( - OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build(); - - ush = new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr); - } - - @Test(timeout=60000) - public void testNoSubscribeRequest() { - sh.handleRequestAtOwner(PubSubRequest.newBuilder(pubSubRequestPrototype).clearSubscribeRequest().build(), - channel); - assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0)) - .getStatusCode()); - } - - @Test(timeout=60000) - public void testSuccessCase() { - StubCallback<Void> callback = new StubCallback<Void>(); - sm.acquiredTopic(topic, callback, null); - assertNull(ConcurrencyUtils.take(callback.queue).right()); - - sh.handleRequestAtOwner(pubSubRequestPrototype, channel); - assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode()); - - // make sure the channel was put in the maps - Set<TopicSubscriber> topicSubs = new HashSet<TopicSubscriber>(); - topicSubs.add(new TopicSubscriber(topic, subscriberId)); - assertEquals(topicSubs, - subChannelMgr.channel2sub.get(channel)); - assertEquals(channel, - subChannelMgr.sub2Channel.get(new TopicSubscriber(topic, subscriberId))); - - // make sure delivery was started - StartServingRequest startRequest = (StartServingRequest) dm.lastRequest.poll(); - assertEquals(channel, ((ChannelEndPoint) startRequest.endPoint).getChannel()); - assertEquals(PipelineFilter.class, startRequest.filter.getClass()); - PipelineFilter pfilter = (PipelineFilter)(startRequest.filter); - assertEquals(1, pfilter.size()); - assertEquals(AllToAllTopologyFilter.class, pfilter.getFirst().getClass()); - assertEquals(1, startRequest.seqIdToStartFrom.getLocalComponent()); - assertEquals(subscriberId, startRequest.subscriberId); - assertEquals(topic, startRequest.topic); - - // make sure subscription was registered - StubCallback<SubscriptionData> callback1 = new StubCallback<SubscriptionData>(); - sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach( - CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1, - null); - - assertEquals(PubSubException.ClientAlreadySubscribedException.class, ConcurrencyUtils.take(callback1.queue) - .right().getClass()); - - // trying to subscribe again should throw an error - WriteRecordingChannel dupChannel = new WriteRecordingChannel(); - sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel); - assertEquals(StatusCode.TOPIC_BUSY, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode()); - - // after disconnecting the channel, subscribe should work again - subChannelMgr.channelDisconnected(channel); - - dupChannel = new WriteRecordingChannel(); - sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel); - assertEquals(StatusCode.SUCCESS, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode()); - - // test unsubscribe - channel = new WriteRecordingChannel(); - ush.handleRequestAtOwner(pubSubRequestPrototype, channel); - assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0)) - .getStatusCode()); - - PubSubRequest unsubRequest = PubSubRequest.newBuilder(pubSubRequestPrototype).setUnsubscribeRequest( - UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build(); - channel = new WriteRecordingChannel(); - dm.lastRequest.clear(); - - ush.handleRequestAtOwner(unsubRequest, channel); - assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode()); - - // make sure delivery has been stopped - assertEquals(new TopicSubscriber(topic, subscriberId), dm.lastRequest.poll()); - - // make sure the info is gone from the sm - StubCallback<SubscriptionData> callback2 = new StubCallback<SubscriptionData>(); - sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach( - CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2, - null); - assertEquals(PubSubException.ClientNotSubscribedException.class, ConcurrencyUtils.take(callback2.queue).right() - .getClass()); - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java deleted file mode 100644 index 1867f9c..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java +++ /dev/null @@ -1,777 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.integration; - -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.concurrent.SynchronousQueue; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.Client; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest; -import org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.server.netty.WriteRecordingChannel; -import org.apache.hedwig.server.proxy.HedwigProxy; -import org.apache.hedwig.server.proxy.ProxyConfiguration; -import org.apache.hedwig.server.regions.HedwigHubClient; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.bookkeeper.test.PortManager; -import org.apache.hedwig.server.LoggingExceptionHandler; - -import static org.junit.Assert.*; - -public abstract class TestHedwigHub extends HedwigHubTestBase { - - // Client side variables - protected HedwigClient client; - protected Publisher publisher; - protected Subscriber subscriber; - - // Common ByteStrings used in tests. - private final ByteString localSubscriberId = ByteString.copyFromUtf8("LocalSubscriber"); - private final ByteString hubSubscriberId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX - + "HubSubcriber"); - - enum Mode { - REGULAR, PROXY, SSL - }; - - protected Mode mode; - protected boolean isSubscriptionChannelSharingEnabled; - - public TestHedwigHub(Mode mode, boolean isSubscriptionChannelSharingEnabled) { - super(3); - this.mode = mode; - this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled; - } - - protected HedwigProxy proxy; - protected ProxyConfiguration proxyConf = new ProxyConfiguration() { - final int proxyPort = PortManager.nextFreePort(); - - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return serverAddresses.get(0); - } - - @Override - public int getProxyPort() { - return proxyPort; - } - }; - - // SynchronousQueues to verify async calls - private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>(); - private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>(); - - // Test implementation of Callback for async client actions. - static class TestCallback implements Callback<Void> { - private final SynchronousQueue<Boolean> queue; - - public TestCallback(SynchronousQueue<Boolean> queue) { - this.queue = queue; - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - new Thread(new Runnable() { - @Override - public void run() { - if (logger.isDebugEnabled()) - logger.debug("Operation finished!"); - ConcurrencyUtils.put(queue, true); - } - }).start(); - } - - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - new Thread(new Runnable() { - @Override - public void run() { - logger.error("Operation failed!", exception); - ConcurrencyUtils.put(queue, false); - } - }).start(); - } - } - - // Test implementation of subscriber's message handler. - static class TestMessageHandler implements MessageHandler { - // For subscribe reconnect testing, the server could send us back - // messages we've already processed and consumed. We need to keep - // track of the ones we've encountered so we only signal back to the - // consumeQueue once. - private HashSet<MessageSeqId> consumedMessages = new HashSet<MessageSeqId>(); - private long largestMsgSeqIdConsumed = -1; - private final SynchronousQueue<Boolean> consumeQueue; - - public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) { - this.consumeQueue = consumeQueue; - } - - public void deliver(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback, - Object context) { - if (!consumedMessages.contains(msg.getMsgId())) { - // New message to consume. Add it to the Set of consumed - // messages. - consumedMessages.add(msg.getMsgId()); - // Check that the msg seq ID is incrementing by 1 compared to - // the last consumed message. Don't do this check if this is the - // initial message being consumed. - if (largestMsgSeqIdConsumed >= 0 && msg.getMsgId().getLocalComponent() != largestMsgSeqIdConsumed + 1) { - new Thread(new Runnable() { - @Override - public void run() { - if (logger.isDebugEnabled()) - logger.debug("Consuming message that is out of order for msgId: " - + msg.getMsgId().getLocalComponent()); - ConcurrencyUtils.put(consumeQueue, false); - } - }).start(); - } else { - new Thread(new Runnable() { - @Override - public void run() { - if (logger.isDebugEnabled()) - logger.debug("Consume operation finished successfully!"); - ConcurrencyUtils.put(consumeQueue, true); - } - }).start(); - } - // Store the consumed message as the new last msg id consumed. - largestMsgSeqIdConsumed = msg.getMsgId().getLocalComponent(); - } else { - if (logger.isDebugEnabled()) - logger.debug("Consumed a message that we've processed already: " + msg); - } - callback.operationFinished(context, null); - } - } - - class TestClientConfiguration extends HubClientConfiguration { - - @Override - public InetSocketAddress getDefaultServerHost() { - if (mode == Mode.PROXY) { - return new InetSocketAddress(proxyConf.getProxyPort()); - } else { - return super.getDefaultServerHost(); - } - } - - @Override - public boolean isSSLEnabled() { - if (mode == Mode.SSL) - return true; - else - return false; - } - - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return isSubscriptionChannelSharingEnabled; - } - } - - // ClientConfiguration to use for this test. - protected ClientConfiguration getClientConfiguration() { - return new TestClientConfiguration(); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - if (mode == Mode.PROXY) { - proxy = new HedwigProxy(proxyConf, new LoggingExceptionHandler()); - proxy.start(); - } - client = new HedwigClient(getClientConfiguration()); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - @Override - @After - public void tearDown() throws Exception { - client.close(); - if (mode == Mode.PROXY) { - proxy.shutdown(); - } - super.tearDown(); - - } - - // Helper function to generate Messages - protected Message getMsg(int msgNum) { - return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build(); - } - - // Helper function to generate Topics - protected ByteString getTopic(int topicNum) { - return ByteString.copyFromUtf8("Topic" + topicNum); - } - - protected void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler handler) throws Exception { - startDelivery(subscriber, topic, subscriberId, handler); - } - - protected void startDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId, - MessageHandler handler) throws Exception { - subscriber.startDelivery(topic, subscriberId, handler); - if (mode == Mode.PROXY) { - WriteRecordingChannel channel = new WriteRecordingChannel(); - PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE) - .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest( - StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build(); - proxy.getStartDeliveryHandler().handleRequest(request, channel); - assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode()); - } - } - - protected void stopDelivery(ByteString topic, ByteString subscriberId) throws Exception { - stopDelivery(subscriber, topic, subscriberId); - } - - protected void stopDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId) throws Exception { - subscriber.stopDelivery(topic, subscriberId); - if (mode == Mode.PROXY) { - PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE) - .setTopic(topic).setTxnId(1).setType(OperationType.STOP_DELIVERY).setStopDeliveryRequest( - StopDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build(); - proxy.getStopDeliveryHandler().handleRequest(request, proxy.getChannelTracker().getChannel(topic, subscriberId)); - } - } - - protected void publishBatch(int batchSize, boolean expected, boolean messagesToBeConsumed, int loop) throws Exception { - if (logger.isDebugEnabled()) - logger.debug("Publishing " + loop + " batch of messages."); - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(getTopic(i), getMsg(i + loop * batchSize), new TestCallback(queue), null); - assertTrue(expected == queue.take()); - if (messagesToBeConsumed) - assertTrue(consumeQueue.take()); - } - } - - protected void subscribeToTopics(int batchSize) throws Exception { - if (logger.isDebugEnabled()) - logger.debug("Subscribing to topics and starting delivery."); - for (int i = 0; i < batchSize; i++) { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.asyncSubscribe(getTopic(i), localSubscriberId, opts, - new TestCallback(queue), null); - assertTrue(queue.take()); - } - - // Start delivery for the subscriber - for (int i = 0; i < batchSize; i++) { - startDelivery(getTopic(i), localSubscriberId, new TestMessageHandler(consumeQueue)); - } - } - - protected void shutDownLastServer() { - if (logger.isDebugEnabled()) - logger.debug("Shutting down the last server in the Hedwig hub cluster."); - serversList.get(serversList.size() - 1).shutdown(); - // Due to a possible race condition, after we've shutdown the server, - // the client could still be caching the channel connection to that - // server. It is possible for a publish request to go to the shutdown - // server using the closed/shutdown channel before the channel - // disconnect logic kicks in. What could happen is that the publish - // is done successfully on the channel but the server on the other end - // can't/won't read it. This publish request will time out and the - // Junit test will fail. Since that particular scenario is not what is - // tested here, use a workaround of sleeping in this thread (so the - // channel disconnect logic can complete) before we publish again. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - logger.error("Thread was interrupted while sleeping after shutting down last server!", e); - } - } - - // This tests out the manual sending of consume messages to the server - // instead of relying on the automatic sending by the client lib for it. - @Test(timeout=10000) - public void testManualConsumeClient() throws Exception { - HedwigClient myClient = new HedwigClient(new TestClientConfiguration() { - @Override - public boolean isAutoSendConsumeMessageEnabled() { - return false; - } - - }); - Subscriber mySubscriber = myClient.getSubscriber(); - Publisher myPublisher = myClient.getPublisher(); - ByteString myTopic = getTopic(0); - // Subscribe to a topic and start delivery on it - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - mySubscriber.asyncSubscribe(myTopic, localSubscriberId, opts, - new TestCallback(queue), null); - assertTrue(queue.take()); - startDelivery(mySubscriber, myTopic, localSubscriberId, new TestMessageHandler(consumeQueue)); - // Publish some messages - int batchSize = 10; - for (int i = 0; i < batchSize; i++) { - myPublisher.asyncPublish(myTopic, getMsg(i), new TestCallback(queue), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - } - // Now manually send a consume message for each message received - for (int i = 0; i < batchSize; i++) { - boolean success = true; - try { - mySubscriber.consume(myTopic, localSubscriberId, MessageSeqId.newBuilder().setLocalComponent(i + 1) - .build()); - } catch (ClientNotSubscribedException e) { - success = false; - } - assertTrue(success); - } - // Since the consume call eventually does an async write to the Netty - // channel, the writing of the consume requests may not have completed - // yet before we stop the client. Sleep a little before we stop the - // client just so error messages are not logged. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - logger.error("Thread was interrupted while waiting to stop client for manual consume test!!", e); - } - myClient.close(); - } - - @Test(timeout=10000) - public void testAttachToSubscriptionSuccess() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts1 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, opts1, new TestCallback(queue), - null); - assertTrue(queue.take()); - // Close the subscription asynchronously - subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null); - assertTrue(queue.take()); - - SubscriptionOptions opts2 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.ATTACH).build(); - // Now try to attach to the subscription - subscriber.asyncSubscribe(topic, localSubscriberId, opts2, new TestCallback(queue), null); - assertTrue(queue.take()); - // Start delivery and publish some messages. Make sure they are consumed - // correctly. - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - int batchSize = 5; - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - } - } - - @Test(timeout=10000) - public void testServerRedirect() throws Exception { - int batchSize = 10; - publishBatch(batchSize, true, false, 0); - } - - @Test(timeout=10000) - public void testSubscribeAndConsume() throws Exception { - int batchSize = 10; - subscribeToTopics(batchSize); - publishBatch(batchSize, true, true, 0); - } - - @Test(timeout=10000) - public void testServerFailoverPublishOnly() throws Exception { - int batchSize = 10; - publishBatch(batchSize, true, false, 0); - shutDownLastServer(); - publishBatch(batchSize, true, false, 1); - } - - @Test(timeout=10000) - public void testServerFailover() throws Exception { - int batchSize = 10; - subscribeToTopics(batchSize); - publishBatch(batchSize, true, true, 0); - shutDownLastServer(); - publishBatch(batchSize, true, true, 1); - } - - @Test(timeout=10000) - public void testUnsubscribe() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertTrue(queue.take()); - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - // Send an Unsubscribe request - subscriber.asyncUnsubscribe(topic, localSubscriberId, new TestCallback(queue), null); - assertTrue(queue.take()); - // Now publish a message and make sure it is not consumed by the client - publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null); - assertTrue(queue.take()); - // Wait a little bit just in case the message handler is still active, - // consuming the message, and then putting a true value in the - // consumeQueue. - Thread.sleep(1000); - // Put a False value on the consumeQueue so we can verify that it - // is not blocked by a message consume action which already put a True - // value into the queue. - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(consumeQueue, false); - } - }).start(); - assertFalse(consumeQueue.take()); - } - - @Test(timeout=10000) - public void testSyncUnsubscribeWithoutSubscription() throws Exception { - boolean unsubscribeSuccess = false; - try { - subscriber.unsubscribe(getTopic(0), localSubscriberId); - } catch (ClientNotSubscribedException e) { - unsubscribeSuccess = true; - } catch (Exception ex) { - unsubscribeSuccess = false; - } - assertTrue(unsubscribeSuccess); - } - - @Test(timeout=10000) - public void testAsyncUnsubscribeWithoutSubscription() throws Exception { - subscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null); - assertFalse(queue.take()); - } - - @Test(timeout=10000) - public void testCloseSubscription() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertTrue(queue.take()); - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - // Close the subscription asynchronously - subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null); - assertTrue(queue.take()); - // Now publish a message and make sure it is not consumed by the client - publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null); - assertTrue(queue.take()); - // Wait a little bit just in case the message handler is still active, - // consuming the message, and then putting a true value in the - // consumeQueue. - Thread.sleep(1000); - // Put a False value on the consumeQueue so we can verify that it - // is not blocked by a message consume action which already put a True - // value into the queue. - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(consumeQueue, false); - } - }).start(); - assertFalse(consumeQueue.take()); - } - - @Test(timeout=10000) - public void testStartDeliveryTwice() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertTrue(queue.take()); - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - try { - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - fail("Should not reach here!"); - } catch (AlreadyStartDeliveryException e) { - } - } - - @Test(timeout=10000) - public void testStopDelivery() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertTrue(queue.take()); - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - // Stop the delivery for this subscription - stopDelivery(topic, localSubscriberId); - // Publish some more messages so they are queued up to be delivered to - // the client - int batchSize = 10; - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(topic, getMsg(i + 1), new TestCallback(queue), null); - assertTrue(queue.take()); - } - // Wait a little bit just in case the message handler is still active, - // consuming the message, and then putting a true value in the - // consumeQueue. - Thread.sleep(1000); - // Put a False value on the consumeQueue so we can verify that it - // is not blocked by a message consume action which already put a True - // value into the queue. - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(consumeQueue, false); - } - }).start(); - assertFalse(consumeQueue.take()); - // Now start delivery again and verify that the queued up messages are - // consumed - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - for (int i = 0; i < batchSize; i++) { - assertTrue(consumeQueue.take()); - } - } - - @Test(timeout=10000) - public void testConsumedMessagesInOrder() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertTrue(queue.take()); - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - // Now publish some messages and verify that they are delivered in order - // to the subscriber - int batchSize = 100; - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null); - } - // We've sent out all of the publish messages asynchronously, - // now verify that they are consumed in the correct order. - for (int i = 0; i < batchSize; i++) { - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - } - } - - @Test(timeout=10000) - public void testCreateSubscriptionFailure() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertTrue(queue.take()); - // Close the subscription asynchronously - subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null); - assertTrue(queue.take()); - // Now try to create the subscription when it already exists - SubscriptionOptions optsCreate = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, optsCreate, new TestCallback(queue), null); - assertFalse(queue.take()); - } - - @Test(timeout=10000) - public void testCreateSubscriptionSuccess() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertTrue(queue.take()); - startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue)); - int batchSize = 5; - for (int i = 0; i < batchSize; i++) { - publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - } - } - - @Test(timeout=10000) - public void testAttachToSubscriptionFailure() throws Exception { - ByteString topic = getTopic(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.ATTACH).build(); - - subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null); - assertFalse(queue.take()); - } - - // The following 4 tests are to make sure that the subscriberId validation - // works when it is a local subscriber and we're expecting the subscriberId - // to be in the "local" specific format. - @Test(timeout=10000) - public void testSyncSubscribeWithInvalidSubscriberId() throws Exception { - boolean subscribeSuccess = false; - try { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(getTopic(0), hubSubscriberId, opts); - } catch (InvalidSubscriberIdException e) { - subscribeSuccess = true; - } catch (Exception ex) { - subscribeSuccess = false; - } - assertTrue(subscribeSuccess); - } - - @Test(timeout=10000) - public void testAsyncSubscribeWithInvalidSubscriberId() throws Exception { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, opts, - new TestCallback(queue), null); - assertFalse(queue.take()); - } - - @Test(timeout=10000) - public void testSyncUnsubscribeWithInvalidSubscriberId() throws Exception { - boolean unsubscribeSuccess = false; - try { - subscriber.unsubscribe(getTopic(0), hubSubscriberId); - } catch (InvalidSubscriberIdException e) { - unsubscribeSuccess = true; - } catch (Exception ex) { - unsubscribeSuccess = false; - } - assertTrue(unsubscribeSuccess); - } - - @Test(timeout=10000) - public void testAsyncUnsubscribeWithInvalidSubscriberId() throws Exception { - subscriber.asyncUnsubscribe(getTopic(0), hubSubscriberId, new TestCallback(queue), null); - assertFalse(queue.take()); - } - - // The following 4 tests are to make sure that the subscriberId validation - // also works when it is a hub subscriber and we're expecting the - // subscriberId to be in the "hub" specific format. - @Test(timeout=10000) - public void testSyncHubSubscribeWithInvalidSubscriberId() throws Exception { - Client hubClient = new HedwigHubClient(new HubClientConfiguration()); - Subscriber hubSubscriber = hubClient.getSubscriber(); - boolean subscribeSuccess = false; - try { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - hubSubscriber.subscribe(getTopic(0), localSubscriberId, opts); - } catch (InvalidSubscriberIdException e) { - subscribeSuccess = true; - } catch (Exception ex) { - subscribeSuccess = false; - } - assertTrue(subscribeSuccess); - hubClient.close(); - } - - @Test(timeout=10000) - public void testAsyncHubSubscribeWithInvalidSubscriberId() throws Exception { - Client hubClient = new HedwigHubClient(new HubClientConfiguration()); - Subscriber hubSubscriber = hubClient.getSubscriber(); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, opts, new TestCallback( - queue), null); - assertFalse(queue.take()); - hubClient.close(); - } - - @Test(timeout=10000) - public void testSyncHubUnsubscribeWithInvalidSubscriberId() throws Exception { - Client hubClient = new HedwigHubClient(new HubClientConfiguration()); - Subscriber hubSubscriber = hubClient.getSubscriber(); - boolean unsubscribeSuccess = false; - try { - hubSubscriber.unsubscribe(getTopic(0), localSubscriberId); - } catch (InvalidSubscriberIdException e) { - unsubscribeSuccess = true; - } catch (Exception ex) { - unsubscribeSuccess = false; - } - assertTrue(unsubscribeSuccess); - hubClient.close(); - } - - @Test(timeout=10000) - public void testAsyncHubUnsubscribeWithInvalidSubscriberId() throws Exception { - Client hubClient = new HedwigHubClient(new HubClientConfiguration()); - Subscriber hubSubscriber = hubClient.getSubscriber(); - hubSubscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null); - assertFalse(queue.take()); - hubClient.close(); - } - - @Test(timeout=10000) - public void testPublishWithBookKeeperError() throws Exception { - int batchSize = 10; - publishBatch(batchSize, true, false, 0); - // stop all bookie servers - bktb.stopAllBookieServers(); - // following publish would failed with NotEnoughBookies - publishBatch(batchSize, false, false, 1); - // start all bookie servers - bktb.startAllBookieServers(); - // following publish should succeed - publishBatch(batchSize, true, false, 1); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java deleted file mode 100644 index 64e5b4e..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import java.util.Collection; -import java.util.Arrays; - -@RunWith(Parameterized.class) -public class TestHedwigHubProxy extends TestHedwigHub { - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { { true }, { false } }); - } - - public TestHedwigHubProxy(boolean isSubscriptionChannelSharingEnabled) { - super(Mode.PROXY, isSubscriptionChannelSharingEnabled); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java deleted file mode 100644 index 2e370c0..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import java.util.Collection; -import java.util.Arrays; - -@RunWith(Parameterized.class) -public class TestHedwigHubRegular extends TestHedwigHub { - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { { true }, { false } }); - } - - public TestHedwigHubRegular(boolean isSubscriptionChannelSharingEnabled) { - super(Mode.REGULAR, isSubscriptionChannelSharingEnabled); - } -}
