Added: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java?rev=1405028&view=auto
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
 (added)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
 Fri Nov  2 16:14:59 2012
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import 
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+
+public class TestMultiplexing extends HedwigHubTestBase {
+
+    private static final int DEFAULT_MSG_WINDOW_SIZE = 10;
+
+    protected class TestServerConfiguration extends HubServerConfiguration {
+        TestServerConfiguration(int serverPort, int sslServerPort) {
+            super(serverPort, sslServerPort);
+        }
+        @Override
+        public int getDefaultMessageWindowSize() {
+            return DEFAULT_MSG_WINDOW_SIZE;
+        }
+    }
+
+    class TestMessageHandler implements MessageHandler {
+
+        int expected;
+        final int numMsgsAtFirstRun;
+        final int numMsgsAtSecondRun;
+        final CountDownLatch firstLatch;
+        final CountDownLatch secondLatch;
+        final boolean receiveSecondRun;
+
+        public TestMessageHandler(int start, int numMsgsAtFirstRun,
+                                  boolean receiveSecondRun,
+                                  int numMsgsAtSecondRun) {
+            expected = start;
+            this.numMsgsAtFirstRun = numMsgsAtFirstRun;
+            this.numMsgsAtSecondRun = numMsgsAtSecondRun;
+            this.receiveSecondRun = receiveSecondRun;
+            firstLatch = new CountDownLatch(1);
+            secondLatch = new CountDownLatch(1);
+        }
+
+        @Override
+        public synchronized void deliver(ByteString topic, ByteString 
subscriberId,
+                                         Message msg,
+                                         Callback<Void> callback, Object 
context) {
+            try {
+                int value = Integer.valueOf(msg.getBody().toStringUtf8());
+                logger.debug("Received message {}.", value);
+
+                if (value == expected) {
+                    ++expected;
+                } else {
+                    // error condition
+                    logger.error("Did not receive expected value, expected {}, 
got {}",
+                                 expected, value);
+                    expected = 0;
+                    firstLatch.countDown();
+                    secondLatch.countDown();
+                }
+                if (numMsgsAtFirstRun + 1 == expected) {
+                    firstLatch.countDown();
+                }
+                if (receiveSecondRun) {
+                    if (numMsgsAtFirstRun + numMsgsAtSecondRun + 1 == 
expected) {
+                        secondLatch.countDown();
+                    }
+                } else {
+                    if (numMsgsAtFirstRun + 1 < expected) {
+                        secondLatch.countDown();
+                    }
+                }
+                callback.operationFinished(context, null);
+                subscriber.consume(topic, subscriberId, msg.getMsgId());
+            } catch (Throwable t) {
+                logger.error("Received bad message.", t);
+                firstLatch.countDown();
+                secondLatch.countDown();
+            }
+        }
+
+        public void checkFirstRun() throws Exception {
+            assertTrue("Timed out waiting for messages " + (numMsgsAtFirstRun 
+ 1),
+                       firstLatch.await(10, TimeUnit.SECONDS));
+            assertEquals("Should be expected messages with " + 
(numMsgsAtFirstRun + 1),
+                         numMsgsAtFirstRun + 1, expected);
+        }
+
+        public void checkSecondRun() throws Exception {
+            if (receiveSecondRun) {
+                assertTrue("Timed out waiting for messages "
+                           + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1),
+                           secondLatch.await(10, TimeUnit.SECONDS));
+                assertEquals("Should be expected messages with "
+                             + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1),
+                             numMsgsAtFirstRun + numMsgsAtSecondRun + 1, 
expected);
+            } else {
+                assertFalse("Receive more messages than " + numMsgsAtFirstRun,
+                            secondLatch.await(3, TimeUnit.SECONDS));
+                assertEquals("Should be expected messages with ony " + 
(numMsgsAtFirstRun + 1),
+                             numMsgsAtFirstRun + 1, expected);
+            }
+        }
+    }
+
+    class ThrottleMessageHandler implements MessageHandler {
+
+        int expected;
+        final int numMsgs;
+        final int numMsgsThrottle;
+        final CountDownLatch throttleLatch;
+        final CountDownLatch nonThrottleLatch;
+        final boolean enableThrottle;
+
+        public ThrottleMessageHandler(int start, int numMsgs,
+                                      boolean enableThrottle,
+                                      int numMsgsThrottle) {
+            expected = start;
+            this.numMsgs = numMsgs;
+            this.enableThrottle = enableThrottle;
+            this.numMsgsThrottle = numMsgsThrottle;
+            throttleLatch = new CountDownLatch(1);
+            nonThrottleLatch = new CountDownLatch(1);
+        }
+
+        @Override
+        public synchronized void deliver(ByteString topic, ByteString 
subscriberId,
+                                         Message msg,
+                                         Callback<Void> callback, Object 
context) {
+            try {
+                int value = Integer.valueOf(msg.getBody().toStringUtf8());
+                logger.debug("Received message {}.", value);
+
+                if (value == expected) {
+                    ++expected;
+                } else {
+                    // error condition
+                    logger.error("Did not receive expected value, expected {}, 
got {}",
+                                 expected, value);
+                    expected = 0;
+                    throttleLatch.countDown();
+                    nonThrottleLatch.countDown();
+                }
+                if (expected == numMsgsThrottle + 2) {
+                    throttleLatch.countDown();
+                }
+                if (expected == numMsgs + 1) {
+                    nonThrottleLatch.countDown();
+                }
+                callback.operationFinished(context, null);
+                if (enableThrottle) {
+                    if (expected > numMsgsThrottle + 1) {
+                        subscriber.consume(topic, subscriberId, 
msg.getMsgId());
+                    }
+                } else {
+                    subscriber.consume(topic, subscriberId, msg.getMsgId());
+                }
+            } catch (Throwable t) {
+                logger.error("Received bad message.", t);
+                throttleLatch.countDown();
+                nonThrottleLatch.countDown();
+            }
+        }
+
+        public void checkThrottle() throws Exception {
+            if (enableThrottle) {
+                assertFalse("Received more messages than throttle value " + 
numMsgsThrottle,
+                            throttleLatch.await(3, TimeUnit.SECONDS));
+                assertEquals("Should be expected messages with only " + 
(numMsgsThrottle + 1),
+                             numMsgsThrottle + 1, expected);
+            } else {
+                assertTrue("Should not be throttled.", throttleLatch.await(10, 
TimeUnit.SECONDS));
+                assertTrue("Timed out waiting for messages " + (numMsgs + 1),
+                           nonThrottleLatch.await(10, TimeUnit.SECONDS));
+                assertEquals("Should be expected messages with " + (numMsgs + 
1),
+                             numMsgs + 1, expected);
+            }
+        }
+
+        public void checkAfterThrottle() throws Exception {
+            if (enableThrottle) {
+                assertTrue("Timed out waiting for messages " + (numMsgs + 1),
+                           nonThrottleLatch.await(10, TimeUnit.SECONDS));
+                assertEquals("Should be expected messages with " + (numMsgs + 
1),
+                             numMsgs + 1, expected);
+            }
+        }
+    }
+
+    HedwigClient client;
+    Publisher publisher;
+    Subscriber subscriber;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        numServers = 1;
+        super.setUp();
+        client = new HedwigClient(new ClientConfiguration() {
+            @Override
+            public boolean isMultiplexingEnabled() {
+                return true;
+            }
+            @Override
+            public boolean isAutoSendConsumeMessageEnabled() {
+                return false;
+            }
+        });
+        publisher = client.getPublisher();
+        subscriber = client.getSubscriber();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        client.close();
+        super.tearDown();
+    }
+
+    @Override
+    protected ServerConfiguration getServerConfiguration(int port, int 
sslPort) {
+        return new TestServerConfiguration(port, sslPort);
+    }
+
+    @Test
+    public void testStopDelivery() throws Exception {
+        ByteString topic1 = ByteString.copyFromUtf8("testStopDelivery-1");
+        ByteString topic2 = ByteString.copyFromUtf8("testStopDelivery-2");
+        ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
+        ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
+
+        final int X = 20;
+
+        TestMessageHandler csHandler11 =
+            new TestMessageHandler(1, X, true, X);
+        TestMessageHandler csHandler12 =
+            new TestMessageHandler(1, X, false, 0);
+        TestMessageHandler csHandler21 =
+            new TestMessageHandler(1, X, false, 0);
+        TestMessageHandler csHandler22 =
+            new TestMessageHandler(1, X, true, X);
+
+        subscriber.subscribe(topic1, subid1, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic1, subid2, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic2, subid1, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic2, subid2, CreateOrAttach.CREATE);
+
+        // start deliveries
+        subscriber.startDelivery(topic1, subid1, csHandler11);
+        subscriber.startDelivery(topic1, subid2, csHandler12);
+        subscriber.startDelivery(topic2, subid1, csHandler21);
+        subscriber.startDelivery(topic2, subid2, csHandler22);
+
+        // first publish
+        for (int i = 1; i<=X; i++) {
+            publisher.publish(topic1, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+            publisher.publish(topic2, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+        }
+
+        csHandler11.checkFirstRun();
+        csHandler12.checkFirstRun();
+        csHandler21.checkFirstRun();
+        csHandler22.checkFirstRun();
+
+        // stop delivery for <topic1, subscriber2> and <topic2, subscriber1>
+        subscriber.stopDelivery(topic1, subid2);
+        subscriber.stopDelivery(topic2, subid1);
+
+        // second publish
+        for (int i = X+1; i<=2*X; i++) {
+            publisher.publish(topic1, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+            publisher.publish(topic2, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+        }
+
+        csHandler11.checkSecondRun();
+        csHandler22.checkSecondRun();
+        csHandler12.checkSecondRun();
+        csHandler21.checkSecondRun();
+    }
+
+    @Test
+    public void testCloseSubscription() throws Exception {
+        ByteString topic1 = ByteString.copyFromUtf8("testCloseSubscription-1");
+        ByteString topic2 = ByteString.copyFromUtf8("testCloseSubscription-2");
+        ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
+        ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
+
+        final int X = 20;
+
+        TestMessageHandler csHandler11 =
+            new TestMessageHandler(1, X, true, X);
+        TestMessageHandler csHandler12 =
+            new TestMessageHandler(1, X, false, 0);
+        TestMessageHandler csHandler21 =
+            new TestMessageHandler(1, X, false, 0);
+        TestMessageHandler csHandler22 =
+            new TestMessageHandler(1, X, true, X);
+
+        subscriber.subscribe(topic1, subid1, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic1, subid2, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic2, subid1, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic2, subid2, CreateOrAttach.CREATE);
+
+        // start deliveries
+        subscriber.startDelivery(topic1, subid1, csHandler11);
+        subscriber.startDelivery(topic1, subid2, csHandler12);
+        subscriber.startDelivery(topic2, subid1, csHandler21);
+        subscriber.startDelivery(topic2, subid2, csHandler22);
+
+        // first publish
+        for (int i = 1; i<=X; i++) {
+            publisher.publish(topic1, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+            publisher.publish(topic2, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+        }
+
+        csHandler11.checkFirstRun();
+        csHandler12.checkFirstRun();
+        csHandler21.checkFirstRun();
+        csHandler22.checkFirstRun();
+
+        // close subscription for <topic1, subscriber2> and <topic2, 
subscriber1>
+        subscriber.closeSubscription(topic1, subid2);
+        subscriber.closeSubscription(topic2, subid1);
+
+        // second publish
+        for (int i = X+1; i<=2*X; i++) {
+            publisher.publish(topic1, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+            publisher.publish(topic2, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+        }
+
+        csHandler11.checkSecondRun();
+        csHandler22.checkSecondRun();
+        csHandler12.checkSecondRun();
+        csHandler21.checkSecondRun();
+    }
+
+    @Test
+    public void testThrottle() throws Exception {
+        ByteString topic1 = ByteString.copyFromUtf8("testThrottle-1");
+        ByteString topic2 = ByteString.copyFromUtf8("testThrottle-2");
+        ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
+        ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
+
+        final int X = DEFAULT_MSG_WINDOW_SIZE;
+
+        ThrottleMessageHandler csHandler11 =
+            new ThrottleMessageHandler(1, 3*X, false, X);
+        ThrottleMessageHandler csHandler12 =
+            new ThrottleMessageHandler(1, 3*X, true, X);
+        ThrottleMessageHandler csHandler21 =
+            new ThrottleMessageHandler(1, 3*X, true, X);
+        ThrottleMessageHandler csHandler22 =
+            new ThrottleMessageHandler(1, 3*X, false, X);
+
+        subscriber.subscribe(topic1, subid1, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic1, subid2, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic2, subid1, CreateOrAttach.CREATE);
+        subscriber.subscribe(topic2, subid2, CreateOrAttach.CREATE);
+
+        // start deliveries
+        subscriber.startDelivery(topic1, subid1, csHandler11);
+        subscriber.startDelivery(topic1, subid2, csHandler12);
+        subscriber.startDelivery(topic2, subid1, csHandler21);
+        subscriber.startDelivery(topic2, subid2, csHandler22);
+
+        // publish
+        for (int i = 1; i<=3*X; i++) {
+            publisher.publish(topic1, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+            publisher.publish(topic2, Message.newBuilder().setBody(
+                                      
ByteString.copyFromUtf8(String.valueOf(i))).build());
+        }
+
+        csHandler11.checkThrottle();
+        csHandler12.checkThrottle();
+        csHandler21.checkThrottle();
+        csHandler22.checkThrottle();
+
+        // consume messages to not throttle them
+        for (int i=1; i<=X; i++) {
+            MessageSeqId seqId =
+                MessageSeqId.newBuilder().setLocalComponent(i).build();
+            subscriber.consume(topic1, subid2, seqId);
+            subscriber.consume(topic2, subid1, seqId);
+        }
+
+        csHandler11.checkAfterThrottle();
+        csHandler22.checkAfterThrottle();
+        csHandler12.checkAfterThrottle();
+        csHandler21.checkAfterThrottle();
+    }
+}

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
 Fri Nov  2 16:14:59 2012
@@ -166,6 +166,11 @@ public abstract class HedwigRegionTestBa
         return new RegionClientConfiguration(serverPort, sslServerPort);
     }
 
+    // Method to get a ClientConfiguration for the Cross Region Hedwig Client.
+    protected ClientConfiguration getRegionClientConfiguration() {
+        return new ClientConfiguration();
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {
@@ -237,7 +242,8 @@ public abstract class HedwigRegionTestBa
                     getServerConfiguration(initialServerPort
                                            + (j + i * numServersPerRegion),
                                            initialSSLServerPort + (j + i * 
numServersPerRegion),
-                                           regionName));
+                                           regionName),
+                    getRegionClientConfiguration());
             serversList.add(s);
             s.start();
         }

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
 Fri Nov  2 16:14:59 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hedwig.server.delivery;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -25,6 +27,9 @@ import java.util.concurrent.atomic.Atomi
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -44,6 +49,7 @@ import org.apache.hedwig.server.HedwigHu
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.util.Callback;
 
+@RunWith(Parameterized.class)
 public class TestThrottlingDelivery extends HedwigHubTestBase {
 
     private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10;
@@ -85,6 +91,11 @@ public class TestThrottlingDelivery exte
         public boolean isAutoSendConsumeMessageEnabled() {
             return false;
         }
+
+        @Override
+        public boolean isMultiplexingEnabled() {
+            return isMultiplexingEnabled;
+        }
     }
 
     private void throttleX(Publisher pub, final Subscriber sub,
@@ -154,6 +165,17 @@ public class TestThrottlingDelivery exte
         sub.closeSubscription(topic, subid);
     }
 
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { false }, { true } });
+    }
+
+    protected boolean isMultiplexingEnabled;
+
+    public TestThrottlingDelivery(boolean isMultiplexingEnabled) {
+        this.isMultiplexingEnabled = isMultiplexingEnabled;
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
 Fri Nov  2 16:14:59 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hedwig.server.handlers;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import org.junit.Test;
@@ -107,7 +109,9 @@ public class TestSubUnsubHandler extends
         assertEquals(StatusCode.SUCCESS, ((PubSubResponse) 
channel.getMessagesWritten().get(0)).getStatusCode());
 
         // make sure the channel was put in the maps
-        assertEquals(new TopicSubscriber(topic, subscriberId),
+        Set<TopicSubscriber> topicSubs = new HashSet<TopicSubscriber>();
+        topicSubs.add(new TopicSubscriber(topic, subscriberId));
+        assertEquals(topicSubs,
                      subChannelMgr.channel2sub.get(channel));
         assertEquals(channel,
                      subChannelMgr.sub2Channel.get(new TopicSubscriber(topic, 
subscriberId)));

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
 Fri Nov  2 16:14:59 2012
@@ -80,13 +80,22 @@ public class TestHedwigHub extends Hedwi
 
     @Parameters
     public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] { { Mode.PROXY }, { Mode.REGULAR 
}, { Mode.SSL }});
+        return Arrays.asList(new Object[][] {
+                                { Mode.PROXY, false },
+                                { Mode.PROXY, true },
+                                { Mode.REGULAR, false },
+                                { Mode.REGULAR, true },
+                                { Mode.SSL, false },
+                                { Mode.SSL, true }
+                             });
     }
 
     protected Mode mode;
+    protected boolean isMultiplexingEnabled;
 
-    public TestHedwigHub(Mode mode) {
+    public TestHedwigHub(Mode mode, boolean isMultiplexingEnabled) {
         this.mode = mode;
+        this.isMultiplexingEnabled = isMultiplexingEnabled;
     }
 
     protected HedwigProxy proxy;
@@ -182,6 +191,7 @@ public class TestHedwigHub extends Hedwi
     }
 
     class TestClientConfiguration extends ClientConfiguration {
+
         @Override
         public InetSocketAddress getDefaultServerHost() {
             if (mode == Mode.PROXY) {
@@ -198,6 +208,11 @@ public class TestHedwigHub extends Hedwi
             else
                 return false;
         }
+
+        @Override
+        public boolean isMultiplexingEnabled() {
+            return isMultiplexingEnabled;
+        }
     }
 
     // ClientConfiguration to use for this test.

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
 Fri Nov  2 16:14:59 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hedwig.server.integration;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.SynchronousQueue;
@@ -24,10 +26,14 @@ import java.util.concurrent.SynchronousQ
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import 
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.server.HedwigRegionTestBase;
@@ -35,6 +41,7 @@ import org.apache.hedwig.server.common.S
 import org.apache.hedwig.server.integration.TestHedwigHub.TestCallback;
 import org.apache.hedwig.server.integration.TestHedwigHub.TestMessageHandler;
 
+@RunWith(Parameterized.class)
 public class TestHedwigRegion extends HedwigRegionTestBase {
 
     // SynchronousQueues to verify async calls
@@ -54,12 +61,35 @@ public class TestHedwigRegion extends He
         public int getRetryRemoteSubscribeThreadRunInterval() {
             return TEST_RETRY_REMOTE_SUBSCRIBE_INTERVAL_VALUE;
         }
+
+    }
+
+    protected class NewRegionClientConfiguration extends ClientConfiguration {
+        @Override
+        public boolean isMultiplexingEnabled() {
+            return isMultiplexingEnabled;
+        }
     }
 
     protected ServerConfiguration getServerConfiguration(int serverPort, int 
sslServerPort, String regionName) {
         return new NewRegionServerConfiguration(serverPort, sslServerPort, 
regionName);
     }
 
+    protected ClientConfiguration getRegionClientConfiguration() {
+        return new NewRegionClientConfiguration();
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { false }, { true } });
+    }
+
+    protected boolean isMultiplexingEnabled;
+
+    public TestHedwigRegion(boolean isMultiplexingEnabled) {
+        this.isMultiplexingEnabled = isMultiplexingEnabled;
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {


Reply via email to