http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java
deleted file mode 100644
index 98d36b6..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubSSL.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import java.util.Collection;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class TestHedwigHubSSL extends TestHedwigHub {
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] { { true }, { false } });
-    }
-
-    public TestHedwigHubSSL(boolean isSubscriptionChannelSharingEnabled) {
-        super(Mode.SSL, isSubscriptionChannelSharingEnabled);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
deleted file mode 100644
index bce41e5..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import 
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.server.HedwigRegionTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.integration.TestHedwigHub.TestCallback;
-import org.apache.hedwig.server.integration.TestHedwigHub.TestMessageHandler;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestHedwigRegion extends HedwigRegionTestBase {
-
-    // SynchronousQueues to verify async calls
-    private final SynchronousQueue<Boolean> queue = new 
SynchronousQueue<Boolean>();
-    private final SynchronousQueue<Boolean> consumeQueue = new 
SynchronousQueue<Boolean>();
-
-    private static final int TEST_RETRY_REMOTE_SUBSCRIBE_INTERVAL_VALUE = 3000;
-
-    protected class NewRegionServerConfiguration extends 
RegionServerConfiguration {
-
-        public NewRegionServerConfiguration(int serverPort, int sslServerPort,
-                String regionName) {
-            super(serverPort, sslServerPort, regionName);
-        }
-
-        @Override
-        public int getRetryRemoteSubscribeThreadRunInterval() {
-            return TEST_RETRY_REMOTE_SUBSCRIBE_INTERVAL_VALUE;
-        }
-
-    }
-
-    protected class NewRegionClientConfiguration extends ClientConfiguration {
-        @Override
-        public boolean isSubscriptionChannelSharingEnabled() {
-            return isSubscriptionChannelSharingEnabled;
-        }
-        @Override
-        public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
-            return regionHubAddresses.get(0).get(0);
-        }
-    }
-
-    protected ServerConfiguration getServerConfiguration(int serverPort, int 
sslServerPort, String regionName) {
-        return new NewRegionServerConfiguration(serverPort, sslServerPort, 
regionName);
-    }
-
-    protected ClientConfiguration getRegionClientConfiguration() {
-        return new NewRegionClientConfiguration();
-    }
-
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] { { false }, { true } });
-    }
-
-    protected boolean isSubscriptionChannelSharingEnabled;
-
-    public TestHedwigRegion(boolean isSubscriptionChannelSharingEnabled) {
-        this.isSubscriptionChannelSharingEnabled = 
isSubscriptionChannelSharingEnabled;
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        numRegions = 3;
-        numServersPerRegion = 4;
-        super.setUp();
-    }
-
-    @Test(timeout=60000)
-    public void testMultiRegionSubscribeAndConsume() throws Exception {
-        int batchSize = 10;
-        // Subscribe to topics for clients in all regions
-        for (HedwigClient client : regionClientsMap.values()) {
-            for (int i = 0; i < batchSize; i++) {
-                SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                    
.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-                
client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
-                        ByteString.copyFromUtf8("LocalSubscriber"), opts, new 
TestCallback(queue), null);
-                assertTrue(queue.take());
-            }
-        }
-
-        // Start delivery for the local subscribers in all regions
-        for (HedwigClient client : regionClientsMap.values()) {
-            for (int i = 0; i < batchSize; i++) {
-                
client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i),
-                                                     
ByteString.copyFromUtf8("LocalSubscriber"), new 
TestMessageHandler(consumeQueue));
-            }
-        }
-
-        // Now start publishing messages for the subscribed topics in one of 
the
-        // regions and verify that it gets delivered and consumed in all of the
-        // other ones.
-        Publisher publisher = 
regionClientsMap.values().iterator().next().getPublisher();
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), 
Message.newBuilder().setBody(
-                                       ByteString.copyFromUtf8("Message" + 
i)).build(), new TestCallback(queue), null);
-            assertTrue(queue.take());
-        }
-        // Make sure each region consumes the same set of published messages.
-        for (int i = 0; i < regionClientsMap.size(); i++) {
-            for (int j = 0; j < batchSize; j++) {
-                assertTrue(consumeQueue.take());
-            }
-        }
-    }
-
-    /**
-     * Test region shuts down when first subscription.
-     *
-     * @throws Exception
-     */
-    @Test(timeout=60000)
-    public void testSubscribeAndConsumeWhenARegionDown() throws Exception {
-        int batchSize = 10;
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        // first shut down a region
-        Random r = new Random();
-        int regionId = r.nextInt(numRegions);
-        stopRegion(regionId);
-        // subscribe to topics when a region shuts down
-        for (HedwigClient client : regionClientsMap.values()) {
-            for (int i = 0; i < batchSize; i++) {
-                
client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
-                                                      
ByteString.copyFromUtf8("LocalSubscriber"),
-                                                      opts, new 
TestCallback(queue), null);
-                assertFalse(queue.take());
-            }
-        }
-
-        // start region gain
-        startRegion(regionId);
-
-        // sub it again
-        for (Map.Entry<String, HedwigClient> entry : 
regionClientsMap.entrySet()) {
-            HedwigClient client = entry.getValue();
-            for (int i = 0; i < batchSize; i++) {
-                
client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
-                                                      
ByteString.copyFromUtf8("LocalSubscriber"),
-                                                      opts, new 
TestCallback(queue), null);
-                assertTrue(queue.take());
-            }
-        }
-
-        // Start delivery for local subscribers in all regions
-        for (Map.Entry<String, HedwigClient> entry : 
regionClientsMap.entrySet()) {
-            HedwigClient client = entry.getValue();
-            for (int i = 0; i < batchSize; i++) {
-                
client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i),
-                                                     
ByteString.copyFromUtf8("LocalSubscriber"), new 
TestMessageHandler(consumeQueue));
-            }
-        }
-
-        // Now start publishing messages for the subscribed topics in one of 
the
-        // regions and verify that it gets delivered and consumed in all of the
-        // other ones.
-        int rid = r.nextInt(numRegions);
-        String regionName = REGION_PREFIX + rid;
-        Publisher publisher = regionClientsMap.get(regionName).getPublisher();
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), 
Message.newBuilder().setBody(
-                                   ByteString.copyFromUtf8(regionName + 
"-Message" + i)).build(), new TestCallback(queue), null);
-            assertTrue(queue.take());
-        }
-        // Make sure each region consumes the same set of published messages.
-        for (int i = 0; i < regionClientsMap.size(); i++) {
-            for (int j = 0; j < batchSize; j++) {
-                assertTrue(consumeQueue.take());
-            }
-        }
-    }
-
-    /**
-     * Test region shuts down when attaching existing subscriptions.
-     *
-     * @throws Exception
-     */
-    @Test(timeout=60000)
-    public void testAttachExistingSubscriptionsWhenARegionDown() throws 
Exception {
-        int batchSize = 10;
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        // sub it remotely to make subscriptions existed
-        for (Map.Entry<String, HedwigClient> entry : 
regionClientsMap.entrySet()) {
-            HedwigClient client = entry.getValue();
-            for (int i = 0; i < batchSize; i++) {
-                
client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
-                                                      
ByteString.copyFromUtf8("LocalSubscriber"),
-                                                      opts, new 
TestCallback(queue), null);
-                assertTrue(queue.take());
-            }
-        }
-
-        // stop regions
-        for (int i=0; i<numRegions; i++) {
-            stopRegion(i);
-        }
-        // start regions again
-        for (int i=0; i<numRegions; i++) {
-            startRegion(i);
-        }
-
-        // first shut down a region
-        Random r = new Random();
-        int regionId = r.nextInt(numRegions);
-        stopRegion(regionId);
-        // subscribe to topics when a region shuts down
-        // it should succeed since the subscriptions existed before
-        for (HedwigClient client : regionClientsMap.values()) {
-            for (int i = 0; i < batchSize; i++) {
-                
client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
-                                                      
ByteString.copyFromUtf8("LocalSubscriber"),
-                                                      opts, new 
TestCallback(queue), null);
-                assertTrue(queue.take());
-            }
-        }
-
-        // Start delivery for local subscribers in all regions
-        for (Map.Entry<String, HedwigClient> entry : 
regionClientsMap.entrySet()) {
-            HedwigClient client = entry.getValue();
-            for (int i = 0; i < batchSize; i++) {
-                
client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i),
-                                                     
ByteString.copyFromUtf8("LocalSubscriber"), new 
TestMessageHandler(consumeQueue));
-            }
-        }
-
-        // start region again
-        startRegion(regionId);
-        // wait for retry
-        Thread.sleep(3 * TEST_RETRY_REMOTE_SUBSCRIBE_INTERVAL_VALUE);
-
-        String regionName = REGION_PREFIX + regionId;
-        HedwigClient client = regionClientsMap.get(regionName);
-        for (int i = 0; i < batchSize; i++) {
-            
client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
-                                                  
ByteString.copyFromUtf8("LocalSubscriber"),
-                                                  opts, new 
TestCallback(queue), null);
-            assertTrue(queue.take());
-            
client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i),
-                    ByteString.copyFromUtf8("LocalSubscriber"), new 
TestMessageHandler(consumeQueue));
-        }
-
-        // Now start publishing messages for the subscribed topics in one of 
the
-        // regions and verify that it gets delivered and consumed in all of the
-        // other ones.
-        Publisher publisher = client.getPublisher();
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), 
Message.newBuilder().setBody(
-                                   ByteString.copyFromUtf8(regionName + 
"-Message" + i)).build(), new TestCallback(queue), null);
-            assertTrue(queue.take());
-        }
-        // Make sure each region consumes the same set of published messages.
-        for (int i = 0; i < regionClientsMap.size(); i++) {
-            for (int j = 0; j < batchSize; j++) {
-                assertTrue(consumeQueue.take());
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java
deleted file mode 100644
index 644500d..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.hedwig.server.meta;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.bookkeeper.metastore.InMemoryMetaStore;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.ZkMetadataManagerFactory;
-import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(Parameterized.class)
-public abstract class MetadataManagerFactoryTestCase extends ZooKeeperTestBase 
{
-    private static final Logger LOG = 
LoggerFactory.getLogger(MetadataManagerFactoryTestCase.class);
-
-    protected MetadataManagerFactory metadataManagerFactory;
-    protected ServerConfiguration conf;
-
-    public MetadataManagerFactoryTestCase(String metadataManagerFactoryCls) {
-        super();
-        conf = new ServerConfiguration();
-        conf.setMetadataManagerFactoryName(metadataManagerFactoryCls);
-        conf.getConf().setProperty("metastore_impl_class", 
InMemoryMetaStore.class.getName());
-        InMemoryMetaStore.reset();
-    }
-
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] {
-            { ZkMetadataManagerFactory.class.getName() },
-            { MsMetadataManagerFactory.class.getName() },
-        });
-    }
-
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        metadataManagerFactory = 
MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-    }
-
-    @After
-    @Override
-    public void tearDown() throws Exception {
-        metadataManagerFactory.shutdown();
-        super.tearDown();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java
deleted file mode 100644
index 7e15135..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.hedwig.server.meta;
-
-import java.io.IOException;
-
-import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
-import org.apache.hedwig.zookeeper.ZkUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestFactoryLayout extends ZooKeeperTestBase {
-
-    @Test(timeout=60000)
-    public void testFactoryLayout() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration();
-        conf.setMetadataManagerFactoryName(
-            "org.apache.hedwig.server.meta.ZkMetadataManager");
-
-        FactoryLayout layout = FactoryLayout.readLayout(zk, conf);
-        Assert.assertTrue("Layout should be null", layout == null);
-
-        String testName = "foobar";
-        int testVersion = 0xdeadbeef;
-        // use layout defined in configuration also create it in zookeeper
-        writeFactoryLayout(conf, testName, testVersion);
-
-        layout = FactoryLayout.readLayout(zk, conf);
-        Assert.assertEquals(testName, 
layout.getManagerMeta().getManagerImpl());
-        Assert.assertEquals(testVersion, 
layout.getManagerMeta().getManagerVersion());
-    }
-
-    private void writeFactoryLayout(ServerConfiguration conf, String 
managerCls,
-                                    int managerVersion)
-        throws Exception {
-        ManagerMeta managerMeta = ManagerMeta.newBuilder()
-                                             .setManagerImpl(managerCls)
-                                             .setManagerVersion(managerVersion)
-                                             .build();
-        FactoryLayout layout = new FactoryLayout(managerMeta);
-        layout.store(zk, conf);
-    }
-
-    @Test(timeout=60000)
-    public void testCorruptedFactoryLayout() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration();
-        StringBuilder msb = new StringBuilder();
-        String factoryLayoutPath = FactoryLayout.getFactoryLayoutPath(msb, 
conf);
-        // write corrupted manager layout
-        ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, 
"BadLayout".getBytes(),
-                                         Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-
-        try {
-            FactoryLayout.readLayout(zk, conf);
-            Assert.fail("Shouldn't reach here!");
-        } catch (IOException ie) {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
deleted file mode 100644
index 7e395e9..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.hedwig.server.meta;
-
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestMetadataManager extends MetadataManagerFactoryTestCase {
-
-    public TestMetadataManager(String metadataManagerFactoryCls) {
-        super(metadataManagerFactoryCls);
-    }
-
-    @Test(timeout=60000)
-    public void testOwnerInfo() throws Exception {
-        TopicOwnershipManager toManager = 
metadataManagerFactory.newTopicOwnershipManager();
-
-        ByteString topic = ByteString.copyFromUtf8("testOwnerInfo");
-        StubCallback<Versioned<HubInfo>> readCallback = new 
StubCallback<Versioned<HubInfo>>();
-        StubCallback<Version> writeCallback = new StubCallback<Version>();
-        StubCallback<Void> deleteCallback = new StubCallback<Void>();
-
-        Either<Version, PubSubException> res;
-        HubInfo owner = new HubInfo(new HedwigSocketAddress("127.0.0.1", 
8008), 999);
-
-        // Write non-existed owner info
-        toManager.writeOwnerInfo(topic, owner, Version.NEW, writeCallback, 
null);
-        res = writeCallback.queue.take();
-        Assert.assertEquals(null, res.right());
-        Version v1 = res.left();
-
-        // read owner info
-        toManager.readOwnerInfo(topic, readCallback, null);
-        Versioned<HubInfo> hubInfo = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v1.compare(hubInfo.getVersion()));
-        Assert.assertEquals(owner, hubInfo.getValue());
-
-        HubInfo newOwner = new HubInfo(new HedwigSocketAddress("127.0.0.1", 
8008), 1000);
-
-        // write exsited owner info with null version
-        toManager.writeOwnerInfo(topic, newOwner, Version.NEW, writeCallback, 
null);
-        res = writeCallback.queue.take();
-        Assert.assertNotNull(res.right());
-        Assert.assertTrue(res.right() instanceof 
PubSubException.TopicOwnerInfoExistsException);
-
-        // write existed owner info with right version
-        toManager.writeOwnerInfo(topic, newOwner, v1, writeCallback, null);
-        res = writeCallback.queue.take();
-        Assert.assertEquals(null, res.right());
-        Version v2 = res.left();
-        Assert.assertEquals(Version.Occurred.AFTER, v2.compare(v1));
-
-        // read owner info
-        toManager.readOwnerInfo(topic, readCallback, null);
-        hubInfo = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(hubInfo.getVersion()));
-        Assert.assertEquals(newOwner, hubInfo.getValue());
-
-        HubInfo newOwner2 = new HubInfo(new HedwigSocketAddress("127.0.0.1", 
8008), 1001);
-
-        // write existed owner info with bad version
-        toManager.writeOwnerInfo(topic, newOwner2, v1,
-                                 writeCallback, null);
-        res = writeCallback.queue.take();
-        Assert.assertNotNull(res.right());
-        Assert.assertTrue(res.right() instanceof 
PubSubException.BadVersionException);
-
-        // read owner info
-        toManager.readOwnerInfo(topic, readCallback, null);
-        hubInfo = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(hubInfo.getVersion()));
-        Assert.assertEquals(newOwner, hubInfo.getValue());
-
-        // delete existed owner info with bad version
-        toManager.deleteOwnerInfo(topic, v1, deleteCallback, null);
-        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
-                          PubSubException.BadVersionException);
-
-        // read owner info
-        toManager.readOwnerInfo(topic, readCallback, null);
-        hubInfo = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(hubInfo.getVersion()));
-
-        // delete existed owner info with right version
-        toManager.deleteOwnerInfo(topic, v2, deleteCallback, null);
-        Assert.assertEquals(null, deleteCallback.queue.take().right());
-
-        // Empty owner info
-        toManager.readOwnerInfo(topic, readCallback, null);
-        Assert.assertEquals(null, readCallback.queue.take().left());
-
-        // delete non-existed owner info
-        toManager.deleteOwnerInfo(topic, Version.ANY, deleteCallback, null);
-        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
-                          PubSubException.NoTopicOwnerInfoException);
-
-        toManager.close();
-    }
-
-    @Test(timeout=60000)
-    public void testPersistenceInfo() throws Exception {
-        TopicPersistenceManager tpManager = 
metadataManagerFactory.newTopicPersistenceManager();
-
-        ByteString topic = ByteString.copyFromUtf8("testPersistenceInfo");
-        StubCallback<Versioned<LedgerRanges>> readCallback = new 
StubCallback<Versioned<LedgerRanges>>();
-        StubCallback<Version> writeCallback = new StubCallback<Version>();
-        StubCallback<Void> deleteCallback = new StubCallback<Void>();
-
-        // Write non-existed persistence info
-        tpManager.writeTopicPersistenceInfo(topic, 
LedgerRanges.getDefaultInstance(),
-                                            Version.NEW, writeCallback, null);
-        Either<Version, PubSubException> res = writeCallback.queue.take();
-        Assert.assertEquals(null, res.right());
-        Version v1 = res.left();
-
-        // read persistence info
-        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
-        Versioned<LedgerRanges> ranges = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v1.compare(ranges.getVersion()));
-        Assert.assertEquals(LedgerRanges.getDefaultInstance(), 
ranges.getValue());
-
-        LedgerRange lastRange = 
LedgerRange.newBuilder().setLedgerId(1).build();
-        LedgerRanges.Builder builder = LedgerRanges.newBuilder();
-        builder.addRanges(lastRange);
-        LedgerRanges newRanges = builder.build();
-
-        // write existed persistence info with null version
-        tpManager.writeTopicPersistenceInfo(topic, newRanges, Version.NEW,
-                                            writeCallback, null);
-        res = writeCallback.queue.take();
-        Assert.assertNotNull(res.right());
-        Assert.assertTrue(res.right() instanceof 
PubSubException.TopicPersistenceInfoExistsException);
-
-        // write existed persistence info with right version
-        tpManager.writeTopicPersistenceInfo(topic, newRanges, v1,
-                                            writeCallback, null);
-        res = writeCallback.queue.take();
-        Assert.assertEquals(null, res.right());
-        Version v2 = res.left();
-        Assert.assertEquals(Version.Occurred.AFTER, v2.compare(v1));
-
-        // read persistence info
-        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
-        ranges = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(ranges.getVersion()));
-        Assert.assertEquals(newRanges, ranges.getValue());
-
-        lastRange = LedgerRange.newBuilder().setLedgerId(2).build();
-        builder = LedgerRanges.newBuilder();
-        builder.addRanges(lastRange);
-        LedgerRanges newRanges2 = builder.build();
-
-        // write existed persistence info with bad version
-        tpManager.writeTopicPersistenceInfo(topic, newRanges2, v1,
-                                            writeCallback, null);
-        res = writeCallback.queue.take();
-        Assert.assertNotNull(res.right());
-        Assert.assertTrue(res.right() instanceof 
PubSubException.BadVersionException);
-
-        // read persistence info
-        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
-        ranges = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(ranges.getVersion()));
-        Assert.assertEquals(newRanges, ranges.getValue());
-
-        // delete with bad version
-        tpManager.deleteTopicPersistenceInfo(topic, v1, deleteCallback, null);
-        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
-                          PubSubException.BadVersionException);
-
-        // read persistence info
-        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
-        ranges = readCallback.queue.take().left();
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(ranges.getVersion()));
-        Assert.assertEquals(newRanges, ranges.getValue());
-
-        // delete existed persistence info with right version
-        tpManager.deleteTopicPersistenceInfo(topic, v2, deleteCallback, null);
-        Assert.assertEquals(null, deleteCallback.queue.take().right());
-
-        // read empty persistence info
-        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
-        Assert.assertEquals(null, readCallback.queue.take().left());
-
-        // delete non-existed persistence info
-        tpManager.deleteTopicPersistenceInfo(topic, Version.ANY, 
deleteCallback, null);
-        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
-                          PubSubException.NoTopicPersistenceInfoException);
-
-        tpManager.close();
-    }
-
-    @Test(timeout=60000)
-    public void testSubscriptionData() throws Exception {
-        SubscriptionDataManager subManager = 
metadataManagerFactory.newSubscriptionDataManager();
-
-        ByteString topic = ByteString.copyFromUtf8("testSubscriptionData");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        final StubCallback<Version> callback = new StubCallback<Version>();
-        StubCallback<Versioned<SubscriptionData>> readCallback = new 
StubCallback<Versioned<SubscriptionData>>();
-        StubCallback<Map<ByteString, Versioned<SubscriptionData>>> subsCallback
-            = new StubCallback<Map<ByteString, Versioned<SubscriptionData>>>();
-
-        subManager.readSubscriptionData(topic, subid, readCallback, null);
-        Either<Versioned<SubscriptionData>, PubSubException> readRes = 
readCallback.queue.take();
-        Assert.assertEquals("Found inconsistent subscription state", null, 
readRes.left());
-        Assert.assertEquals("Should not fail with PubSubException", null, 
readRes.right());
-
-        // read non-existed subscription state
-        subManager.readSubscriptions(topic, subsCallback, null);
-        Either<Map<ByteString, Versioned<SubscriptionData>>, PubSubException> 
res = subsCallback.queue.take();
-        Assert.assertEquals("Found more than 0 subscribers", 0, 
res.left().size());
-        Assert.assertEquals("Should not fail with PubSubException", null, 
res.right());
-
-        // update non-existed subscription state
-        if (subManager.isPartialUpdateSupported()) {
-            subManager.updateSubscriptionData(topic, subid, 
-                    SubscriptionData.getDefaultInstance(), Version.ANY, 
callback, null);
-        } else {
-            subManager.replaceSubscriptionData(topic, subid, 
-                    SubscriptionData.getDefaultInstance(), Version.ANY, 
callback, null);
-        }
-        Assert.assertTrue("Should fail to update a non-existed subscriber with 
PubSubException",
-                          callback.queue.take().right() instanceof 
PubSubException.NoSubscriptionStateException);
-
-        Callback<Void> voidCallback = new Callback<Void>() {
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                callback.operationFinished(ctx, null);
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) 
{
-                callback.operationFailed(ctx, exception);
-            }
-        }; 
-        
-        // delete non-existed subscription state
-        subManager.deleteSubscriptionData(topic, subid, Version.ANY, 
voidCallback, null);
-        Assert.assertTrue("Should fail to delete a non-existed subscriber with 
PubSubException",
-                          callback.queue.take().right() instanceof 
PubSubException.NoSubscriptionStateException);
-
-        long seqId = 10;
-        MessageSeqId.Builder builder = MessageSeqId.newBuilder();
-        builder.setLocalComponent(seqId);
-        MessageSeqId msgId = builder.build();
-
-        SubscriptionState.Builder stateBuilder = 
SubscriptionState.newBuilder(SubscriptionState.getDefaultInstance()).setMsgId(msgId);
-        SubscriptionData data = 
SubscriptionData.newBuilder().setState(stateBuilder).build();
-
-        // create a subscription state
-        subManager.createSubscriptionData(topic, subid, data, callback, null);
-        Either<Version, PubSubException> cbResult = callback.queue.take();
-        Version v1 = cbResult.left();
-        Assert.assertEquals("Should not fail with PubSubException",
-                            null, cbResult.right());
-
-        // read subscriptions
-        subManager.readSubscriptions(topic, subsCallback, null);
-        res = subsCallback.queue.take();
-        Assert.assertEquals("Should find just 1 subscriber", 1, 
res.left().size());
-        Assert.assertEquals("Should not fail with PubSubException", null, 
res.right());
-        Versioned<SubscriptionData> versionedSubData = res.left().get(subid);
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v1.compare(versionedSubData.getVersion()));
-        SubscriptionData imss = versionedSubData.getValue();
-        Assert.assertEquals("Found inconsistent subscription state",
-                            data, imss);
-        Assert.assertEquals("Found inconsistent last consumed seq id",
-                            seqId, 
imss.getState().getMsgId().getLocalComponent());
-
-        // move consume seq id
-        seqId = 99;
-        builder = MessageSeqId.newBuilder();
-        builder.setLocalComponent(seqId);
-        msgId = builder.build();
-
-        stateBuilder = 
SubscriptionState.newBuilder(data.getState()).setMsgId(msgId);
-        data = SubscriptionData.newBuilder().setState(stateBuilder).build();
-        
-        // update subscription state
-        if (subManager.isPartialUpdateSupported()) {
-            subManager.updateSubscriptionData(topic, subid, data, 
versionedSubData.getVersion(), callback, null);
-        } else {
-            subManager.replaceSubscriptionData(topic, subid, data, 
versionedSubData.getVersion(), callback, null);
-        }
-        cbResult = callback.queue.take();
-        Assert.assertEquals("Fail to update a subscription state", null, 
cbResult.right());
-        Version v2 = cbResult.left();
-        // read subscription state
-        subManager.readSubscriptionData(topic, subid, readCallback, null);
-        Assert.assertEquals("Found inconsistent subscription state",
-                            data, readCallback.queue.take().left().getValue());
-        
-        // read subscriptions again
-        subManager.readSubscriptions(topic, subsCallback, null);
-        res = subsCallback.queue.take();
-        Assert.assertEquals("Should find just 1 subscriber", 1, 
res.left().size());
-        Assert.assertEquals("Should not fail with PubSubException", null, 
res.right());
-        versionedSubData = res.left().get(subid);
-        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(versionedSubData.getVersion()));
-        imss = res.left().get(subid).getValue();
-        Assert.assertEquals("Found inconsistent subscription state",
-                            data, imss);
-        Assert.assertEquals("Found inconsistent last consumed seq id",
-                            seqId, 
imss.getState().getMsgId().getLocalComponent());
-
-        // update or replace subscription data with bad version
-        if (subManager.isPartialUpdateSupported()) {
-            subManager.updateSubscriptionData(topic, subid, data, v1, 
callback, null);
-        } else {
-            subManager.replaceSubscriptionData(topic, subid, data, v1, 
callback, null);
-        }
-        Assert.assertTrue(callback.queue.take().right() instanceof 
PubSubException.BadVersionException);
-        
-        // delete with bad version
-        subManager.deleteSubscriptionData(topic, subid, v1, voidCallback, 
null);
-        Assert.assertTrue(callback.queue.take().right() instanceof 
PubSubException.BadVersionException);
-        subManager.deleteSubscriptionData(topic, subid, 
res.left().get(subid).getVersion(), voidCallback, null);
-        Assert.assertEquals("Fail to delete an existed subscriber", null, 
callback.queue.take().right());
-
-        // read subscription states again
-        subManager.readSubscriptions(topic, subsCallback, null);
-        res = subsCallback.queue.take();
-        Assert.assertEquals("Found more than 0 subscribers", 0, 
res.left().size());
-        Assert.assertEquals("Should not fail with PubSubException", null, 
res.right());
-
-        subManager.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
deleted file mode 100644
index 8b9016a..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.hedwig.server.meta;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import java.io.IOException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.CountDownLatch;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
-import org.junit.Test;
-import org.junit.Assert;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestMetadataManagerFactory extends ZooKeeperTestBase {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TestMetadataManagerFactory.class);
-
-    static class TestServerConfiguration extends ServerConfiguration {
-        String hedwigPrefix = "/hedwig";
-
-        @Override
-        public String getZkPrefix() {
-            return hedwigPrefix;
-        }
-
-        public void setZkPrefix(String prefix) {
-            this.hedwigPrefix = prefix;
-        }
-    }
-
-    static class DummyMetadataManagerFactory extends MetadataManagerFactory {
-        static int VERSION = 10;
-
-        public int getCurrentVersion() { return VERSION; }
-
-
-        public MetadataManagerFactory initialize(ServerConfiguration cfg,
-                                                 ZooKeeper zk,
-                                                 int version)
-        throws IOException {
-            if (version != VERSION) {
-                throw new IOException("unmatched manager version");
-            }
-            // do nothing
-            return this;
-        }
-
-        public void shutdown() {}
-
-        public Iterator<ByteString> getTopics() {
-            return null;
-        }
-
-        public TopicPersistenceManager newTopicPersistenceManager() {
-            return null;
-        }
-
-        public SubscriptionDataManager newSubscriptionDataManager() {
-            return null;
-        }
-
-        public TopicOwnershipManager newTopicOwnershipManager() {
-            return null;
-        }
-
-        public void format(ServerConfiguration cfg, ZooKeeper zk) throws 
IOException {
-            // do nothing
-        }
-    }
-
-    private void writeFactoryLayout(ServerConfiguration conf,
-                                    String factoryCls,
-                                    int factoryVersion)
-        throws Exception {
-        ManagerMeta meta = ManagerMeta.newBuilder()
-                                      .setManagerImpl(factoryCls)
-                                      
.setManagerVersion(factoryVersion).build();
-        new FactoryLayout(meta).store(zk, conf);
-    }
-
-    /**
-     * Test bad server configuration
-     */
-    @Test(timeout=60000)
-    public void testBadConf() throws Exception {
-        TestServerConfiguration conf = new TestServerConfiguration();
-
-        String root0 = "/goodconf";
-        conf.setZkPrefix(root0);
-
-        MetadataManagerFactory m =
-            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-        Assert.assertTrue("MetadataManagerFactory is unexpected type",
-                          (m instanceof ZkMetadataManagerFactory));
-
-        // mismatching conf
-        
conf.setMetadataManagerFactoryName(DummyMetadataManagerFactory.class.getName());
-        try {
-            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-            Assert.fail("Shouldn't reach here");
-        } catch (Exception e) {
-            Assert.assertTrue("Invalid exception",
-                              e.getMessage().contains("does not match existing 
factory"));
-        }
-
-        // invalid metadata manager
-        String root1 = "/badconf1";
-        conf.setZkPrefix(root1);
-        conf.setMetadataManagerFactoryName("DoesNotExist");
-        try {
-            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-            Assert.fail("Shouldn't reach here");
-        } catch (Exception e) {
-            Assert.assertTrue("Invalid exception",
-                              e.getMessage().contains("Failed to get metadata 
manager factory class from configuration"));
-        }
-    }
-
-    /**
-     * Test bad zk configuration
-     */
-    @Test(timeout=60000)
-    public void testBadZkContents() throws Exception {
-        TestServerConfiguration conf = new TestServerConfiguration();
-
-        // bad type in zookeeper
-        String root0 = "/badzk0";
-        conf.setZkPrefix(root0);
-
-        writeFactoryLayout(conf, "DoesNotExist", 0xdeadbeef);
-        try {
-            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-            Assert.fail("Shouldn't reach here");
-        } catch (Exception e) {
-            Assert.assertTrue("Invalid exception",
-                              e.getMessage().contains("No class found to 
instantiate metadata manager factory"));
-        }
-
-        // bad version in zookeeper
-        String root1 = "/badzk1";
-        conf.setZkPrefix(root1);
-
-        writeFactoryLayout(conf, ZkMetadataManagerFactory.class.getName(), 
0xdeadbeef);
-        try {
-            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-            Assert.fail("Shouldn't reach here");
-        } catch (Exception e) {
-            Assert.assertTrue("Invalid exception",
-                              e.getMessage().contains("Incompatible 
ZkMetadataManagerFactory version"));
-        }
-    }
-
-    private class CreateMMThread extends Thread {
-        private boolean success = false;
-        private final String factoryCls;
-        private final String root;
-        private final CyclicBarrier barrier;
-        private ZooKeeper zkc;
-
-        CreateMMThread(String root, String factoryCls, CyclicBarrier barrier) 
throws Exception {
-            this.factoryCls = factoryCls;
-            this.barrier = barrier;
-            this.root = root;
-            final CountDownLatch latch = new CountDownLatch(1);
-            zkc = new ZooKeeper(hostPort, 10000, new Watcher() {
-                public void process(WatchedEvent event) {
-                    latch.countDown();
-                }
-            });
-            latch.await();
-        }
-
-        public void run() {
-            TestServerConfiguration conf = new TestServerConfiguration();
-            conf.setZkPrefix(root);
-            conf.setMetadataManagerFactoryName(factoryCls);
-
-            try {
-                barrier.await();
-                MetadataManagerFactory.newMetadataManagerFactory(conf, zkc);
-                success = true;
-            } catch (Exception e) {
-                LOG.error("Failed to create metadata manager factory", e);
-            }
-        }
-
-        public boolean isSuccessful() {
-            return success;
-        }
-
-        public void close() throws Exception {
-            zkc.close();
-        }
-    }
-
-    // test concurrent
-    @Test(timeout=60000)
-    public void testConcurrent1() throws Exception {
-        /// everyone creates the same
-        int numThreads = 50;
-
-        // bad version in zookeeper
-        String root0 = "/lmroot0";
-
-        CyclicBarrier barrier = new CyclicBarrier(numThreads+1);
-        List<CreateMMThread> threads = new 
ArrayList<CreateMMThread>(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            CreateMMThread t = new CreateMMThread(root0, 
ZkMetadataManagerFactory.class.getName(), barrier);
-            t.start();
-            threads.add(t);
-        }
-
-        barrier.await();
-
-        boolean success = true;
-        for (CreateMMThread t : threads) {
-            t.join();
-            t.close();
-            success = t.isSuccessful() && success;
-        }
-        Assert.assertTrue("Not all metadata manager factories created", 
success);
-    }
-
-    @Test(timeout=60000)
-    public void testConcurrent2() throws Exception {
-        /// odd create different
-        int numThreadsEach = 25;
-
-        // bad version in zookeeper
-        String root0 = "/lmroot0";
-
-        CyclicBarrier barrier = new CyclicBarrier(numThreadsEach*2+1);
-        List<CreateMMThread> threadsA = new 
ArrayList<CreateMMThread>(numThreadsEach);
-        for (int i = 0; i < numThreadsEach; i++) {
-            CreateMMThread t = new CreateMMThread(root0, 
ZkMetadataManagerFactory.class.getName(), barrier);
-            t.start();
-            threadsA.add(t);
-        }
-        List<CreateMMThread> threadsB = new 
ArrayList<CreateMMThread>(numThreadsEach);
-        for (int i = 0; i < numThreadsEach; i++) {
-            CreateMMThread t = new CreateMMThread(root0, 
DummyMetadataManagerFactory.class.getName(), barrier);
-            t.start();
-            threadsB.add(t);
-        }
-
-        barrier.await();
-
-        int numSuccess = 0;
-        int numFails = 0;
-        for (CreateMMThread t : threadsA) {
-            t.join();
-            t.close();
-            if (t.isSuccessful()) {
-                numSuccess++;
-            } else {
-                numFails++;
-            }
-        }
-
-        for (CreateMMThread t : threadsB) {
-            t.join();
-            t.close();
-            if (t.isSuccessful()) {
-                numSuccess++;
-            } else {
-                numFails++;
-            }
-        }
-        Assert.assertEquals("Incorrect number of successes", numThreadsEach, 
numSuccess);
-        Assert.assertEquals("Incorrect number of failures", numThreadsEach, 
numFails);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
deleted file mode 100644
index 241f45b..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.netty;
-
-import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Test;
-
-import org.apache.bookkeeper.test.PortManager;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.AbstractTopicManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.LoggingExceptionHandler;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
-
-import static org.junit.Assert.*;
-
-public class TestPubSubServer extends PubSubServerStandAloneTestBase {
-
-    @Test(timeout=60000)
-    public void testSecondServer() throws Exception {
-        PubSubServer server1 = new PubSubServer(new 
StandAloneServerConfiguration() {
-                @Override
-                public int getServerPort() {
-                    return super.getServerPort() + 1;
-                }
-            }, new ClientConfiguration(), new LoggingExceptionHandler());
-        server1.start();
-        server1.shutdown();
-    }
-
-    class RecordingUncaughtExceptionHandler implements 
Thread.UncaughtExceptionHandler {
-        SynchronousQueue<Throwable> queue;
-
-        public RecordingUncaughtExceptionHandler(SynchronousQueue<Throwable> 
queue) {
-            this.queue = queue;
-        }
-
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-            queue.add(e);
-        }
-
-    }
-
-    private interface TopicManagerInstantiator {
-        public TopicManager instantiateTopicManager() throws IOException;
-    }
-
-    PubSubServer startServer(final UncaughtExceptionHandler 
uncaughtExceptionHandler, final int port,
-                             final TopicManagerInstantiator instantiator) 
throws Exception {
-        PubSubServer server = new PubSubServer(new 
StandAloneServerConfiguration() {
-            @Override
-            public int getServerPort() {
-                return port;
-            }
-
-        }, new ClientConfiguration(), uncaughtExceptionHandler) {
-
-            @Override
-            protected TopicManager instantiateTopicManager() throws 
IOException {
-                return instantiator.instantiateTopicManager();
-            }
-        };
-        server.start();
-        return server;
-
-    }
-
-    public void runPublishRequest(final int port) throws Exception {
-        Publisher publisher = new HedwigClient(new ClientConfiguration() {
-            @Override
-            public InetSocketAddress getDefaultServerHost() {
-                return new InetSocketAddress("localhost", port);
-            }
-        }).getPublisher();
-
-        publisher.asyncPublish(ByteString.copyFromUtf8("blah"), 
Message.newBuilder().setBody(
-        ByteString.copyFromUtf8("blah")).build(), new Callback<Void>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) 
{
-                assertTrue(false);
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                assertTrue(false);
-            }
-
-        }, null);
-    }
-
-    @Test(timeout=60000)
-    public void testUncaughtExceptionInNettyThread() throws Exception {
-
-        SynchronousQueue<Throwable> queue = new SynchronousQueue<Throwable>();
-        RecordingUncaughtExceptionHandler uncaughtExceptionHandler = new 
RecordingUncaughtExceptionHandler(queue);
-        final int port = PortManager.nextFreePort();
-
-        PubSubServer server = startServer(uncaughtExceptionHandler, port, new 
TopicManagerInstantiator() {
-
-            @Override
-            public TopicManager instantiateTopicManager() throws IOException {
-                return new AbstractTopicManager(new ServerConfiguration(), 
Executors.newSingleThreadScheduledExecutor()) {
-                    @Override
-                    protected void realGetOwner(ByteString topic, boolean 
shouldClaim,
-                    Callback<HedwigSocketAddress> cb, Object ctx) {
-                        throw new RuntimeException("this exception should be 
uncaught");
-                    }
-
-                    @Override
-                    protected void postReleaseCleanup(ByteString topic, 
Callback<Void> cb, Object ctx) {
-                    }
-                };
-            }
-        });
-
-        runPublishRequest(port);
-        assertEquals(RuntimeException.class, queue.take().getClass());
-        server.shutdown();
-    }
-
-    @Test(timeout=60000)
-    public void testUncaughtExceptionInZKThread() throws Exception {
-
-        SynchronousQueue<Throwable> queue = new SynchronousQueue<Throwable>();
-        RecordingUncaughtExceptionHandler uncaughtExceptionHandler = new 
RecordingUncaughtExceptionHandler(queue);
-        final int port = PortManager.nextFreePort();
-        final String hostPort = "127.0.0.1:" + PortManager.nextFreePort();
-
-        PubSubServer server = startServer(uncaughtExceptionHandler, port, new 
TopicManagerInstantiator() {
-
-            @Override
-            public TopicManager instantiateTopicManager() throws IOException {
-                return new AbstractTopicManager(new ServerConfiguration(), 
Executors.newSingleThreadScheduledExecutor()) {
-
-                    @Override
-                    protected void realGetOwner(ByteString topic, boolean 
shouldClaim,
-                    Callback<HedwigSocketAddress> cb, Object ctx) {
-                        ZooKeeper zookeeper;
-                        try {
-                            zookeeper = new ZooKeeper(hostPort, 60000, new 
Watcher() {
-                                @Override
-                                public void process(WatchedEvent event) {
-                                    // TODO Auto-generated method stub
-
-                                }
-                            });
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-
-                        zookeeper.getData("/fake", false, new 
SafeAsyncZKCallback.DataCallback() {
-                            @Override
-                            public void safeProcessResult(int rc, String path, 
Object ctx, byte[] data,
-                            org.apache.zookeeper.data.Stat stat) {
-                                throw new RuntimeException("This should go to 
the uncaught exception handler");
-                            }
-
-                        }, null);
-                    }
-
-                    @Override
-                    protected void postReleaseCleanup(ByteString topic, 
Callback<Void> cb, Object ctx) {
-                    }
-                };
-            }
-        });
-
-        runPublishRequest(port);
-        assertEquals(RuntimeException.class, queue.take().getClass());
-        server.shutdown();
-    }
-
-    @Test(timeout=60000)
-    public void testInvalidServerConfiguration() throws Exception {
-        boolean success = false;
-        ServerConfiguration conf = new ServerConfiguration() {
-            @Override
-            public boolean isInterRegionSSLEnabled() {
-                return conf.getBoolean(INTER_REGION_SSL_ENABLED, true);
-            }
-
-            @Override
-            public List<String> getRegions() {
-                List<String> regionsList = new LinkedList<String>();
-                regionsList.add("regionHost1:4080:9876");
-                regionsList.add("regionHost2:4080");
-                regionsList.add("regionHost3:4080:9876");
-                return regionsList;
-            }
-        };
-        try {
-            conf.validate();
-        }
-        catch (ConfigurationException e) {
-            logger.error("Invalid configuration: ", e);
-            success = true;
-        }
-        assertTrue(success);
-    }
-
-    @Test(timeout=60000)
-    public void testValidServerConfiguration() throws Exception {
-        boolean success = true;
-        ServerConfiguration conf = new ServerConfiguration() {
-            @Override
-            public boolean isInterRegionSSLEnabled() {
-                return conf.getBoolean(INTER_REGION_SSL_ENABLED, true);
-            }
-
-            @Override
-            public List<String> getRegions() {
-                List<String> regionsList = new LinkedList<String>();
-                regionsList.add("regionHost1:4080:9876");
-                regionsList.add("regionHost2:4080:2938");
-                regionsList.add("regionHost3:4080:9876");
-                return regionsList;
-            }
-        };
-        try {
-            conf.validate();
-        }
-        catch (ConfigurationException e) {
-            logger.error("Invalid configuration: ", e);
-            success = false;
-        }
-        assertTrue(success);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java
deleted file mode 100644
index 08f5ad8..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.netty;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.junit.Test;
-
-/** Tests that Statistics updation in hedwig Server */
-public class TestServerStats {
-
-    /**
-     * Tests that updatLatency should not fail with
-     * ArrayIndexOutOfBoundException when latency time coming as negative.
-     */
-    @Test(timeout=60000)
-    public void testUpdateLatencyShouldNotFailWithAIOBEWithNegativeLatency()
-            throws Exception {
-        OpStats opStat = new OpStats();
-        opStat.updateLatency(-10);
-        assertEquals("Should not update any latency metrics", 0,
-                opStat.numSuccessOps);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java
deleted file mode 100644
index 91cf2fe..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.netty;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelConfig;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.DefaultChannelFuture;
-import org.jboss.netty.channel.SucceededChannelFuture;
-
-public class WriteRecordingChannel implements Channel {
-
-    public boolean closed = false;
-    ChannelFuture closingFuture = new DefaultChannelFuture(this, false);
-    List<Object> messagesWritten = new LinkedList<Object>();
-
-    public List<Object> getMessagesWritten() {
-        return messagesWritten;
-    }
-
-    public void clearMessages() {
-        messagesWritten.clear();
-    }
-
-    @Override
-    public ChannelFuture bind(SocketAddress localAddress) {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelFuture close() {
-        closed = true;
-        closingFuture.setSuccess();
-        return new SucceededChannelFuture(this);
-    }
-
-    @Override
-    public ChannelFuture connect(SocketAddress remoteAddress) {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelFuture disconnect() {
-        return close();
-    }
-
-    @Override
-    public ChannelFuture getCloseFuture() {
-        return closingFuture;
-    }
-
-    @Override
-    public ChannelConfig getConfig() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelFactory getFactory() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public Integer getId() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public int getInterestOps() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public SocketAddress getLocalAddress() {
-        return new InetSocketAddress("localhost", 1234);
-    }
-
-    @Override
-    public Channel getParent() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return new InetSocketAddress("www.yahoo.com", 80);
-    }
-
-    @Override
-    public boolean isBound() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public boolean isConnected() {
-        return closed == false;
-    }
-
-    @Override
-    public boolean isOpen() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public boolean isReadable() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public boolean isWritable() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelFuture setInterestOps(int interestOps) {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelFuture setReadable(boolean readable) {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelFuture unbind() {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public ChannelFuture write(Object message) {
-        messagesWritten.add(message);
-        return new SucceededChannelFuture(this);
-    }
-
-    @Override
-    public ChannelFuture write(Object message, SocketAddress remoteAddress) {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public int compareTo(Channel o) {
-        throw new RuntimeException("Not intended");
-    }
-
-    @Override
-    public void setAttachment(Object attachment) {}
-
-    @Override
-    public Object getAttachment() { throw new RuntimeException("Not 
intended"); }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
 
b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
deleted file mode 100644
index b71d037..0000000
--- 
a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.io.File;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
-import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
-import org.apache.bookkeeper.test.PortManager;
-
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hedwig.util.FileUtils;
-import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a base class for any tests that require a BookKeeper client/server
- * setup.
- *
- */
-public class BookKeeperTestBase extends ZooKeeperTestBase {
-    private static final Logger LOG = 
LoggerFactory.getLogger(BookKeeperTestBase.class);
-
-    class TestBookie extends Bookie {
-        final long readDelay;
-
-        public TestBookie(ServerConfiguration conf, long readDelay)
-            throws IOException, KeeperException, InterruptedException, 
BookieException {
-            super(conf);
-            this.readDelay = readDelay;
-        }
-
-        @Override
-        public ByteBuffer readEntry(long ledgerId, long entryId)
-            throws IOException, NoLedgerException {
-            if (readDelay > 0) {
-                try {
-                    Thread.sleep(readDelay);
-                } catch (InterruptedException ie) {
-                }
-            }
-            return super.readEntry(ledgerId, entryId);
-        }
-    }
-
-    class TestBookieServer extends BookieServer {
-        public TestBookieServer(ServerConfiguration conf)
-            throws IOException,
-                KeeperException, InterruptedException, BookieException,
-                UnavailableException, CompatibilityException {
-            super(conf);
-        }
-
-        protected Bookie newBookie(ServerConfiguration conf)
-            throws IOException, KeeperException, InterruptedException, 
BookieException {
-            return new TestBookie(conf, readDelay);
-        }
-    }
-
-    // BookKeeper Server variables
-    private List<BookieServer> bookiesList;
-    private List<ServerConfiguration> bkConfsList;
-
-    // String constants used for creating the bookie server files.
-    private static final String PREFIX = "bookie";
-    private static final String SUFFIX = "test";
-
-    // readDelay
-    protected long readDelay;
-
-    // Variable to decide how many bookie servers to set up.
-    private final int numBookies;
-    // BookKeeper client instance
-    protected BookKeeper bk;
-
-    protected ServerConfiguration baseConf = newServerConfiguration();
-    protected ClientConfiguration baseClientConf = new ClientConfiguration();
-
-    // Constructor
-    public BookKeeperTestBase(int numBookies) {
-        this(numBookies, 0L);
-    }
-
-    public BookKeeperTestBase(int numBookies, long readDelay) {
-        this.numBookies = numBookies;
-        this.readDelay = readDelay;
-    }
-
-    public BookKeeperTestBase() {
-        // By default, use 3 bookies.
-        this(3);
-    }
-
-    // Getter for the ZooKeeper client instance that the parent class sets up.
-    protected ZooKeeper getZooKeeperClient() {
-        return zk;
-    }
-
-    // Give junit a fake test so that its happy
-    @Test(timeout=60000)
-    public void testNothing() throws Exception {
-
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        // Initialize the zk client with values
-        try {
-            zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-            zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            LOG.error("Error setting up", e);
-        } catch (InterruptedException e) {
-            LOG.error("Error setting up", e);
-        }
-
-        // Create Bookie Servers
-        bookiesList = new LinkedList<BookieServer>();
-        bkConfsList = new LinkedList<ServerConfiguration>();
-
-        for (int i = 0; i < numBookies; i++) {
-            startUpNewBookieServer();
-        }
-
-        // Create the BookKeeper client
-        bk = new BookKeeper(hostPort);
-    }
-
-    public String getZkHostPort() {
-        return hostPort;
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        // Shutdown all of the bookie servers
-        for (BookieServer bs : bookiesList) {
-            bs.shutdown();
-        }
-        // Close the BookKeeper client
-        bk.close();
-        super.tearDown();
-    }
-
-    public void stopAllBookieServers() throws Exception {
-        for (BookieServer bs : bookiesList) {
-            bs.shutdown();
-        }
-        bookiesList.clear();
-    }
-
-    public void startAllBookieServers() throws Exception {
-        for (ServerConfiguration conf : bkConfsList) {
-            bookiesList.add(startBookie(conf));
-        }
-    }
-
-    public void suspendAllBookieServers() throws Exception {
-        for (BookieServer bs : bookiesList) {
-            bs.suspendProcessing();
-        }
-    }
-
-    public void resumeAllBookieServers() throws Exception {
-        for (BookieServer bs : bookiesList) {
-            bs.resumeProcessing();
-        }
-    }
-
-    public void tearDownOneBookieServer() throws Exception {
-        Random r = new Random();
-        int bi = r.nextInt(bookiesList.size());
-        BookieServer bs = bookiesList.get(bi);
-        bs.shutdown();
-        bookiesList.remove(bi);
-        bkConfsList.remove(bi);
-    }
-    
-    public void startUpNewBookieServer() throws Exception {
-        int port = PortManager.nextFreePort();
-        File tmpDir = FileUtils.createTempDirectory(
-                PREFIX + port, SUFFIX);
-        ServerConfiguration conf = newServerConfiguration(
-                port, hostPort, tmpDir, new File[] { tmpDir });
-        bookiesList.add(startBookie(conf));
-        bkConfsList.add(conf);
-    }
-
-    /**
-     * Helper method to startup a bookie server using a configuration object
-     *
-     * @param conf
-     *            Server Configuration Object
-     *
-     */
-    private BookieServer startBookie(ServerConfiguration conf) throws 
Exception {
-        BookieServer server = new TestBookieServer(conf);
-        server.start();
-
-        int port = conf.getBookiePort();
-        while(zk.exists("/ledgers/available/" + 
InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
-            Thread.sleep(500);
-        }
-
-        return server;
-    }
-
-    // construct the basic server configuration for bookkeeper testing
-    private static ServerConfiguration newServerConfiguration() {
-        ServerConfiguration conf = new ServerConfiguration();
-        conf.setJournalFlushWhenQueueEmpty(true);
-        conf.setJournalAdaptiveGroupWrites(false);
-        return conf;
-    }
-
-    protected ServerConfiguration newServerConfiguration(int port, String 
zkServers, File journalDir, File[] ledgerDirs) {
-        ServerConfiguration conf = new ServerConfiguration(baseConf);
-        conf.setAllowLoopback(true);
-        conf.setBookiePort(port);
-        conf.setZkServers(zkServers);
-        conf.setJournalDirName(journalDir.getPath());
-        String[] ledgerDirNames = new String[ledgerDirs.length];
-        for (int i=0; i<ledgerDirs.length; i++) {
-            ledgerDirNames[i] = ledgerDirs[i].getPath();
-        }
-        conf.setLedgerDirNames(ledgerDirNames);
-        return conf;
-    }
-
-}

Reply via email to