Repository: bookkeeper Updated Branches: refs/heads/master 410ff7263 -> 9a8d62b1d
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java deleted file mode 100644 index a54d0d4..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.SynchronousQueue; - -import junit.framework.Assert; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.junit.Test; - -import com.google.protobuf.ByteString; - -public class TestTopicBasedLoadShedder { - - final protected SynchronousQueue<Boolean> statusQueue = new SynchronousQueue<Boolean>(); - private int myTopics = 10; - private int numHubs = 10; - private List<ByteString> mockTopicList; - private final HubLoad infiniteMaxLoad = new HubLoad(10000000); - Map<HubInfo, HubLoad> mockLoadMap = new HashMap<HubInfo, HubLoad>(); - - class MockTopicBasedLoadShedder extends TopicBasedLoadShedder { - // This is set by the reduceLoadTo function. - public HubLoad targetLoad; - public MockTopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList, - Double tolerancePercentage, HubLoad maxLoadToShed) { - super(tm, topicList, tolerancePercentage, maxLoadToShed); - } - @Override - public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) { - this.targetLoad = targetLoad; - // Indicates that we released these many topics. - callback.operationFinished(ctx, targetLoad.toHubLoadData().getNumTopics()); - } - } - public Callback<Boolean> getShedLoadCallback(final MockTopicBasedLoadShedder ls, final HubLoad expected, - final Boolean shouldRelease, final Boolean shouldFail) { - return new Callback<Boolean>() { - @Override - public void operationFinished(Object o, Boolean aBoolean) { - Boolean status = false; - status = (aBoolean == shouldRelease); - if (shouldRelease) { - status &= (ls.targetLoad != null); - status &= (expected.numTopics == ls.targetLoad.numTopics); - } - final Boolean statusToPut = status; - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(statusQueue, statusToPut); - } - }).start(); - } - - @Override - public void operationFailed(Object o, PubSubException e) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(statusQueue, shouldFail); - } - }).start(); - } - }; - } - - private List<ByteString> getMockTopicList(int numTopics) { - List<ByteString> topics = new ArrayList<ByteString>(); - for (int i = 0; i < numTopics; i++) { - topics.add(ByteString.copyFromUtf8("MyTopic_" + i)); - } - return topics; - } - - private HubInfo getHubInfo(int hubNum) { - return new HubInfo(new HedwigSocketAddress("myhub.testdomain.foo"+hubNum+":4080:4080"), 0); - } - - private synchronized void initialize(int myTopics, int numHubs, int[] otherHubsLoad) { - if (null != otherHubsLoad) { - Assert.assertTrue(otherHubsLoad.length == numHubs - 1); - } - this.myTopics = myTopics; - mockTopicList = getMockTopicList(this.myTopics); - this.numHubs = numHubs; - this.mockLoadMap.clear(); - this.mockLoadMap.put(getHubInfo(0), new HubLoad(this.myTopics)); - for (int i = 1; i < this.numHubs; i++) { - this.mockLoadMap.put(getHubInfo(i), new HubLoad(otherHubsLoad[i-1])); - } - } - - private int[] getEqualLoadDistributionArray(int n, int load) { - if (n == 0) { - return null; - } - int[] retLoad = new int[n]; - Arrays.fill(retLoad, load); - return retLoad; - } - - @Test(timeout = 60000) - public synchronized void testAllHubsSameTopics() throws Exception { - // All hubs have the same number of topics. We should not release any topics even with a - // tolerance of 0.0. - initialize(10, 10, getEqualLoadDistributionArray(9, 10)); - MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null); - Assert.assertTrue(statusQueue.take()); - } - - @Test(timeout = 60000) - public synchronized void testOneHubUnequalTopics() throws Exception { - // The hub has 20 topics while the average is 11. Should reduce the load to 11. - initialize(20, 10, getEqualLoadDistributionArray(9, 10)); - MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null); - Assert.assertTrue(statusQueue.take()); - } - - @Test(timeout = 60000) - public synchronized void testOneHubUnequalTopicsWithTolerance() throws Exception { - // The hub has 20 topics and average is 11. Should still release as tolerance level of 50.0 is - // breached. Should get down to average. - initialize(20, 10, getEqualLoadDistributionArray(9, 10)); - MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 50.0, infiniteMaxLoad); - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null); - Assert.assertTrue(statusQueue.take()); - - // A tolerance level of 100.0 should result in the hub not releasing topics. - tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 100.0, infiniteMaxLoad); - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null); - Assert.assertTrue(statusQueue.take()); - } - - @Test(timeout = 60000) - public synchronized void testMaxLoadShed() throws Exception { - // The hub should not shed more than maxLoadShed topics. - initialize(20, 10, getEqualLoadDistributionArray(9, 10)); - MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(5)); - // Our load should reduce to 15. - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(15), true, false), null); - Assert.assertTrue(statusQueue.take()); - - // We should reduce to 11 even when maxLoadShed and average result in the same - // values - tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(9)); - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null); - Assert.assertTrue(statusQueue.take()); - } - - @Test(timeout = 60000) - public synchronized void testSingleHubLoadShed() throws Exception { - // If this is the only hub in the cluster, it should not release any topics. - initialize(20, 1, null); - MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null); - Assert.assertTrue(statusQueue.take()); - } - - @Test(timeout = 60000) - public synchronized void testUnderloadedClusterLoadShed() throws Exception { - // Hold on to at least one topic while shedding load (if cluster is underloaded) - initialize(5, 10, getEqualLoadDistributionArray(9, 0)); - MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); - tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(1), true, false), null); - Assert.assertTrue(statusQueue.take()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java deleted file mode 100644 index 90e77b2..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java +++ /dev/null @@ -1,376 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; - -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.CompositeException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.hedwig.util.Pair; -import org.apache.hedwig.zookeeper.ZooKeeperTestBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestZkTopicManager extends ZooKeeperTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(TestZkTopicManager.class); - - protected ZkTopicManager tm; - - protected class CallbackQueue<T> implements Callback<T> { - SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>(); - - public SynchronousQueue<Either<T, Exception>> getQueue() { - return q; - } - - public Either<T, Exception> take() throws InterruptedException { - return q.take(); - } - - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - LOG.error("got exception: " + exception); - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception)); - } - }).start(); - } - - @Override - public void operationFinished(Object ctx, final T resultOfOperation) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null)); - } - }).start(); - } - } - - protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>(); - protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>(); - protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>(); - - protected ByteString topic = ByteString.copyFromUtf8("topic"); - protected ServerConfiguration cfg; - protected HedwigSocketAddress me; - protected ScheduledExecutorService scheduler; - - private volatile int DEFAULT_MAX_NUM_TOPICS = Integer.MAX_VALUE; - private volatile int DEFAULT_RETENTION_SECS_AFTER_ACCESS = 0; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - cfg = new ServerConfiguration() { - @Override - public int getRetentionSecsAfterAccess() { - return DEFAULT_RETENTION_SECS_AFTER_ACCESS; - } - @Override - public int getMaxNumTopics() { - return DEFAULT_MAX_NUM_TOPICS; - } - }; - me = cfg.getServerAddr(); - scheduler = Executors.newSingleThreadScheduledExecutor(); - tm = new ZkTopicManager(zk, cfg, scheduler); - } - - @Override - @After - public void tearDown() throws Exception { - tm.stop(); - super.tearDown(); - } - - @Test(timeout=60000) - public void testGetOwnerSingle() throws Exception { - tm.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - } - - protected ByteString mkTopic(int i) { - return ByteString.copyFromUtf8(topic.toStringUtf8() + i); - } - - protected <T> T check(Either<T, Exception> ex) throws Exception { - if (ex.left() == null) - throw ex.right(); - else - return ex.left(); - } - - public static class CustomServerConfiguration extends ServerConfiguration { - int port; - - public CustomServerConfiguration(int port) { - this.port = port; - } - - @Override - public int getServerPort() { - return port; - } - } - - @Test(timeout=60000) - public void testGetOwnerMulti() throws Exception { - ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1), cfg2 = new CustomServerConfiguration( - cfg.getServerPort() + 2); - // TODO change cfg1 cfg2 params - ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler), - tm2 = new ZkTopicManager(zk, cfg2, scheduler); - - tm.getOwner(topic, false, addrCbq, null); - HedwigSocketAddress owner = check(addrCbq.take()); - - // If we were told to have another person claim the topic, make them - // claim the topic. - if (owner.getPort() == cfg1.getServerPort()) - tm1.getOwner(topic, true, addrCbq, null); - else if (owner.getPort() == cfg2.getServerPort()) - tm2.getOwner(topic, true, addrCbq, null); - if (owner.getPort() != cfg.getServerPort()) - Assert.assertEquals(owner, check(addrCbq.take())); - - for (int i = 0; i < 100; ++i) { - tm.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(owner, check(addrCbq.take())); - - tm1.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(owner, check(addrCbq.take())); - - tm2.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(owner, check(addrCbq.take())); - } - - // Give us 100 chances to choose another owner if not shouldClaim. - for (int i = 0; i < 100; ++i) { - if (!owner.equals(me)) - break; - tm.getOwner(mkTopic(i), false, addrCbq, null); - owner = check(addrCbq.take()); - if (i == 99) - Assert.fail("Never chose another owner"); - } - - // Make sure we always choose ourselves if shouldClaim. - for (int i = 0; i < 100; ++i) { - tm.getOwner(mkTopic(100), true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - } - - tm1.stop(); - tm2.stop(); - } - - @Test(timeout=60000) - public void testLoadBalancing() throws Exception { - tm.getOwner(topic, false, addrCbq, null); - - Assert.assertEquals(me, check(addrCbq.take())); - - ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1); - TopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler); - - ByteString topic1 = mkTopic(1); - tm.getOwner(topic1, false, addrCbq, null); - Assert.assertEquals(cfg1.getServerAddr(), check(addrCbq.take())); - - tm1.stop(); - } - - class StubOwnershipChangeListener implements TopicOwnershipChangeListener { - boolean failure; - SynchronousQueue<Pair<ByteString, Boolean>> bsQueue; - - public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) { - this.bsQueue = bsQueue; - } - - public void setFailure(boolean failure) { - this.failure = failure; - } - - @Override - public void lostTopic(final ByteString topic) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(bsQueue, Pair.of(topic, false)); - } - }).start(); - } - - public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(bsQueue, Pair.of(topic, true)); - if (failure) { - callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail")); - } else { - callback.operationFinished(ctx, null); - } - } - }).start(); - } - } - - @Test(timeout=60000) - public void testOwnershipChange() throws Exception { - SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>(); - - StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue); - - tm.addTopicOwnershipChangeListener(listener); - - // regular acquire - tm.getOwner(topic, true, addrCbq, null); - Pair<ByteString, Boolean> pair = bsQueue.take(); - Assert.assertEquals(topic, pair.first()); - Assert.assertTrue(pair.second()); - Assert.assertEquals(me, check(addrCbq.take())); - assertOwnershipNodeExists(); - - // topic that I already own - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Assert.assertTrue(bsQueue.isEmpty()); - assertOwnershipNodeExists(); - - // regular release - tm.releaseTopic(topic, cb, null); - pair = bsQueue.take(); - Assert.assertEquals(topic, pair.first()); - Assert.assertFalse(pair.second()); - Assert.assertTrue(queue.take()); - assertOwnershipNodeDoesntExist(); - - // releasing topic that I don't own - tm.releaseTopic(mkTopic(0), cb, null); - Assert.assertTrue(queue.take()); - Assert.assertTrue(bsQueue.isEmpty()); - - // set listener to return error - listener.setFailure(true); - - tm.getOwner(topic, true, addrCbq, null); - pair = bsQueue.take(); - Assert.assertEquals(topic, pair.first()); - Assert.assertTrue(pair.second()); - Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right()) - .getExceptions().iterator().next().getClass()); - Assert.assertFalse(null != tm.topics.getIfPresent(topic)); - Thread.sleep(100); - assertOwnershipNodeDoesntExist(); - - } - - public void assertOwnershipNodeExists() throws Exception { - byte[] data = zk.getData(tm.hubPath(topic), false, null); - Assert.assertEquals(HubInfo.parse(new String(data)).getAddress(), - tm.addr); - } - - public void assertOwnershipNodeDoesntExist() throws Exception { - try { - zk.getData(tm.hubPath(topic), false, null); - Assert.assertTrue(false); - } catch (KeeperException e) { - Assert.assertEquals(e.code(), KeeperException.Code.NONODE); - } - } - - @Test(timeout=60000) - public void testZKClientDisconnected() throws Exception { - // First assert ownership of the topic - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - - // Suspend the ZKTopicManager and make sure calls to getOwner error out - tm.isSuspended = true; - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass()); - // Release the topic. This should not error out even if suspended. - tm.releaseTopic(topic, cb, null); - Assert.assertTrue(queue.take()); - assertOwnershipNodeDoesntExist(); - - // Restart the ZKTopicManager and make sure calls to getOwner are okay - tm.isSuspended = false; - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - assertOwnershipNodeExists(); - } - - @Test(timeout=60000) - public void testRetentionAfterAccess() throws Exception { - DEFAULT_RETENTION_SECS_AFTER_ACCESS = 5; - ZkTopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler); - tm1.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Thread.sleep(6000L); - tm1.topics.cleanUp(); - Thread.sleep(2000L); - assertOwnershipNodeDoesntExist(); - tm1.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Thread.sleep(1000L); - tm1.topics.cleanUp(); - Thread.sleep(2000L); - assertOwnershipNodeExists(); - - tm1.stop(); - } - - @Test(timeout=60000) - public void testMaxNumTopics() throws Exception { - DEFAULT_MAX_NUM_TOPICS = 1; - TopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler); - tm1.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - assertOwnershipNodeExists(); - tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"), - true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Thread.sleep(2000L); - assertOwnershipNodeDoesntExist(); - tm1.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java b/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java deleted file mode 100644 index e025e76..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.zookeeper; - -import java.util.Arrays; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs.Ids; -import org.junit.Assert; -import org.junit.Test; - -public class TestZkUtils extends ZooKeeperTestBase { - - @Test(timeout=60000) - public void testCreateFullPathOptimistic() throws Exception { - testPath("/a/b/c", CreateMode.EPHEMERAL); - - testPath("/b/c/d", CreateMode.PERSISTENT); - - testPath("/b/c/d/e", CreateMode.PERSISTENT); - - } - - void testPath(String path, CreateMode mode) throws Exception { - byte[] data = new byte[] { 77 }; - ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, mode, strCb, null); - Assert.assertTrue(queue.take()); - Assert.assertTrue(Arrays.equals(data, zk.getData(path, false, null))); - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java deleted file mode 100644 index 4213059..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.zookeeper; - -import java.util.concurrent.SynchronousQueue; - -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.test.ClientBase; -import org.junit.After; -import org.junit.Before; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Callback; -import org.apache.bookkeeper.test.PortManager; - -/** - * This is a base class for any tests that need a ZooKeeper client/server setup. - * - */ -public abstract class ZooKeeperTestBase extends ClientBase { - - protected ZooKeeper zk; - - protected SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>(); - - protected Callback<Void> cb = new Callback<Void>() { - - @Override - public void operationFinished(Object ctx, Void result) { - new Thread(new Runnable() { - public void run() { - ConcurrencyUtils.put(queue, true); - } - }).start(); - } - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - new Thread(new Runnable() { - public void run() { - ConcurrencyUtils.put(queue, false); - } - }).start(); - } - }; - - protected AsyncCallback.StringCallback strCb = new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - ConcurrencyUtils.put(queue, rc == Code.OK.intValue()); - } - }; - - protected AsyncCallback.VoidCallback voidCb = new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - ConcurrencyUtils.put(queue, rc == Code.OK.intValue()); - } - }; - - @Override - @Before - public void setUp() throws Exception { - hostPort = "127.0.0.1:" + PortManager.nextFreePort(); - super.setUp(); - zk = createClient(); - } - - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); - zk.close(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/resources/log4j.properties b/hedwig-server/src/test/resources/log4j.properties deleted file mode 100644 index 5983f0b..0000000 --- a/hedwig-server/src/test/resources/log4j.properties +++ /dev/null @@ -1,72 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# - -# -# Hedwig Logging Configuration -# - -# Format is "<default threshold> (, <appender>)+ - -# DEFAULT: console appender only -log4j.rootLogger=INFO, CONSOLE - -# Example with rolling log file -#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE - -# Example with rolling log file and tracing -#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE - -# -# Log INFO level and above messages to the console -# -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=INFO -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n - -# -# Add ROLLINGFILE to rootLogger to get log file output -# Log DEBUG level and above messages to a log file -log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender -log4j.appender.ROLLINGFILE.Threshold=DEBUG -log4j.appender.ROLLINGFILE.File=hedwig-server.log -log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n - -# Max log file size of 10MB -log4j.appender.ROLLINGFILE.MaxFileSize=10MB -# uncomment the next line to limit number of backup files -#log4j.appender.ROLLINGFILE.MaxBackupIndex=10 - -log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n - - -# -# Add TRACEFILE to rootLogger to get log file output -# Log DEBUG level and above messages to a log file -log4j.appender.TRACEFILE=org.apache.log4j.FileAppender -log4j.appender.TRACEFILE.Threshold=TRACE -log4j.appender.TRACEFILE.File=hedwig_trace.log - -log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout -### Notice we are including log4j's NDC here (%x) -log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index da6867d..3b8b233 100644 --- a/pom.xml +++ b/pom.xml @@ -33,10 +33,6 @@ <inceptionYear>2011</inceptionYear> <modules> <module>compat-deps</module> - <module>hedwig-client</module> - <module>hedwig-server</module> - <module>hedwig-protocol</module> - <module>hedwig-client-jms</module> <module>bookkeeper-stats</module> <module>bookkeeper-server</module> <module>bookkeeper-benchmark</module> @@ -87,17 +83,12 @@ <artifactId>maven-javadoc-plugin</artifactId> <version>2.8</version> <configuration> - <additionalparam>-exclude org.apache.hedwig.client.netty:org.apache.hedwig.client.benchmark:org.apache.hedwig.client.data:org.apache.hedwig.client.exceptions:org.apache.hedwig.client.handlers:org.apache.hedwig.client.ssl</additionalparam> - <subpackages>org.apache.bookkeeper.client:org.apache.bookkeeper.conf:org.apache.hedwig.client:org.apache.hedwig.util:org.apache.hedwig.protocol:org.apache.hedwig.exceptions</subpackages> + <subpackages>org.apache.bookkeeper.client:org.apache.bookkeeper.conf</subpackages> <groups> <group> <title>Bookkeeper</title> <packages>org.apache.bookkeeper*</packages> </group> - <group> - <title>Hedwig</title> - <packages>org.apache.hedwig*</packages> - </group> </groups> </configuration> <executions>
