http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java 
b/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
deleted file mode 100644
index 0150a11..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
+++ /dev/null
@@ -1,1320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.net.InetAddress;
-import java.io.File;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.protobuf.ByteString;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import org.apache.bookkeeper.test.ZooKeeperUtil;
-import org.apache.bookkeeper.test.PortManager;
-
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Backward Compatability between different versions
- */
-public class TestBackwardCompat {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(TestBackwardCompat.class);
-
-    static final int CONSUMEINTERVAL = 5;
-    static ZooKeeperUtil zkUtil = new ZooKeeperUtil();
-
-    static class BookKeeperCluster400 {
-
-        int numBookies;
-        List<org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration> bkConfs;
-        List<org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer> bks;
-
-        BookKeeperCluster400(int numBookies) {
-            this.numBookies = numBookies;
-        }
-
-        public void start() throws Exception {
-            zkUtil.startServer();
-
-            bks = new 
LinkedList<org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer>();
-            bkConfs = new 
LinkedList<org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration>();
-
-            for (int i=0; i<numBookies; i++) {
-                startBookieServer();
-            }
-        }
-
-        public void stop() throws Exception {
-            for (org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer bs : bks) {
-                bs.shutdown();
-            }
-            bks.clear();
-
-            zkUtil.killServer();
-        }
-
-        protected void startBookieServer() throws Exception {
-            int port = PortManager.nextFreePort();
-            File tmpDir = 
org.apache.hw_v4_0_0.hedwig.util.FileUtils.createTempDirectory(
-                getClass().getName() + port, "test");
-            org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf = 
newServerConfiguration(
-                port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { 
tmpDir });
-            bks.add(startBookie(conf));
-            bkConfs.add(conf);
-        }
-
-        protected org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration 
newServerConfiguration(
-            int port, String zkServers, File journalDir, File[] ledgerDirs) {
-            org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf =
-                new org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration();
-            conf.setBookiePort(port);
-            conf.setZkServers(zkServers);
-            conf.setJournalDirName(journalDir.getPath());
-            String[] ledgerDirNames = new String[ledgerDirs.length];
-            for (int i=0; i<ledgerDirs.length; i++) {
-                ledgerDirNames[i] = ledgerDirs[i].getPath();
-            }
-            conf.setLedgerDirNames(ledgerDirNames);
-            return conf;
-        }
-
-        protected org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer 
startBookie(
-            org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf) 
throws Exception {
-            org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer server
-                = new org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer(conf);
-            server.start();
-
-            int port = conf.getBookiePort();
-            while (zkUtil.getZooKeeperClient().exists(
-                    "/ledgers/available/" + 
InetAddress.getLocalHost().getHostAddress() + ":" + port,
-                    false) == null) {
-                Thread.sleep(500);
-            }
-            return server;
-        }
-    }
-
-    /**
-     * Version 4.0.0 classes
-     */
-    static class Server400 {
-        org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration conf;
-        org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer server;
-
-        Server400(final String zkHosts, final int port, final int sslPort) {
-            conf = new 
org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration() {
-                @Override
-                public String getZkHost() {
-                    return zkHosts;
-                }
-
-                @Override
-                public int getServerPort() {
-                    return port;
-                }
-
-                @Override
-                public int getSSLServerPort() {
-                    return sslPort;
-                }
-            };
-        }
-
-        void start() throws Exception {
-            server = new 
org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer(conf);
-        }
-
-        void stop() throws Exception {
-            if (null != server) {
-                server.shutdown();
-            }
-        }
-    }
-
-    static class Client400 {
-        org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration conf;
-        org.apache.hw_v4_0_0.hedwig.client.api.Client client;
-        org.apache.hw_v4_0_0.hedwig.client.api.Publisher publisher;
-        org.apache.hw_v4_0_0.hedwig.client.api.Subscriber subscriber;
-
-        Client400(final String connectString) {
-            conf = new 
org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration() {
-                    @Override
-                    protected 
org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress
-                        getDefaultServerHedwigSocketAddress() {
-                        return new 
org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress(connectString);
-                    }
-                };
-            client = new org.apache.hw_v4_0_0.hedwig.client.HedwigClient(conf);
-            publisher = client.getPublisher();
-            subscriber = client.getSubscriber();
-        }
-
-        void close() throws Exception {
-            if (null != client) {
-                client.close();
-            }
-        }
-
-        org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.MessageSeqId 
publish(
-            ByteString topic, ByteString data) throws Exception {
-            org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.Message 
message =
-                
org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.Message.newBuilder()
-                    .setBody(data).build();
-            publisher.publish(topic, message);
-            return null;
-        }
-    }
-
-    static class BookKeeperCluster410 {
-
-        int numBookies;
-        List<org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration> bkConfs;
-        List<org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer> bks;
-
-        BookKeeperCluster410(int numBookies) {
-            this.numBookies = numBookies;
-        }
-
-        public void start() throws Exception {
-            zkUtil.startServer();
-
-            bks = new 
LinkedList<org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer>();
-            bkConfs = new 
LinkedList<org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration>();
-
-            for (int i=0; i<numBookies; i++) {
-                startBookieServer();
-            }
-        }
-
-        public void stop() throws Exception {
-            for (org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer bs : bks) {
-                bs.shutdown();
-            }
-            bks.clear();
-
-            zkUtil.killServer();
-        }
-
-        protected void startBookieServer() throws Exception {
-            int port = PortManager.nextFreePort();
-            File tmpDir = 
org.apache.hw_v4_1_0.hedwig.util.FileUtils.createTempDirectory(
-                getClass().getName() + port, "test");
-            org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf = 
newServerConfiguration(
-                    port, zkUtil.getZooKeeperConnectString(), tmpDir, new 
File[] { tmpDir });
-            bks.add(startBookie(conf));
-            bkConfs.add(conf);
-        }
-
-        protected org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration 
newServerConfiguration(
-            int port, String zkServers, File journalDir, File[] ledgerDirs) {
-            org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf =
-                new org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration();
-            conf.setBookiePort(port);
-            conf.setZkServers(zkServers);
-            conf.setJournalDirName(journalDir.getPath());
-            String[] ledgerDirNames = new String[ledgerDirs.length];
-            for (int i=0; i<ledgerDirs.length; i++) {
-                ledgerDirNames[i] = ledgerDirs[i].getPath();
-            }
-            conf.setLedgerDirNames(ledgerDirNames);
-            return conf;
-        }
-
-        protected org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer 
startBookie(
-            org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf) 
throws Exception {
-            org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer server
-                = new org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer(conf);
-            server.start();
-
-            int port = conf.getBookiePort();
-            while (zkUtil.getZooKeeperClient().exists(
-                    "/ledgers/available/" + 
InetAddress.getLocalHost().getHostAddress() + ":" + port,
-                    false) == null) {
-                Thread.sleep(500);
-            }
-            return server;
-        }
-    }
-
-    /**
-     * Version 4.1.0 classes
-     */
-    static class Server410 {
-        org.apache.hw_v4_1_0.hedwig.server.common.ServerConfiguration conf;
-        org.apache.hw_v4_1_0.hedwig.server.netty.PubSubServer server;
-
-        Server410(final String zkHosts, final int port, final int sslPort) {
-            conf = new 
org.apache.hw_v4_1_0.hedwig.server.common.ServerConfiguration() {
-                @Override
-                public int getConsumeInterval() {
-                    return CONSUMEINTERVAL;
-                }
-                @Override
-                public String getZkHost() {
-                    return zkHosts;
-                }
-
-                @Override
-                public int getServerPort() {
-                    return port;
-                }
-
-                @Override
-                public int getSSLServerPort() {
-                    return sslPort;
-                }
-            };
-        }
-
-        void start() throws Exception {
-            server = new 
org.apache.hw_v4_1_0.hedwig.server.netty.PubSubServer(conf);
-            server.start();
-        }
-
-        void stop() throws Exception {
-            if (null != server) {
-                server.shutdown();
-            }
-        }
-    }
-
-    static class Client410 {
-        org.apache.hw_v4_1_0.hedwig.client.conf.ClientConfiguration conf;
-        org.apache.hw_v4_1_0.hedwig.client.api.Client client;
-        org.apache.hw_v4_1_0.hedwig.client.api.Publisher publisher;
-        org.apache.hw_v4_1_0.hedwig.client.api.Subscriber subscriber;
-
-        class IntMessageHandler implements 
org.apache.hw_v4_1_0.hedwig.client.api.MessageHandler {
-            ByteString topic;
-            ByteString subId;
-            int next;
-
-            CountDownLatch latch;
-
-            IntMessageHandler(ByteString t, ByteString s, int start, int num) {
-                this.topic = t;
-                this.subId = s;
-                this.next = start;
-                this.latch = new CountDownLatch(num);
-            }
-
-            @Override
-            public void deliver(ByteString t, ByteString s,
-                                
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg,
-                                
org.apache.hw_v4_1_0.hedwig.util.Callback<Void> callback, Object context) {
-                if (!t.equals(topic) || !s.equals(subId)) {
-                    return;
-                }
-                int num = Integer.parseInt(msg.getBody().toStringUtf8());
-                if (num == next) {
-                    latch.countDown();
-                    ++next;
-                }
-                callback.operationFinished(context, null);
-            }
-
-            public boolean await(long timeout, TimeUnit unit)
-            throws InterruptedException {
-                return latch.await(timeout, unit);
-            }
-        }
-
-        Client410(final String connectString) {
-            conf = new 
org.apache.hw_v4_1_0.hedwig.client.conf.ClientConfiguration() {
-                @Override
-                public boolean isAutoSendConsumeMessageEnabled() {
-                    return true;
-                }
-                @Override
-                public int getConsumedMessagesBufferSize() {
-                    return 1;
-                }
-                @Override
-                protected org.apache.hw_v4_1_0.hedwig.util.HedwigSocketAddress
-                    getDefaultServerHedwigSocketAddress() {
-                    return new 
org.apache.hw_v4_1_0.hedwig.util.HedwigSocketAddress(connectString);
-                }
-            };
-            client = new org.apache.hw_v4_1_0.hedwig.client.HedwigClient(conf);
-            publisher = client.getPublisher();
-            subscriber = client.getSubscriber();
-        }
-
-        void close() throws Exception {
-            if (null != client) {
-                client.close();
-            }
-        }
-
-        org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.MessageSeqId 
publish(
-            ByteString topic, ByteString data) throws Exception {
-            org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message 
message =
-                
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder()
-                    .setBody(data).build();
-            publisher.publish(topic, message);
-            return null;
-        }
-
-        void publishInts(ByteString topic, int start, int num) throws 
Exception {
-            for (int i=0; i<num; i++) {
-                org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message 
msg =
-                    
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(ByteString.copyFromUtf8(""
 + (start+i))).build();
-                publisher.publish(topic, msg);
-            }
-        }
-
-        void sendXExpectLastY(ByteString topic, ByteString subid, final int x, 
final int y)
-        throws Exception {
-            for (int i=0; i<x; i++) {
-                publisher.publish(topic, 
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(
-                                         
ByteString.copyFromUtf8(String.valueOf(i))).build());
-            }
-            subscriber.subscribe(topic, subid, 
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH);
-
-            final AtomicInteger expected = new AtomicInteger(x - y);
-            final CountDownLatch latch = new CountDownLatch(1);
-            subscriber.startDelivery(topic, subid, new 
org.apache.hw_v4_1_0.hedwig.client.api.MessageHandler() {
-                @Override
-                synchronized public void deliver(ByteString topic, ByteString 
subscriberId,
-                                                 
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg,
-                                                 
org.apache.hw_v4_1_0.hedwig.util.Callback<Void> callback, Object context) {
-                    try {
-                        int value = 
Integer.valueOf(msg.getBody().toStringUtf8());
-                        if (value == expected.get()) {
-                            expected.incrementAndGet();
-                        } else {
-                            logger.error("Did not receive expected value, 
expected {}, got {}",
-                                         expected.get(), value);
-                            expected.set(0);
-                            latch.countDown();
-                        }
-                        if (expected.get() == x) {
-                            latch.countDown();
-                        }
-                        callback.operationFinished(context, null);
-                    } catch (Exception e) {
-                        logger.error("Received bad message", e);
-                        latch.countDown();
-                    }
-                }
-            });
-            assertTrue("Timed out waiting for messages Y is " + y + " expected 
is currently "
-                       + expected.get(), latch.await(10, TimeUnit.SECONDS));
-            assertEquals("Should be expected message with " + x, x, 
expected.get());
-            subscriber.stopDelivery(topic, subid);
-            subscriber.closeSubscription(topic, subid);
-            Thread.sleep(1000); // give server time to run disconnect logic 
(BOOKKEEPER-513)
-        }
-
-        void subscribe(ByteString topic, ByteString subscriberId) throws 
Exception {
-            
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options 
=
-                
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-                
.setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH).build();
-            subscribe(topic, subscriberId, options);
-        }
-
-        void subscribe(ByteString topic, ByteString subscriberId,
-                       
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions 
options) throws Exception {
-            subscriber.subscribe(topic, subscriberId, options);
-        }
-
-        void closeSubscription(ByteString topic, ByteString subscriberId) 
throws Exception {
-            subscriber.closeSubscription(topic, subscriberId);
-            Thread.sleep(1000); // give server time to run disconnect logic 
(BOOKKEEPER-513)
-        }
-
-        void receiveInts(ByteString topic, ByteString subscriberId, int start, 
int num) throws Exception {
-            IntMessageHandler msgHandler = new IntMessageHandler(topic, 
subscriberId, start, num);
-            subscriber.startDelivery(topic, subscriberId, msgHandler);
-            msgHandler.await(num, TimeUnit.SECONDS);
-            subscriber.stopDelivery(topic, subscriberId);
-        }
-    }
-
-    /**
-     * 4.2.0 Version
-     */
-    static class BookKeeperCluster420{
-
-        int numBookies;
-        List<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration> bkConfs;
-        List<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer> bks;
-
-
-        BookKeeperCluster420(int numBookies) {
-            this.numBookies = numBookies;
-        }
-
-        public void start() throws Exception {
-            zkUtil.startServer();
-
-            bks = new 
LinkedList<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer>();
-            bkConfs = new 
LinkedList<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration>();
-
-            for (int i=0; i<numBookies; i++) {
-                startBookieServer();
-            }
-        }
-
-        public void stop() throws Exception {
-            for (org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer bs : bks) {
-                bs.shutdown();
-            }
-            bks.clear();
-
-            zkUtil.killServer();
-        }
-
-        protected void startBookieServer() throws Exception {
-            int port = PortManager.nextFreePort();
-            File tmpDir = 
org.apache.hw_v4_2_0.hedwig.util.FileUtils.createTempDirectory(
-                getClass().getName() + port, "test");
-            org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf = 
newServerConfiguration(
-                port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { 
tmpDir });
-            bks.add(startBookie(conf));
-            bkConfs.add(conf);
-        }
-
-        protected org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration 
newServerConfiguration(
-            int port, String zkServers, File journalDir, File[] ledgerDirs) {
-            org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf =
-                new org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration();
-            conf.setBookiePort(port);
-            conf.setZkServers(zkServers);
-            conf.setJournalDirName(journalDir.getPath());
-            String[] ledgerDirNames = new String[ledgerDirs.length];
-            for (int i=0; i<ledgerDirs.length; i++) {
-                ledgerDirNames[i] = ledgerDirs[i].getPath();
-            }
-            conf.setLedgerDirNames(ledgerDirNames);
-            return conf;
-        }
-
-        protected org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer 
startBookie(
-            org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf) 
throws Exception {
-            org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer server
-                = new org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer(conf);
-            server.start();
-
-            int port = conf.getBookiePort();
-            while (zkUtil.getZooKeeperClient().exists(
-                    "/ledgers/available/" + 
InetAddress.getLocalHost().getHostAddress() + ":" + port,
-                    false) == null) {
-                Thread.sleep(500);
-            }
-            return server;
-        }
-    }
-
-    static class Server420 {
-        org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration conf;
-        org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer server;
-
-        Server420(final String zkHosts, final int port, final int sslPort) {
-            conf = new 
org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration() {
-                @Override
-                public int getConsumeInterval() {
-                    return CONSUMEINTERVAL;
-                }
-
-                @Override
-                public String getZkHost() {
-                    return zkHosts;
-                }
-
-                @Override
-                public int getServerPort() {
-                    return port;
-                }
-
-                @Override
-                public int getSSLServerPort() {
-                    return sslPort;
-                }
-            };
-        }
-
-        void start() throws Exception {
-            server = new 
org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer(conf);
-            server.start();
-        }
-
-        void stop() throws Exception {
-            if (null != server) {
-                server.shutdown();
-            }
-        }
-    }
-
-    /**
-     * Current Version
-     */
-    static class BookKeeperClusterCurrent {
-
-        int numBookies;
-        List<org.apache.bookkeeper.conf.ServerConfiguration> bkConfs;
-        List<org.apache.bookkeeper.proto.BookieServer> bks;
-
-
-        BookKeeperClusterCurrent(int numBookies) {
-            this.numBookies = numBookies;
-        }
-
-        public void start() throws Exception {
-            zkUtil.startServer();
-
-            bks = new LinkedList<org.apache.bookkeeper.proto.BookieServer>();
-            bkConfs = new 
LinkedList<org.apache.bookkeeper.conf.ServerConfiguration>();
-
-            for (int i=0; i<numBookies; i++) {
-                startBookieServer();
-            }
-        }
-
-        public void stop() throws Exception {
-            for (org.apache.bookkeeper.proto.BookieServer bs : bks) {
-                bs.shutdown();
-            }
-            bks.clear();
-
-            zkUtil.killServer();
-        }
-
-        protected void startBookieServer() throws Exception {
-            int port = PortManager.nextFreePort();
-            File tmpDir = org.apache.hedwig.util.FileUtils.createTempDirectory(
-                getClass().getName() + port, "test");
-            org.apache.bookkeeper.conf.ServerConfiguration conf = 
newServerConfiguration(
-                port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { 
tmpDir });
-            conf.setAllowLoopback(true);
-            bks.add(startBookie(conf));
-            bkConfs.add(conf);
-        }
-
-        protected org.apache.bookkeeper.conf.ServerConfiguration 
newServerConfiguration(
-            int port, String zkServers, File journalDir, File[] ledgerDirs) {
-            org.apache.bookkeeper.conf.ServerConfiguration conf =
-                new org.apache.bookkeeper.conf.ServerConfiguration();
-            conf.setAllowLoopback(true);
-            conf.setBookiePort(port);
-            conf.setZkServers(zkServers);
-            conf.setJournalDirName(journalDir.getPath());
-            String[] ledgerDirNames = new String[ledgerDirs.length];
-            for (int i=0; i<ledgerDirs.length; i++) {
-                ledgerDirNames[i] = ledgerDirs[i].getPath();
-            }
-            conf.setLedgerDirNames(ledgerDirNames);
-            return conf;
-        }
-
-        protected org.apache.bookkeeper.proto.BookieServer startBookie(
-            org.apache.bookkeeper.conf.ServerConfiguration conf) throws 
Exception {
-            org.apache.bookkeeper.proto.BookieServer server
-                = new org.apache.bookkeeper.proto.BookieServer(conf);
-            server.start();
-
-            int port = conf.getBookiePort();
-            while (zkUtil.getZooKeeperClient().exists(
-                    "/ledgers/available/" + 
InetAddress.getLocalHost().getHostAddress() + ":" + port,
-                    false) == null) {
-                Thread.sleep(500);
-            }
-            return server;
-        }
-    }
-
-    static class ServerCurrent {
-        org.apache.hedwig.server.common.ServerConfiguration conf;
-        org.apache.hedwig.server.netty.PubSubServer server;
-
-        ServerCurrent(final String zkHosts, final int port, final int sslPort) 
{
-            conf = new org.apache.hedwig.server.common.ServerConfiguration() {
-                @Override
-                public int getConsumeInterval() {
-                    return CONSUMEINTERVAL;
-                }
-
-                @Override
-                public String getZkHost() {
-                    return zkHosts;
-                }
-
-                @Override
-                public int getServerPort() {
-                    return port;
-                }
-
-                @Override
-                public int getSSLServerPort() {
-                    return sslPort;
-                }
-            };
-        }
-
-        void start() throws Exception {
-            server = new org.apache.hedwig.server.netty.PubSubServer(conf);
-            server.start();
-        }
-
-        void stop() throws Exception {
-            if (null != server) {
-                server.shutdown();
-            }
-        }
-    }
-
-    static class ClientCurrent {
-        org.apache.hedwig.client.conf.ClientConfiguration conf;
-        org.apache.hedwig.client.api.Client client;
-        org.apache.hedwig.client.api.Publisher publisher;
-        org.apache.hedwig.client.api.Subscriber subscriber;
-
-        class IntMessageHandler implements 
org.apache.hedwig.client.api.MessageHandler {
-            ByteString topic;
-            ByteString subId;
-            int next;
-
-            CountDownLatch latch;
-
-            IntMessageHandler(ByteString t, ByteString s, int start, int num) {
-                this.topic = t;
-                this.subId = s;
-                this.next = start;
-                this.latch = new CountDownLatch(num);
-            }
-
-            @Override
-            public void deliver(ByteString t, ByteString s,
-                                
org.apache.hedwig.protocol.PubSubProtocol.Message msg,
-                                org.apache.hedwig.util.Callback<Void> 
callback, Object context) {
-                if (!t.equals(topic) || !s.equals(subId)) {
-                    return;
-                }
-                int num = Integer.parseInt(msg.getBody().toStringUtf8());
-                if (num == next) {
-                    latch.countDown();
-                    ++next;
-                }
-                callback.operationFinished(context, null);
-            }
-
-            public boolean await(long timeout, TimeUnit unit)
-            throws InterruptedException {
-                return latch.await(timeout, unit);
-            }
-        }
-
-        ClientCurrent(final String connectString) {
-            this(true, connectString);
-        }
-
-        ClientCurrent(final boolean autoConsumeEnabled, final String 
connectString) {
-            conf = new org.apache.hedwig.client.conf.ClientConfiguration() {
-                @Override
-                public boolean isAutoSendConsumeMessageEnabled() {
-                    return autoConsumeEnabled;
-                }
-                @Override
-                public int getConsumedMessagesBufferSize() {
-                    return 1;
-                }
-                @Override
-                protected HedwigSocketAddress 
getDefaultServerHedwigSocketAddress() {
-                    return new HedwigSocketAddress(connectString);
-                }
-            };
-            client = new org.apache.hedwig.client.HedwigClient(conf);
-            publisher = client.getPublisher();
-            subscriber = client.getSubscriber();
-        }
-
-        void close() throws Exception {
-            if (null != client) {
-                client.close();
-            }
-        }
-
-        org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId publish(
-            ByteString topic, ByteString data) throws Exception {
-            org.apache.hedwig.protocol.PubSubProtocol.Message message =
-                org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder()
-                    .setBody(data).build();
-            org.apache.hedwig.protocol.PubSubProtocol.PublishResponse resp =
-                publisher.publish(topic, message);
-            if (null == resp) {
-                return null;
-            }
-            return resp.getPublishedMsgId();
-        }
-
-        void publishInts(ByteString topic, int start, int num) throws 
Exception {
-            for (int i=0; i<num; i++) {
-                org.apache.hedwig.protocol.PubSubProtocol.Message msg =
-                    
org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(ByteString.copyFromUtf8(""
 + (start+i))).build();
-                publisher.publish(topic, msg);
-            }
-        }
-
-        void sendXExpectLastY(ByteString topic, ByteString subid, final int x, 
final int y)
-        throws Exception {
-            for (int i=0; i<x; i++) {
-                publisher.publish(topic, 
org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(
-                                         
ByteString.copyFromUtf8(String.valueOf(i))).build());
-            }
-            org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts
-                = 
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-                
.setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)
-                .build();
-            subscriber.subscribe(topic, subid, opts);
-
-            final AtomicInteger expected = new AtomicInteger(x - y);
-            final CountDownLatch latch = new CountDownLatch(1);
-            subscriber.startDelivery(topic, subid, new 
org.apache.hedwig.client.api.MessageHandler() {
-                @Override
-                synchronized public void deliver(ByteString topic, ByteString 
subscriberId,
-                                                 
org.apache.hedwig.protocol.PubSubProtocol.Message msg,
-                                                 
org.apache.hedwig.util.Callback<Void> callback, Object context) {
-                    try {
-                        int value = 
Integer.valueOf(msg.getBody().toStringUtf8());
-                        if (value == expected.get()) {
-                            expected.incrementAndGet();
-                        } else {
-                            logger.error("Did not receive expected value, 
expected {}, got {}",
-                                         expected.get(), value);
-                            expected.set(0);
-                            latch.countDown();
-                        }
-                        if (expected.get() == x) {
-                            latch.countDown();
-                        }
-                        callback.operationFinished(context, null);
-                    } catch (Exception e) {
-                        logger.error("Received bad message", e);
-                        latch.countDown();
-                    }
-                }
-            });
-            assertTrue("Timed out waiting for messages Y is " + y + " expected 
is currently "
-                       + expected.get(), latch.await(10, TimeUnit.SECONDS));
-            assertEquals("Should be expected message with " + x, x, 
expected.get());
-            subscriber.stopDelivery(topic, subid);
-            subscriber.closeSubscription(topic, subid);
-        }
-
-        void receiveNumModM(final ByteString topic, final ByteString subid,
-                            final int start, final int num, final int M) 
throws Exception {
-            org.apache.hedwig.filter.ServerMessageFilter filter =
-                new org.apache.hedwig.filter.ServerMessageFilter() {
-
-                @Override
-                public org.apache.hedwig.filter.ServerMessageFilter
-                    initialize(Configuration conf) {
-                    // do nothing
-                    return this;
-                }
-
-                @Override
-                public void uninitialize() {
-                    // do nothing;
-                }
-
-                @Override
-                public org.apache.hedwig.filter.MessageFilterBase
-                    setSubscriptionPreferences(ByteString topic, ByteString 
subscriberId,
-                    
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences) {
-                    // do nothing;
-                    return this;
-                }
-
-                @Override
-                public boolean 
testMessage(org.apache.hedwig.protocol.PubSubProtocol.Message msg) {
-                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
-                    return 0 == value % M;
-                }
-            };
-            filter.initialize(conf.getConf());
-
-
-            org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts
-                = 
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-                
.setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)
-                .build();
-            subscriber.subscribe(topic, subid, opts);
-            final int base = start + M - start % M;
-            final AtomicInteger expected = new AtomicInteger(base);
-            final CountDownLatch latch = new CountDownLatch(1);
-            subscriber.startDeliveryWithFilter(topic, subid, new 
org.apache.hedwig.client.api.MessageHandler() {
-                synchronized public void deliver(ByteString topic, ByteString 
subscriberId,
-                                                 
org.apache.hedwig.protocol.PubSubProtocol.Message msg,
-                                                 
org.apache.hedwig.util.Callback<Void> callback, Object context) {
-                    try {
-                        int value = 
Integer.valueOf(msg.getBody().toStringUtf8());
-                        // duplicated messages received, ignore them
-                        if (value > start) {
-                            if (value == expected.get()) {
-                                expected.addAndGet(M);
-                            } else {
-                                logger.error("Did not receive expected value, 
expected {}, got {}",
-                                             expected.get(), value);
-                                expected.set(0);
-                                latch.countDown();
-                            }
-                            if (expected.get() == (base + num * M)) {
-                                latch.countDown();
-                            }
-                        }
-                        callback.operationFinished(context, null);
-                    } catch (Exception e) {
-                        logger.error("Received bad message", e);
-                        latch.countDown();
-                    }
-                }
-            }, (org.apache.hedwig.filter.ClientMessageFilter) filter);
-            assertTrue("Timed out waiting for messages mod " + M + " expected 
is " + expected.get(),
-                       latch.await(10, TimeUnit.SECONDS));
-            assertEquals("Should be expected message with " + (base + num * 
M), (base + num*M), expected.get());
-            subscriber.stopDelivery(topic, subid);
-            filter.uninitialize();
-            subscriber.closeSubscription(topic, subid);
-        }
-
-        void subscribe(ByteString topic, ByteString subscriberId) throws 
Exception {
-            org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions 
options =
-                
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-                
.setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH).build();
-            subscribe(topic, subscriberId, options);
-        }
-
-        void subscribe(ByteString topic, ByteString subscriberId,
-                       
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options) throws 
Exception {
-            subscriber.subscribe(topic, subscriberId, options);
-        }
-
-        void closeSubscription(ByteString topic, ByteString subscriberId) 
throws Exception {
-            subscriber.closeSubscription(topic, subscriberId);
-        }
-
-        void receiveInts(ByteString topic, ByteString subscriberId, int start, 
int num) throws Exception {
-            IntMessageHandler msgHandler = new IntMessageHandler(topic, 
subscriberId, start, num);
-            subscriber.startDelivery(topic, subscriberId, msgHandler);
-            msgHandler.await(num, TimeUnit.SECONDS);
-            subscriber.stopDelivery(topic, subscriberId);
-        }
-
-        // throttle doesn't work talking with 41 server
-        void throttleX41(ByteString topic, ByteString subid, final int X)
-        throws Exception {
-            org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions 
options =
-                
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-                
.setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
-                .setMessageWindowSize(X) .build();
-            subscribe(topic, subid, options);
-            closeSubscription(topic, subid);
-            publishInts(topic, 1, 3*X);
-            subscribe(topic, subid);
-
-            final AtomicInteger expected = new AtomicInteger(1);
-            final CountDownLatch throttleLatch = new CountDownLatch(1);
-            final CountDownLatch nonThrottleLatch = new CountDownLatch(1);
-            subscriber.startDelivery(topic, subid, new 
org.apache.hedwig.client.api.MessageHandler() {
-                @Override
-                public synchronized void deliver(ByteString topic, ByteString 
subscriberId,
-                                                 
org.apache.hedwig.protocol.PubSubProtocol.Message msg,
-                                                 
org.apache.hedwig.util.Callback<Void> callback, Object context) {
-                    try {
-                        int value = 
Integer.valueOf(msg.getBody().toStringUtf8());
-                        logger.debug("Received message {},", value);
-
-                        if (value == expected.get()) {
-                            expected.incrementAndGet();
-                        } else {
-                            // error condition
-                            logger.error("Did not receive expected value, 
expected {}, got {}",
-                                         expected.get(), value);
-                            expected.set(0);
-                            throttleLatch.countDown();
-                            nonThrottleLatch.countDown();
-                        }
-                        if (expected.get() > X+1) {
-                            throttleLatch.countDown();
-                        }
-                        if (expected.get() == (3 * X + 1)) {
-                            nonThrottleLatch.countDown();
-                        }
-                        callback.operationFinished(context, null);
-                    } catch (Exception e) {
-                        logger.error("Received bad message", e);
-                        throttleLatch.countDown();
-                        nonThrottleLatch.countDown();
-                    }
-                }
-            });
-            assertTrue("Should Receive more messages than throttle value " + X,
-                        throttleLatch.await(10, TimeUnit.SECONDS));
-
-            assertTrue("Timed out waiting for messages " + (3*X + 1),
-                       nonThrottleLatch.await(10, TimeUnit.SECONDS));
-            assertEquals("Should be expected message with " + (3*X + 1),
-                         3*X + 1, expected.get());
-
-            subscriber.stopDelivery(topic, subid);
-            closeSubscription(topic, subid);
-        }
-    }
-
-    /**
-     * Test compatability of message bound between version 4.0.0 and
-     * current version.
-     *
-     * 1) message bound doesn't take effects on 4.0.0 server.
-     * 2) message bound take effects on both 4.1.0 and current server
-     */
-    @Test(timeout=60000)
-    public void testMessageBoundCompat() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("testMessageBoundCompat");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        int port = PortManager.nextFreePort();
-        int sslPort = PortManager.nextFreePort();
-
-        // start bookkeeper 400
-        BookKeeperCluster400 bkc400 = new BookKeeperCluster400(3);
-        bkc400.start();
-
-        // start 400 server
-        Server400 s400 = new Server400(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s400.start();
-
-        org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions 
options5cur =
-            
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-            
.setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
-            .setMessageBound(5).build();
-
-        ClientCurrent ccur = new ClientCurrent("localhost:" + port + ":" + 
sslPort);
-        ccur.subscribe(topic, subid, options5cur);
-        ccur.closeSubscription(topic, subid);
-        ccur.sendXExpectLastY(topic, subid, 50, 50);
-
-        // stop 400 servers
-        s400.stop();
-        bkc400.stop();
-
-        // start bookkeeper 410
-        BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
-        bkc410.start();
-
-        // start 410 server
-        Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s410.start();
-
-        ccur.subscribe(topic, subid, options5cur);
-        ccur.closeSubscription(topic, subid);
-        ccur.sendXExpectLastY(topic, subid, 50, 5);
-
-        // stop 410 servers
-        s410.stop();
-        bkc410.stop();
-
-        // start bookkeeper current
-        BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
-        bkc420.start();
-
-        // start 420 server
-        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s420.start();
-
-        ccur.subscribe(topic, subid, options5cur);
-        ccur.closeSubscription(topic, subid);
-        ccur.sendXExpectLastY(topic, subid, 50, 5);
-
-        // stop 420 server
-        s420.stop();
-        bkc420.stop();
-
-        ccur.close();
-    }
-
-    /**
-     * Test compatability of publish interface between version 4.1.0
-     * and current verison.
-     *
-     * 1) 4.1.0 client could talk with current server.
-     * 2) current client could talk with 4.1.0 server,
-     *    but no message seq id would be returned
-     */
-    @Test(timeout=60000)
-    public void testPublishCompat410() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestPublishCompat410");
-        ByteString data = ByteString.copyFromUtf8("testdata");
-
-        // start bookkeeper 410
-        BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
-        bkc410.start();
-
-        int port = PortManager.nextFreePort();
-        int sslPort = PortManager.nextFreePort();
-
-        // start 410 server
-        Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s410.start();
-
-        ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
-        Client410 c410 = new Client410("localhost:"+port+":"+sslPort);
-
-        // client c410 could publish message to 410 server
-        assertNull(c410.publish(topic, data));
-        // client ccur could publish message to 410 server
-        // but no message seq id would be returned
-        assertNull(ccur.publish(topic, data));
-
-        // stop 410 server
-        s410.stop();
-
-        // start 420 server
-        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s420.start();
-
-        // client c410 could publish message to 410 server
-        // but no message seq id would be returned
-        assertNull(c410.publish(topic, data));
-        // client ccur could publish message to current server
-        assertNotNull(ccur.publish(topic, data));
-
-        ccur.close();
-        c410.close();
-
-        // stop 420 server
-        s420.stop();
-        bkc410.stop();
-    }
-
-    /**
-     * Test compatability between version 4.1.0 and the current version.
-     *
-     * A current server could read subscription data recorded by 4.1.0 server.
-     */
-    @Test(timeout=60000)
-    public void testSubscriptionDataCompat410() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestCompat410");
-        ByteString sub410 = ByteString.copyFromUtf8("sub410");
-        ByteString subcur = ByteString.copyFromUtf8("subcur");
-
-        // start bookkeeper 410
-        BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
-        bkc410.start();
-
-        int port = PortManager.nextFreePort();
-        int sslPort = PortManager.nextFreePort();
-
-        // start 410 server
-        Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s410.start();
-
-        Client410 c410 = new Client410("localhost:"+port+":"+sslPort);
-        c410.subscribe(topic, sub410);
-        c410.closeSubscription(topic, sub410);
-        Thread.sleep(1000); // give server time to run disconnect logic 
(BOOKKEEPER-513)
-
-        ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
-        ccur.subscribe(topic, subcur);
-        ccur.closeSubscription(topic, subcur);
-
-        // publish messages using old client
-        c410.publishInts(topic, 0, 10);
-        // stop 410 server
-        s410.stop();
-
-        // start 420 server
-        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
-                                               port, sslPort);
-        s420.start();
-
-        c410.subscribe(topic, sub410);
-        c410.receiveInts(topic, sub410, 0, 10);
-
-        ccur.subscribe(topic, subcur);
-        ccur.receiveInts(topic, subcur, 0, 10);
-
-        // publish messages using current client
-        ccur.publishInts(topic, 10, 10);
-
-        c410.receiveInts(topic, sub410, 10, 10);
-        ccur.receiveInts(topic, subcur, 10, 10);
-
-        // stop 420 server
-        s420.stop();
-
-        c410.close();
-        ccur.close();
-
-        // stop bookkeeper cluster
-        bkc410.stop();
-    }
-
-    /**
-     * Test compatability between version 4.1.0 and the current version.
-     *
-     * A 4.1.0 client could not update message bound, while current could do 
it.
-     */
-    @Test(timeout=60000)
-    public void testUpdateMessageBoundCompat410() throws Exception {
-        ByteString topic = 
ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        // start bookkeeper
-        BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
-        bkc420.start();
-
-        int port = PortManager.nextFreePort();
-        int sslPort = PortManager.nextFreePort();
-
-        // start hub server
-        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
-                                               port, sslPort);
-        s420.start();
-
-        org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions 
options5cur =
-            
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-            
.setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
-            .setMessageBound(5).build();
-        
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions 
options5v410 =
-            
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-            
.setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
-            .setMessageBound(5).build();
-        
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions 
options20v410 =
-            
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
-            
.setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
-            .setMessageBound(20).build();
-
-        Client410 c410 = new Client410("localhost:"+port+":"+sslPort);
-        c410.subscribe(topic, subid, options20v410);
-        c410.closeSubscription(topic, subid);
-        Thread.sleep(1000); // give server time to run disconnect logic 
(BOOKKEEPER-513)
-
-        c410.sendXExpectLastY(topic, subid, 50, 20);
-
-        c410.subscribe(topic, subid, options5v410);
-        c410.closeSubscription(topic, subid);
-        Thread.sleep(1000); // give server time to run disconnect logic 
(BOOKKEEPER-513)
-
-        // the message bound isn't updated.
-        c410.sendXExpectLastY(topic, subid, 50, 20);
-
-        ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
-        ccur.subscribe(topic, subid, options5cur);
-        ccur.closeSubscription(topic, subid);
-        Thread.sleep(1000); // give server time to run disconnect logic 
(BOOKKEEPER-513)
-
-        // the message bound should be updated.
-        c410.sendXExpectLastY(topic, subid, 50, 5);
-
-        // stop 420 server
-        s420.stop();
-
-        c410.close();
-        ccur.close();
-
-        // stop bookkeeper cluster
-        bkc420.stop();
-    }
-
-    /**
-     * Test compatability between version 4.1.0 and the current version.
-     *
-     * A current client running message filter would fail on 4.1.0 hub servers.
-     */
-    @Test(timeout=60000)
-    public void testClientMessageFilterCompat410() throws Exception {
-        ByteString topic = 
ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        // start bookkeeper
-        BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
-        bkc410.start();
-
-        int port = PortManager.nextFreePort();
-        int sslPort = PortManager.nextFreePort();
-
-        // start hub server 410
-        Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s410.start();
-
-        ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
-        ccur.subscribe(topic, subid);
-        ccur.closeSubscription(topic, subid);
-
-        ccur.publishInts(topic, 0, 100);
-        try {
-            ccur.receiveNumModM(topic, subid, 0, 50, 2);
-            fail("client-side filter could not run on 4.1.0 hub server");
-        } catch (Exception e) {
-            logger.info("Should fail to run client-side message filter on 
4.1.0 hub server.", e);
-            ccur.closeSubscription(topic, subid);
-        }
-
-        // stop 410 server
-        s410.stop();
-        // stop bookkeeper cluster
-        bkc410.stop();
-    }
-
-    /**
-     * Test compatability between version 4.1.0 and the current version.
-     *
-     * Server side throttling does't work when current client connects to old 
version
-     * server.
-     */
-    @Test(timeout=60000)
-    public void testServerSideThrottleCompat410() throws Exception {
-        ByteString topic = 
ByteString.copyFromUtf8("TestServerSideThrottleCompat410");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        // start bookkeeper
-        BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
-        bkc410.start();
-
-        int port = PortManager.nextFreePort();
-        int sslPort = PortManager.nextFreePort();
-
-        // start hub server 410
-        Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), 
port, sslPort);
-        s410.start();
-
-        ClientCurrent ccur = new ClientCurrent(false, 
"localhost:"+port+":"+sslPort);
-        ccur.throttleX41(topic, subid, 10);
-
-        ccur.close();
-
-        // stop 410 server
-        s410.stop();
-        // stop bookkeeper cluster
-        bkc410.stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
deleted file mode 100644
index 632ea43..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-
-import junit.framework.Assert;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.LoggingExceptionHandler;
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.Test;
-import org.apache.hedwig.util.FileUtils;
-
-public class TestPubSubServerStartup {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(TestPubSubServerStartup.class);
-
-    /**
-     * Start-up zookeeper + pubsubserver reading from a config URL. Then stop
-     * and cleanup.
-     *
-     * Loop over that.
-     *
-     * If the pubsub server does not wait for its zookeeper client to be
-     * connected, the pubsub server will fail at startup.
-     *
-     */
-    @Test(timeout=60000)
-    public void testPubSubServerInstantiationWithConfig() throws Exception {
-        for (int i = 0; i < 10; i++) {
-            logger.info("iteration " + i);
-            instantiateAndDestroyPubSubServer();
-        }
-    }
-
-    private void instantiateAndDestroyPubSubServer() throws IOException, 
InterruptedException, ConfigurationException,
-        MalformedURLException, Exception {
-        int zkPort = PortManager.nextFreePort();
-        int hwPort = PortManager.nextFreePort();
-        int hwSSLPort = PortManager.nextFreePort();
-        String hedwigParams = "default_server_host=localhost:" + hwPort + "\n"
-            + "zk_host=localhost:" + zkPort + "\n"
-            + "server_port=" + hwPort + "\n"
-            + "ssl_server_port=" + hwSSLPort + "\n"
-            + "zk_timeout=2000\n";
-
-        File hedwigConfigFile = new File(System.getProperty("java.io.tmpdir") 
+ "/hedwig.cfg");
-        writeStringToFile(hedwigParams, hedwigConfigFile);
-
-        ClientBase.setupTestEnv();
-        File zkTmpDir = FileUtils.createTempDirectory("zookeeper", "test");
-
-        ZooKeeperServer zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, zkPort);
-
-        NIOServerCnxnFactory serverFactory = new NIOServerCnxnFactory();
-        serverFactory.configure(new InetSocketAddress(zkPort), 100);
-        serverFactory.startup(zks);
-
-        boolean b = ClientBase.waitForServerUp("127.0.0.1:" + zkPort, 5000);
-        ServerConfiguration serverConf = new ServerConfiguration();
-        serverConf.loadConf(hedwigConfigFile.toURI().toURL());
-
-        logger.info("Zookeeper server up and running!");
-
-        ZooKeeper zkc = new ZooKeeper("127.0.0.1:" + zkPort, 5000, null);
-
-        // initialize the zk client with (fake) values
-        zkc.create("/ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-        zkc.create("/ledgers/available", new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-        zkc.close();
-        PubSubServer hedwigServer = null;
-        try {
-            logger.info("starting hedwig broker!");
-            hedwigServer = new PubSubServer(serverConf, new 
ClientConfiguration(), new LoggingExceptionHandler());
-            hedwigServer.start();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        Assert.assertNotNull("failed to instantiate hedwig pub sub server", 
hedwigServer);
-
-        hedwigServer.shutdown();
-        serverFactory.shutdown();
-
-        zks.shutdown();
-
-        zkTmpDir.delete();
-
-        ClientBase.waitForServerDown("localhost:" + zkPort, 10000);
-
-    }
-
-    public static void writeStringToFile(String string, File f) throws 
IOException {
-        if (f.exists()) {
-            if (!f.delete()) {
-                throw new RuntimeException("cannot create file " + 
f.getAbsolutePath());
-            }
-        }
-        if (!f.createNewFile()) {
-            throw new RuntimeException("cannot create new file " + 
f.getAbsolutePath());
-        }
-
-        FileWriter fw = new FileWriter(f);
-        fw.write(string);
-        fw.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
deleted file mode 100644
index 978649a..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.util.Callback;
-
-public class StubDeliveryManager implements DeliveryManager {
-
-    public static class StartServingRequest {
-        public ByteString topic;
-        public ByteString subscriberId;
-        public MessageSeqId seqIdToStartFrom;
-        public DeliveryEndPoint endPoint;
-        public ServerMessageFilter filter;
-
-        public StartServingRequest(ByteString topic, ByteString subscriberId,
-                                   SubscriptionPreferences preferences,
-                                   MessageSeqId seqIdToStartFrom,
-                                   DeliveryEndPoint endPoint,
-                                   ServerMessageFilter filter) {
-            this.topic = topic;
-            this.subscriberId = subscriberId;
-            this.seqIdToStartFrom = seqIdToStartFrom;
-            this.endPoint = endPoint;
-            this.filter = filter;
-        }
-
-    }
-
-    public Queue<Object> lastRequest = new LinkedList<Object>();
-
-    @Override
-    public void startServingSubscription(ByteString topic, ByteString 
subscriberId,
-                                         SubscriptionPreferences preferences,
-                                         MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint,
-                                         ServerMessageFilter filter,
-                                         Callback<Void> cb, Object ctx) {
-        lastRequest.add(new StartServingRequest(topic, subscriberId, 
preferences,
-                                                seqIdToStartFrom, endPoint, 
filter));
-        cb.operationFinished(ctx, null);
-    }
-
-    @Override
-    public void stopServingSubscriber(ByteString topic, ByteString 
subscriberId,
-                                      SubscriptionEvent event,
-                                      Callback<Void> cb, Object ctx) {
-        lastRequest.add(new TopicSubscriber(topic, subscriberId));
-        cb.operationFinished(ctx, null);
-    }
-
-    @Override
-    public void messageConsumed(ByteString topic, ByteString subscriberId,
-                                MessageSeqId seqId) {
-        // do nothing
-    }
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-        // do nothing now
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
deleted file mode 100644
index ebc26f1..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.StubTopicManager;
-import org.apache.hedwig.server.persistence.PersistRequest;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.persistence.StubPersistenceManager;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-import org.apache.hedwig.util.Callback;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-public class TestFIFODeliveryManager {
-    private static final Logger logger = 
LoggerFactory.getLogger(TestFIFODeliveryManager.class);
-
-    static class TestCallback implements Callback<MessageSeqId> {
-        AtomicBoolean success = new AtomicBoolean(false);
-        final CountDownLatch latch;
-        MessageSeqId msgid = null;
-
-        TestCallback(CountDownLatch l) {
-            this.latch = l;
-        }
-        public void operationFailed(Object ctx, PubSubException exception) {
-            logger.error("Persist operation failed", exception);
-            latch.countDown();
-        }
-
-        public void operationFinished(Object ctx, MessageSeqId 
resultOfOperation) {
-            msgid = resultOfOperation;
-            success.set(true);
-            latch.countDown();
-        }
-
-        MessageSeqId getId() {
-            assertTrue("Persist operation failed", success.get());
-            return msgid;
-        }
-    }
-
-    /**
-     * Delivery endpoint which puts all responses on a queue
-     */
-    static class ExecutorDeliveryEndPointWithQueue implements DeliveryEndPoint 
{
-        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
-        AtomicInteger numResponses = new AtomicInteger(0);
-        ConcurrentLinkedQueue<PubSubResponse> queue = new 
ConcurrentLinkedQueue<PubSubResponse>();
-
-        public void send(final PubSubResponse response, final DeliveryCallback 
callback) {
-            logger.info("Received response {}", response);
-            queue.add(response);
-            numResponses.incrementAndGet();
-            executor.submit(new Runnable() {
-                    public void run() {
-                        callback.sendingFinished();
-                    }
-                });
-        }
-
-        public void close() {
-            executor.shutdown();
-        }
-
-        PubSubResponse getNextResponse() {
-            return queue.poll();
-        }
-
-        int getNumResponses() {
-            return numResponses.get();
-        }
-    }
-
-    /**
-     * Test that the FIFO delivery manager executes stopServing and 
startServing
-     * in the correct order
-     * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-539}
-     */
-    @Test(timeout = 60000)
-    public void testFIFODeliverySubCloseSubRace() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration();
-        ByteString topic = ByteString.copyFromUtf8("subRaceTopic");
-        ByteString subscriber = ByteString.copyFromUtf8("subRaceSubscriber");
-
-        PersistenceManager pm = new StubPersistenceManager();
-        FIFODeliveryManager fdm = new FIFODeliveryManager(new 
StubTopicManager(conf), pm, conf);
-        ExecutorDeliveryEndPointWithQueue dep = new 
ExecutorDeliveryEndPointWithQueue();
-        SubscriptionPreferences prefs = 
SubscriptionPreferences.newBuilder().build();
-
-        PipelineFilter filter = new PipelineFilter();
-        filter.addLast(new AllToAllTopologyFilter());
-        filter.initialize(conf.getConf());
-        filter.setSubscriptionPreferences(topic, subscriber, prefs);
-        MessageSeqId startId = 
MessageSeqId.newBuilder().setLocalComponent(1).build();
-
-        CountDownLatch l = new CountDownLatch(1);
-        Message m = 
Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(1))).build();
-        TestCallback cb = new TestCallback(l);
-        pm.persistMessage(new PersistRequest(topic, m, cb, null));
-        assertTrue("Persistence never finished", l.await(10, 
TimeUnit.SECONDS));
-
-        final CountDownLatch oplatch = new CountDownLatch(3);
-        fdm.start();
-        fdm.startServingSubscription(topic, subscriber, prefs, startId, dep, 
filter,
-                new Callback<Void>() {
-                     @Override
-                     public void operationFinished(Object ctx, Void result) {
-                         oplatch.countDown();
-                     }
-                     @Override
-                     public void operationFailed(Object ctx, PubSubException 
exception) {
-                         oplatch.countDown();
-                     }
-                }, null);
-        fdm.stopServingSubscriber(topic, subscriber, null,
-                new Callback<Void>() {
-                     @Override
-                     public void operationFinished(Object ctx, Void result) {
-                         oplatch.countDown();
-                     }
-                     @Override
-                     public void operationFailed(Object ctx, PubSubException 
exception) {
-                         oplatch.countDown();
-                     }
-                }, null);
-        fdm.startServingSubscription(topic, subscriber, prefs, startId, dep, 
filter,
-                new Callback<Void>() {
-                     @Override
-                     public void operationFinished(Object ctx, Void result) {
-                         oplatch.countDown();
-                     }
-                     @Override
-                     public void operationFailed(Object ctx, PubSubException 
exception) {
-                         oplatch.countDown();
-                     }
-                }, null);
-
-        assertTrue("Ops never finished", oplatch.await(10, TimeUnit.SECONDS));
-        int seconds = 5;
-        while (dep.getNumResponses() < 2) {
-            if (seconds-- == 0) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
-        PubSubResponse r = dep.getNextResponse();
-        assertNotNull("There should be a response", r);
-        assertTrue("Response should contain a message", r.hasMessage());
-        r = dep.getNextResponse();
-        assertNotNull("There should be a response", r);
-        assertTrue("Response should contain a message", r.hasMessage());
-        r = dep.getNextResponse();
-        assertNull("There should only be 2 responses", r);
-    }
-
-    static class ExecutorDeliveryEndPoint implements DeliveryEndPoint {
-        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
-        AtomicInteger numDelivered = new AtomicInteger();
-        final DeliveryManager dm;
-
-        ExecutorDeliveryEndPoint(DeliveryManager dm) {
-            this.dm = dm;
-        }
-
-        public void send(final PubSubResponse response, final DeliveryCallback 
callback) {
-            executor.submit(new Runnable() {
-                    public void run() {
-                        if (response.hasMessage()) {
-                            MessageSeqId msgid = 
response.getMessage().getMsgId();
-                            if ((msgid.getLocalComponent() % 2) == 1) {
-                                dm.messageConsumed(response.getTopic(),
-                                        response.getSubscriberId(),
-                                        response.getMessage().getMsgId());
-                            } else {
-                                executor.schedule(new Runnable() {
-                                        public void run() {
-                                            
dm.messageConsumed(response.getTopic(),
-                                                    response.getSubscriberId(),
-                                                    
response.getMessage().getMsgId());
-                                        }
-                                    }, 1, TimeUnit.SECONDS);
-                            }
-                        }
-                        numDelivered.incrementAndGet();
-                        callback.sendingFinished();
-                    }
-                });
-        }
-
-        public void close() {
-            executor.shutdown();
-        }
-
-        int getNumDelivered() {
-            return numDelivered.get();
-        }
-    }
-
-    /**
-     * Test throttle race issue cause by messageConsumed and 
doDeliverNextMessage
-     * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-503}
-     */
-    @Test(timeout = 60000)
-    public void testFIFODeliveryThrottlingRace() throws Exception {
-        final int numMessages = 20;
-        final int throttleSize = 10;
-        ServerConfiguration conf = new ServerConfiguration() {
-                @Override
-                public int getDefaultMessageWindowSize() {
-                    return throttleSize;
-                }
-            };
-        ByteString topic = ByteString.copyFromUtf8("throttlingRaceTopic");
-        ByteString subscriber = 
ByteString.copyFromUtf8("throttlingRaceSubscriber");
-
-        PersistenceManager pm = new StubPersistenceManager();
-        FIFODeliveryManager fdm = new FIFODeliveryManager(new 
StubTopicManager(conf), pm, conf);
-        ExecutorDeliveryEndPoint dep = new ExecutorDeliveryEndPoint(fdm);
-        SubscriptionPreferences prefs = 
SubscriptionPreferences.newBuilder().build();
-
-        PipelineFilter filter = new PipelineFilter();
-        filter.addLast(new AllToAllTopologyFilter());
-        filter.initialize(conf.getConf());
-        filter.setSubscriptionPreferences(topic, subscriber, prefs);
-
-        CountDownLatch l = new CountDownLatch(numMessages);
-
-        TestCallback firstCallback = null;
-        for (int i = 0; i < numMessages; i++) {
-            Message m = 
Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(i))).build();
-            TestCallback cb = new TestCallback(l);
-            if (firstCallback == null) {
-                firstCallback = cb;
-            }
-            pm.persistMessage(new PersistRequest(topic, m, cb, null));
-        }
-        fdm.start();
-        assertTrue("Persistence never finished", l.await(10, 
TimeUnit.SECONDS));
-        fdm.startServingSubscription(topic, subscriber, prefs, 
firstCallback.getId(), dep, filter,
-                new Callback<Void>() {
-                     @Override
-                     public void operationFinished(Object ctx, Void result) {
-                     }
-                     @Override
-                     public void operationFailed(Object ctx, PubSubException 
exception) {
-                         // would not happened
-                     }
-                }, null);
-
-        int count = 30; // wait for 30 seconds maximum
-        while (dep.getNumDelivered() < numMessages) {
-            Thread.sleep(1000);
-            if (count-- == 0) {
-                break;
-            }
-        }
-        assertEquals("Should have delivered " + numMessages, numMessages, 
dep.getNumDelivered());
-    }
-
-}

Reply via email to