http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java deleted file mode 100644 index 52a5874..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import org.apache.hedwig.protocol.PubSubProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.Test; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.HelperMethods; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.server.topics.TopicOwnershipChangeListener; -import org.apache.hedwig.util.Callback; - -import static org.junit.Assert.*; - -public abstract class TestPersistenceManagerBlackBox { - protected PersistenceManager persistenceManager; - protected int NUM_MESSAGES_TO_TEST = 5; - protected int NUM_TOPICS_TO_TEST = 5; - private static final Logger logger = LoggerFactory.getLogger(TestPersistenceManagerBlackBox.class); - TestCallback testCallback = new TestCallback(); - - RuntimeException failureException; - - class TestCallback implements Callback<PubSubProtocol.MessageSeqId> { - - public void operationFailed(Object ctx, PubSubException exception) { - throw (failureException = new RuntimeException(exception)); - } - - @SuppressWarnings("unchecked") - public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) { - LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; - try { - statusQueue.put(true); - } catch (InterruptedException e) { - throw (failureException = new RuntimeException(e)); - } - } - } - - class RangeScanVerifierListener implements ScanCallback { - List<Message> pubMsgs; - - public RangeScanVerifierListener(List<Message> pubMsgs) { - this.pubMsgs = pubMsgs; - } - - public void messageScanned(Object ctx, Message recvMessage) { - if (pubMsgs.isEmpty()) { - throw (failureException = new RuntimeException("Message received when none expected")); - } - - Message pubMsg = pubMsgs.get(0); - if (!HelperMethods.areEqual(recvMessage, pubMsg)) { - throw (failureException = new RuntimeException("Scanned message not equal to expected")); - } - pubMsgs.remove(0); - } - - public void scanFailed(Object ctx, Exception exception) { - throw (failureException = new RuntimeException(exception)); - } - - @SuppressWarnings("unchecked") - public void scanFinished(Object ctx, ReasonForFinish reason) { - if (reason != ReasonForFinish.NO_MORE_MESSAGES) { - throw (failureException = new RuntimeException("Scan finished prematurely " + reason)); - } - LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; - try { - statusQueue.put(true); - } catch (InterruptedException e) { - throw (failureException = new RuntimeException(e)); - } - } - - } - - class PointScanVerifierListener implements ScanCallback { - List<Message> pubMsgs; - ByteString topic; - - public PointScanVerifierListener(List<Message> pubMsgs, ByteString topic) { - this.topic = topic; - this.pubMsgs = pubMsgs; - } - - @SuppressWarnings("unchecked") - public void messageScanned(Object ctx, Message recvMessage) { - - Message pubMsg = pubMsgs.get(0); - if (!HelperMethods.areEqual(recvMessage, pubMsg)) { - throw (failureException = new RuntimeException("Scanned message not equal to expected")); - } - pubMsgs.remove(0); - - if (pubMsgs.isEmpty()) { - LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; - try { - statusQueue.put(true); - } catch (InterruptedException e) { - throw (failureException = new RuntimeException(e)); - } - } else { - long seqId = recvMessage.getMsgId().getLocalComponent(); - seqId = persistenceManager.getSeqIdAfterSkipping(topic, seqId, 1); - ScanRequest request = new ScanRequest(topic, seqId, new PointScanVerifierListener(pubMsgs, topic), ctx); - persistenceManager.scanSingleMessage(request); - } - - } - - public void scanFailed(Object ctx, Exception exception) { - throw (failureException = new RuntimeException(exception)); - } - - public void scanFinished(Object ctx, ReasonForFinish reason) { - - } - - } - - class ScanVerifier implements Runnable { - List<Message> pubMsgs; - ByteString topic; - LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); - - public ScanVerifier(ByteString topic, List<Message> pubMsgs) { - this.topic = topic; - this.pubMsgs = pubMsgs; - } - - public void run() { - // start the scan - try { - if (persistenceManager instanceof PersistenceManagerWithRangeScan) { - - ScanCallback listener = new RangeScanVerifierListener(pubMsgs); - - PersistenceManagerWithRangeScan rangePersistenceManager = (PersistenceManagerWithRangeScan) persistenceManager; - - rangePersistenceManager.scanMessages(new RangeScanRequest(topic, getLowestSeqId(), - NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue)); - - } else { - - ScanCallback listener = new PointScanVerifierListener(pubMsgs, topic); - persistenceManager - .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue)); - - } - // now listen for it to finish - // wait a maximum of a minute - Boolean b = statusQueue.poll(60, TimeUnit.SECONDS); - if (b == null) { - throw (failureException = new RuntimeException("Scanning timed out")); - } - } catch (InterruptedException e) { - throw (failureException = new RuntimeException(e)); - } - } - } - - class Publisher implements Runnable { - List<Message> pubMsgs; - ByteString topic; - - public Publisher(ByteString topic, List<Message> pubMsgs) { - this.pubMsgs = pubMsgs; - this.topic = topic; - } - - public void run() { - LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); - - for (Message msg : pubMsgs) { - - try { - persistenceManager.persistMessage(new PersistRequest(topic, msg, testCallback, statusQueue)); - // wait a maximum of a minute - Boolean b = statusQueue.poll(60, TimeUnit.SECONDS); - if (b == null) { - throw (failureException = new RuntimeException("Scanning timed out")); - } - } catch (InterruptedException e) { - throw (failureException = new RuntimeException(e)); - } - } - } - - } - - public void setUp() throws Exception { - logger.info("STARTING " + getClass()); - persistenceManager = instantiatePersistenceManager(); - failureException = null; - logger.info("Persistence Manager test setup finished"); - } - - abstract long getLowestSeqId(); - - abstract PersistenceManager instantiatePersistenceManager() throws Exception; - - public void tearDown() throws Exception { - logger.info("tearDown starting"); - persistenceManager.stop(); - logger.info("FINISHED " + getClass()); - } - - protected ByteString getTopicName(int number) { - return ByteString.copyFromUtf8("topic" + number); - } - - @Test(timeout=60000) - public void testPersistenceManager() throws Exception { - List<Thread> publisherThreads = new LinkedList<Thread>(); - List<Thread> scannerThreads = new LinkedList<Thread>(); - Thread thread; - Semaphore latch = new Semaphore(1); - - for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) { - ByteString topic = getTopicName(i); - - if (persistenceManager instanceof TopicOwnershipChangeListener) { - - TopicOwnershipChangeListener tocl = (TopicOwnershipChangeListener) persistenceManager; - - latch.acquire(); - - tocl.acquiredTopic(topic, new Callback<Void>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - failureException = new RuntimeException(exception); - ((Semaphore) ctx).release(); - } - - @Override - public void operationFinished(Object ctx, Void res) { - ((Semaphore) ctx).release(); - } - }, latch); - - latch.acquire(); - latch.release(); - if (failureException != null) { - throw (Exception) failureException.getCause(); - } - } - List<Message> msgs = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, 1024); - - thread = new Thread(new Publisher(topic, msgs)); - publisherThreads.add(thread); - thread.start(); - - thread = new Thread(new ScanVerifier(topic, msgs)); - scannerThreads.add(thread); - } - for (Thread t : publisherThreads) { - t.join(); - } - - for (Thread t : scannerThreads) { - t.start(); - } - - for (Thread t : scannerThreads) { - t.join(); - } - - assertEquals(null, failureException); - for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) { - assertEquals(persistenceManager.getCurrentSeqIdForTopic(getTopicName(i)).getLocalComponent(), - getExpectedSeqId(NUM_MESSAGES_TO_TEST)); - } - - } - - abstract long getExpectedSeqId(int numPublished); - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java deleted file mode 100644 index 2e59a8a..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.hedwig.server.common.ServerConfiguration; -import org.junit.After; -import org.junit.Before; - -public class TestReadAheadCacheBlackBox extends TestPersistenceManagerBlackBox { - - @After - @Override - public void tearDown() throws Exception { - super.tearDown(); - LocalDBPersistenceManager.instance().reset(); - } - - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - } - - @Override - long getExpectedSeqId(int numPublished) { - return numPublished; - } - - @Override - long getLowestSeqId() { - return 1; - } - - @Override - PersistenceManager instantiatePersistenceManager() { - return new ReadAheadCache(LocalDBPersistenceManager.instance(), new ServerConfiguration()).start(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java deleted file mode 100644 index ae08005..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java +++ /dev/null @@ -1,302 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import static org.junit.Assert.*; - -import java.util.List; - -import org.apache.hedwig.protocol.PubSubProtocol; -import org.junit.Before; -import org.junit.Test; - -import com.google.protobuf.ByteString; -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.HelperMethods; -import org.apache.hedwig.StubCallback; -import org.apache.hedwig.StubScanCallback; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.ConcurrencyUtils; - -public class TestReadAheadCacheWhiteBox { - ByteString topic = ByteString.copyFromUtf8("testTopic"); - final static int NUM_MESSAGES = 10; - final static int MSG_SIZE = 50; - List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES, MSG_SIZE); - StubPersistenceManager stubPersistenceManager; - ReadAheadCache cacheBasedPersistenceManager; - MyServerConfiguration myConf = new MyServerConfiguration(); - - class MyReadAheadCache extends ReadAheadCache { - public MyReadAheadCache(PersistenceManagerWithRangeScan persistenceManger, ServerConfiguration cfg) { - super(persistenceManger, cfg); - } - - @Override - protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) { - // make it perform in the same thread - obj.performRequest(); - } - - } - - class MyServerConfiguration extends ServerConfiguration { - - // Note these are set up, so that the size limit will be reached before - // the count limit - int readAheadCount = NUM_MESSAGES / 2; - long readAheadSize = (long) (MSG_SIZE * 2.5); - long maxCacheSize = Integer.MAX_VALUE; - long cacheEntryTTL = 0L; - - @Override - public int getReadAheadCount() { - return readAheadCount; - } - - @Override - public long getReadAheadSizeBytes() { - return readAheadSize; - } - - @Override - public long getMaximumCacheSize() { - return maxCacheSize; - } - - @Override - public long getCacheEntryTTL() { - return cacheEntryTTL; - } - - } - - @Before - public void setUp() throws Exception { - stubPersistenceManager = new StubPersistenceManager(); - cacheBasedPersistenceManager = new MyReadAheadCache(stubPersistenceManager, myConf).start(); - } - - @Test(timeout=60000) - public void testPersistMessage() throws Exception { - StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>(); - PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null); - - stubPersistenceManager.failure = true; - cacheBasedPersistenceManager.persistMessage(request); - assertNotNull(ConcurrencyUtils.take(callback.queue).right()); - - CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic) - .getLocalComponent()); - assertFalse(cacheBasedPersistenceManager.cache.containsKey(key)); - - stubPersistenceManager.failure = false; - persistMessage(messages.get(0)); - } - - private void persistMessage(Message msg) throws Exception { - StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>(); - PersistRequest request = new PersistRequest(topic, msg, callback, null); - cacheBasedPersistenceManager.persistMessage(request); - assertNotNull(ConcurrencyUtils.take(callback.queue).left()); - CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic) - .getLocalComponent()); - CacheValue cacheValue = cacheBasedPersistenceManager.cache.get(key); - assertNotNull(cacheValue); - assertFalse(cacheValue.isStub()); - assertTrue(HelperMethods.areEqual(cacheValue.getMessage(), msg)); - - } - - @Test(timeout=60000) - public void testScanSingleMessage() throws Exception { - StubScanCallback callback = new StubScanCallback(); - ScanRequest request = new ScanRequest(topic, 1, callback, null); - stubPersistenceManager.failure = true; - - cacheBasedPersistenceManager.scanSingleMessage(request); - assertTrue(callback.isFailed()); - assertTrue(0 == cacheBasedPersistenceManager.cache.size()); - - stubPersistenceManager.failure = false; - cacheBasedPersistenceManager.scanSingleMessage(request); - assertTrue(myConf.readAheadCount == cacheBasedPersistenceManager.cache.size()); - - persistMessage(messages.get(0)); - assertTrue(callback.isSuccess()); - - } - - @Test(timeout=60000) - public void testDeliveredUntil() throws Exception { - for (Message m : messages) { - persistMessage(m); - } - assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get()); - long middle = messages.size() / 2; - cacheBasedPersistenceManager.deliveredUntil(topic, middle); - - assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size()); - - long middle2 = middle - 1; - cacheBasedPersistenceManager.deliveredUntil(topic, middle2); - // should have no effect - assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size()); - - // delivered all messages - cacheBasedPersistenceManager.deliveredUntil(topic, (long) messages.size()); - // should have no effect - assertTrue(cacheBasedPersistenceManager.cache.isEmpty()); - assertTrue(cacheBasedPersistenceManager.cacheSegment.get() - .timeIndexOfAddition.isEmpty()); - assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty()); - assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize.get()); - - } - - @Test(timeout=60000) - public void testDoReadAhead() { - StubScanCallback callback = new StubScanCallback(); - ScanRequest request = new ScanRequest(topic, 1, callback, null); - cacheBasedPersistenceManager.doReadAhead(request); - - assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size()); - - request = new ScanRequest(topic, myConf.readAheadCount / 2 - 1, callback, null); - cacheBasedPersistenceManager.doReadAhead(request); - assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size()); - - request = new ScanRequest(topic, myConf.readAheadCount / 2 + 2, callback, null); - cacheBasedPersistenceManager.doReadAhead(request); - assertEquals((int) (1.5 * myConf.readAheadCount), cacheBasedPersistenceManager.cache.size()); - - } - - @Test(timeout=60000) - public void testReadAheadSizeLimit() throws Exception { - for (Message m : messages) { - persistMessage(m); - } - cacheBasedPersistenceManager.cache.clear(); - StubScanCallback callback = new StubScanCallback(); - ScanRequest request = new ScanRequest(topic, 1, callback, null); - cacheBasedPersistenceManager.scanSingleMessage(request); - - assertTrue(callback.isSuccess()); - assertEquals((int) Math.ceil(myConf.readAheadSize / (MSG_SIZE + 0.0)), cacheBasedPersistenceManager.cache - .size()); - - } - - @Test(timeout=60000) - public void testDoReadAheadStartingFrom() throws Exception { - persistMessage(messages.get(0)); - int readAheadCount = 5; - int start = 1; - RangeScanRequest readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, start, - readAheadCount); - assertNull(readAheadRequest); - - StubScanCallback callback = new StubScanCallback(); - int end = 100; - ScanRequest request = new ScanRequest(topic, end, callback, null); - cacheBasedPersistenceManager.doReadAhead(request); - - int pos = 98; - readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount); - assertEquals(readAheadRequest.messageLimit, end - pos); - - end = 200; - request = new ScanRequest(topic, end, callback, null); - cacheBasedPersistenceManager.doReadAhead(request); - - // too far back - pos = 150; - readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount); - assertEquals(readAheadRequest.messageLimit, readAheadCount); - } - - @Test(timeout=60000) - public void testAddMessageToCache() { - CacheKey key = new CacheKey(topic, 1); - cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now()); - assertEquals(1, cacheBasedPersistenceManager.cache.size()); - assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get()); - assertEquals(1, cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).size()); - assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L)); - - CacheValue value = cacheBasedPersistenceManager.cache.get(key); - assertTrue(cacheBasedPersistenceManager.cacheSegment.get() - .timeIndexOfAddition.get(value.timeOfAddition).contains(key)); - } - - @Test(timeout=60000) - public void testRemoveMessageFromCache() { - CacheKey key = new CacheKey(topic, 1); - cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now()); - cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true); - assertTrue(cacheBasedPersistenceManager.cache.isEmpty()); - assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty()); - assertTrue(cacheBasedPersistenceManager.cacheSegment.get() - .timeIndexOfAddition.isEmpty()); - } - - @Test(timeout=60000) - public void testCollectOldCacheEntries() { - int i = 1; - for (Message m : messages) { - CacheKey key = new CacheKey(topic, i); - cacheBasedPersistenceManager.addMessageToCache(key, m, i); - i++; - } - - int n = 2; - myConf.maxCacheSize = n * MSG_SIZE * myConf.getNumReadAheadCacheThreads(); - cacheBasedPersistenceManager.reloadConf(myConf); - cacheBasedPersistenceManager.collectOldOrExpiredCacheEntries( - cacheBasedPersistenceManager.cacheSegment.get()); - assertEquals(n, cacheBasedPersistenceManager.cache.size()); - assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get() - .timeIndexOfAddition.size()); - } - - @Test(timeout=60000) - public void testCollectExpiredCacheEntries() throws Exception { - int i = 1; - int n = 2; - long ttl = 5000L; - myConf.cacheEntryTTL = ttl; - long curTime = MathUtils.now(); - cacheBasedPersistenceManager.reloadConf(myConf); - for (Message m : messages) { - CacheKey key = new CacheKey(topic, i); - cacheBasedPersistenceManager.addMessageToCache(key, m, curTime++); - if (i == NUM_MESSAGES - n) { - Thread.sleep(2 * ttl); - curTime += 2 * ttl; - } - i++; - } - - assertEquals(n, cacheBasedPersistenceManager.cache.size()); - assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get() - .timeIndexOfAddition.size()); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java deleted file mode 100644 index 26b2ce3..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import java.util.concurrent.ScheduledExecutorService; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; - -public class StubSubscriptionManager extends InMemorySubscriptionManager { - boolean fail = false; - - public void setFail(boolean fail) { - this.fail = fail; - } - - public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, DeliveryManager dm, - ServerConfiguration conf, ScheduledExecutorService scheduler) { - super(conf, tm, pm, dm, scheduler); - } - - @Override - public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId, - Callback<SubscriptionData> callback, Object ctx) { - if (fail) { - callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail")); - return; - } - super.serveSubscribeRequest(topic, subRequest, consumeSeqId, callback, ctx); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java deleted file mode 100644 index 0e0f670..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.Before; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.persistence.LocalDBPersistenceManager; -import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.zookeeper.ZooKeeperTestBase; - -public class TestMMSubscriptionManager extends ZooKeeperTestBase { - MetadataManagerFactory mm; - MMSubscriptionManager sm; - ServerConfiguration cfg = new ServerConfiguration(); - SynchronousQueue<Either<SubscriptionData, PubSubException>> subDataCallbackQueue = new SynchronousQueue<Either<SubscriptionData, PubSubException>>(); - SynchronousQueue<Either<Boolean, PubSubException>> BooleanCallbackQueue = new SynchronousQueue<Either<Boolean, PubSubException>>(); - - Callback<Void> voidCallback; - Callback<SubscriptionData> subDataCallback; - - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - cfg = new ServerConfiguration(); - final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - mm = MetadataManagerFactory.newMetadataManagerFactory(cfg, zk); - sm = new MMSubscriptionManager(cfg, mm, new TrivialOwnAllTopicManager(cfg, scheduler), - LocalDBPersistenceManager.instance(), null, scheduler); - subDataCallback = new Callback<SubscriptionData>() { - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - scheduler.execute(new Runnable() { - public void run() { - ConcurrencyUtils.put(subDataCallbackQueue, Either.of((SubscriptionData) null, exception)); - } - }); - } - - @Override - public void operationFinished(Object ctx, final SubscriptionData resultOfOperation) { - scheduler.execute(new Runnable() { - public void run() { - ConcurrencyUtils.put(subDataCallbackQueue, Either.of(resultOfOperation, (PubSubException) null)); - } - }); - } - }; - - voidCallback = new Callback<Void>() { - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - scheduler.execute(new Runnable() { - public void run() { - ConcurrencyUtils.put(BooleanCallbackQueue, Either.of((Boolean) null, exception)); - } - }); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - scheduler.execute(new Runnable() { - public void run() { - ConcurrencyUtils.put(BooleanCallbackQueue, Either.of(true, (PubSubException) null)); - } - }); - } - }; - - } - - @Test(timeout=60000) - public void testBasics() throws Exception { - - ByteString topic1 = ByteString.copyFromUtf8("topic1"); - ByteString sub1 = ByteString.copyFromUtf8("sub1"); - - // - // No topics acquired. - // - SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(sub1).build(); - MessageSeqId msgId = MessageSeqId.newBuilder().setLocalComponent(100).build(); - - sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null); - - Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(), - PubSubException.ServerNotResponsibleForTopicException.class); - - sm.unsubscribe(topic1, sub1, voidCallback, null); - - Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(), - PubSubException.ServerNotResponsibleForTopicException.class); - - // - // Acquire topic. - // - - sm.acquiredTopic(topic1, voidCallback, null); - Assert.assertTrue(BooleanCallbackQueue.take().left()); - - Assert.assertTrue(sm.top2sub2seq.containsKey(topic1)); - Assert.assertEquals(0, sm.top2sub2seq.get(topic1).size()); - - sm.unsubscribe(topic1, sub1, voidCallback, null); - Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(), - PubSubException.ClientNotSubscribedException.class); - - // - // Try to attach to a subscription. - subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1) - .build(); - - sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null); - Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(), - PubSubException.ClientNotSubscribedException.class); - - // now create - subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).setSubscriberId(sub1) - .build(); - sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null); - Assert.assertEquals(msgId.getLocalComponent(), ConcurrencyUtils.take(subDataCallbackQueue).left().getState().getMsgId().getLocalComponent()); - Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId() - .getLocalComponent()); - - // try to create again - sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null); - Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(), - PubSubException.ClientAlreadySubscribedException.class); - Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId() - .getLocalComponent()); - - sm.lostTopic(topic1); - sm.acquiredTopic(topic1, voidCallback, null); - Assert.assertTrue(BooleanCallbackQueue.take().left()); - - // try to attach - subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1) - .build(); - MessageSeqId msgId1 = MessageSeqId.newBuilder().setLocalComponent(msgId.getLocalComponent() + 10).build(); - sm.serveSubscribeRequest(topic1, subRequest, msgId1, subDataCallback, null); - Assert.assertEquals(msgId.getLocalComponent(), subDataCallbackQueue.take().left().getState().getMsgId().getLocalComponent()); - Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId() - .getLocalComponent()); - - // now manipulate the consume ptrs - // dont give it enough to have it persist to ZK - MessageSeqId msgId2 = MessageSeqId.newBuilder().setLocalComponent( - msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build(); - sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId2, voidCallback, null); - Assert.assertTrue(BooleanCallbackQueue.take().left()); - Assert.assertEquals(msgId2.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId() - .getLocalComponent()); - Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId() - .getLocalComponent()); - - // give it more so that it will write to ZK - MessageSeqId msgId3 = MessageSeqId.newBuilder().setLocalComponent( - msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build(); - sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId3, voidCallback, null); - Assert.assertTrue(BooleanCallbackQueue.take().left()); - - sm.lostTopic(topic1); - sm.acquiredTopic(topic1, voidCallback, null); - Assert.assertTrue(BooleanCallbackQueue.take().left()); - - Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId() - .getLocalComponent()); - Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId() - .getLocalComponent()); - - // finally unsubscribe - sm.unsubscribe(topic1, sub1, voidCallback, null); - Assert.assertTrue(BooleanCallbackQueue.take().left()); - - sm.lostTopic(topic1); - sm.acquiredTopic(topic1, voidCallback, null); - Assert.assertTrue(BooleanCallbackQueue.take().left()); - Assert.assertFalse(sm.top2sub2seq.get(topic1).containsKey(sub1)); - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java deleted file mode 100644 index d5569de..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import java.util.concurrent.SynchronousQueue; - -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.protobuf.ByteString; - -public class TestUpdateSubscriptionState extends HedwigHubTestBase { - - private static final int RETENTION_SECS_VALUE = 100; - - // Client side variables - protected HedwigClient client; - protected Publisher publisher; - protected Subscriber subscriber; - - // SynchronousQueues to verify async calls - private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>(); - - // Test implementation of subscriber's message handler - class OrderCheckingMessageHandler implements MessageHandler { - - ByteString topic; - ByteString subscriberId; - int startMsgId; - int numMsgs; - int endMsgId; - boolean inOrder = true; - - OrderCheckingMessageHandler(ByteString topic, ByteString subscriberId, - int startMsgId, int numMsgs) { - this.topic = topic; - this.subscriberId = subscriberId; - this.startMsgId = startMsgId; - this.numMsgs = numMsgs; - this.endMsgId = startMsgId + numMsgs - 1; - } - - @Override - public void deliver(ByteString thisTopic, ByteString thisSubscriberId, - Message msg, Callback<Void> callback, Object context) { - if (!topic.equals(thisTopic) || - !subscriberId.equals(thisSubscriberId)) { - return; - } - // check order - int msgId = Integer.parseInt(msg.getBody().toStringUtf8()); - if (logger.isDebugEnabled()) { - logger.debug("Received message : " + msgId); - } - - if (inOrder) { - if (startMsgId != msgId) { - logger.error("Expected message " + startMsgId + ", but received message " + msgId); - inOrder = false; - } else { - ++startMsgId; - } - } - callback.operationFinished(context, null); - if (msgId == endMsgId) { - new Thread(new Runnable() { - @Override - public void run() { - if (logger.isDebugEnabled()) { - logger.debug("Deliver finished!"); - } - ConcurrencyUtils.put(queue, true); - } - }).start(); - } - } - - public boolean isInOrder() { - return inOrder; - } - } - - public TestUpdateSubscriptionState() { - super(1); - } - - protected class NewHubServerConfiguration extends HubServerConfiguration { - - public NewHubServerConfiguration(int serverPort, int sslServerPort) { - super(serverPort, sslServerPort); - } - - @Override - public int getRetentionSecs() { - return RETENTION_SECS_VALUE; - } - - } - - @Override - protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) { - return new NewHubServerConfiguration(serverPort, sslServerPort); - } - - protected class TestClientConfiguration extends HubClientConfiguration { - @Override - public boolean isAutoSendConsumeMessageEnabled() { - return true; - } - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - client = new HedwigClient(new TestClientConfiguration()); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - @Override - @After - public void tearDown() throws Exception { - client.close(); - super.tearDown(); - } - - @Test(timeout=60000) - public void testConsumeWhenTopicRelease() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestConsumeWhenTopicRelease"); - ByteString subId = ByteString.copyFromUtf8("mysub"); - - int startMsgId = 0; - int numMsgs = 10; - // subscriber in client - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subId, opts); - // start delivery - OrderCheckingMessageHandler ocm = new OrderCheckingMessageHandler( - topic, subId, startMsgId, numMsgs); - subscriber.startDelivery(topic, subId, ocm); - for (int i=0; i<numMsgs; i++) { - Message msg = Message.newBuilder().setBody( - ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build(); - publisher.publish(topic, msg); - } - logger.info("Publish finished."); - queue.take(); - logger.info("Deliver finished."); - // check messages received in order - assertTrue(ocm.isInOrder()); - - // wait for retention secs - Thread.sleep((RETENTION_SECS_VALUE + 2) * 1000); - - subscriber.stopDelivery(topic, subId); - subscriber.closeSubscription(topic, subId); - - startMsgId = 20; - // reconnect it again - subscriber.subscribe(topic, subId, opts); - ocm = new OrderCheckingMessageHandler(topic, subId, startMsgId, numMsgs); - subscriber.startDelivery(topic, subId, ocm); - for (int i=0; i<numMsgs; i++) { - Message msg = Message.newBuilder().setBody( - ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build(); - publisher.publish(topic, msg); - } - queue.take(); - // check messages received in order - assertTrue(ocm.isInOrder()); - } - - @Test(timeout=60000) - public void testConsumeWhenHubShutdown() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestConsumeWhenHubShutdown"); - ByteString subId = ByteString.copyFromUtf8("mysub"); - - int startMsgId = 0; - int numMsgs = 10; - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - // subscriber in client - subscriber.subscribe(topic, subId, opts); - // start delivery - OrderCheckingMessageHandler ocm = new OrderCheckingMessageHandler( - topic, subId, startMsgId, numMsgs); - subscriber.startDelivery(topic, subId, ocm); - for (int i=0; i<numMsgs; i++) { - Message msg = Message.newBuilder().setBody( - ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build(); - publisher.publish(topic, msg); - } - logger.info("Publish finished."); - queue.take(); - logger.info("Deliver finished."); - // check messages received in order - assertTrue(ocm.isInOrder()); - // make sure consume request sent to hub server before shut down - Thread.sleep(2000); - subscriber.stopDelivery(topic, subId); - subscriber.closeSubscription(topic, subId); - - stopHubServers(); - Thread.sleep(1000); - startHubServers(); - - startMsgId = 20; - // reconnect it again - subscriber.subscribe(topic, subId, opts); - ocm = new OrderCheckingMessageHandler(topic, subId, startMsgId, numMsgs); - subscriber.startDelivery(topic, subId, ocm); - for (int i=0; i<numMsgs; i++) { - Message msg = Message.newBuilder().setBody( - ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build(); - publisher.publish(topic, msg); - } - queue.take(); - // check messages received in order - assertTrue(ocm.isInOrder()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java deleted file mode 100644 index b66196e..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.net.UnknownHostException; -import java.util.concurrent.Executors; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.HedwigSocketAddress; - -public class StubTopicManager extends TrivialOwnAllTopicManager { - - boolean shouldOwnEveryNewTopic = false; - boolean shouldError = false; - - public void setShouldOwnEveryNewTopic(boolean shouldOwnEveryNewTopic) { - this.shouldOwnEveryNewTopic = shouldOwnEveryNewTopic; - } - - public void setShouldError(boolean shouldError) { - this.shouldError = shouldError; - } - - public StubTopicManager(ServerConfiguration conf) throws UnknownHostException { - super(conf, Executors.newSingleThreadScheduledExecutor()); - } - - @Override - protected void realGetOwner(ByteString topic, boolean shouldClaim, - Callback<HedwigSocketAddress> cb, Object ctx) { - - if (shouldError) { - cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail")); - return; - } - if (null != topics.getIfPresent(topic) // already own it - || shouldOwnEveryNewTopic) { - super.realGetOwner(topic, shouldClaim, cb, ctx); - return; - } else { - // return some other address - cb.operationFinished(ctx, new HedwigSocketAddress("124.31.0.1:80")); - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java deleted file mode 100644 index 04fb451..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.junit.Test; - -import com.google.protobuf.ByteString; - -public class TestConcurrentTopicAcquisition extends HedwigHubTestBase { - - // Client variables - protected HedwigClient client; - protected Publisher publisher; - protected Subscriber subscriber; - - final LinkedBlockingQueue<ByteString> subscribers = - new LinkedBlockingQueue<ByteString>(); - final ByteString topic = ByteString.copyFromUtf8("concurrent-topic"); - final int numSubscribers = 300; - final AtomicInteger numDone = new AtomicInteger(0); - - // SynchronousQueues to verify async calls - private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>(); - - class SubCallback implements Callback<Void> { - - ByteString subId; - - public SubCallback(ByteString subId) { - this.subId = subId; - } - - @Override - public void operationFinished(Object ctx, - Void resultOfOperation) { - if (logger.isDebugEnabled()) { - logger.debug("subscriber " + subId.toStringUtf8() + " succeed."); - } - int done = numDone.incrementAndGet(); - if (done == numSubscribers) { - ConcurrencyUtils.put(queue, false); - } - } - - @Override - public void operationFailed(Object ctx, - PubSubException exception) { - if (logger.isDebugEnabled()) { - logger.debug("subscriber " + subId.toStringUtf8() + " failed : ", exception); - } - ConcurrencyUtils.put(subscribers, subId); - // ConcurrencyUtils.put(queue, false); - } - } - - @Override - public void setUp() throws Exception { - super.setUp(); - client = new HedwigClient(new HubClientConfiguration()); - - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - @Override - public void tearDown() throws Exception { - // sub.interrupt(); - // sub.join(); - - client.close(); - super.tearDown(); - } - - @Test(timeout=60000) - public void testTopicAcquistion() throws Exception { - logger.info("Start concurrent topic acquistion test."); - - // let one bookie down to cause not enough bookie exception - logger.info("Tear down one bookie server."); - bktb.tearDownOneBookieServer(); - - // In current implementation, the first several subscriptions will succeed to put topic in topic manager set, - // because the tear down bookie server's zk node need time to disappear - // some subscriptions will create ledger successfully, then other subscriptions will fail. - // the race condition will be: topic manager own topic but persistence manager doesn't - - // 300 subscribers subscribe to a same topic - final AtomicBoolean inRedirectLoop = new AtomicBoolean(false); - numDone.set(0); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - for (int i=0; i<numSubscribers; i++) { - ByteString subId = ByteString.copyFromUtf8("sub-" + i); - if (logger.isDebugEnabled()) { - logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8()); - } - subscriber.asyncSubscribe(topic, subId, opts, - new Callback<Void>() { - - private void tick() { - if (numDone.incrementAndGet() == numSubscribers) { - ConcurrencyUtils.put(queue, true); - } - } - - @Override - public void operationFinished(Object ctx, - Void resultOfOperation) { - tick(); - } - - @Override - public void operationFailed(Object ctx, - PubSubException exception) { - if (exception instanceof PubSubException.ServiceDownException) { - String msg = exception.getMessage(); - if (msg.indexOf("ServerRedirectLoopException") > 0) { - inRedirectLoop.set(true); - } - if (logger.isDebugEnabled()) { - logger.debug("Operation failed : ", exception); - } - } - tick(); - } - - }, - null); - } - - queue.take(); - - // TODO: remove comment after we fix the issue - // Assert.assertEquals(false, inRedirectLoop.get()); - - // start a thread to send subscriptions - numDone.set(0); - Thread sub = new Thread(new Runnable() { - - @Override - public void run() { - logger.info("sub thread started"); - try { - // 100 subscribers subscribe to a same topic - for (int i=0; i<numSubscribers; i++) { - ByteString subscriberId = ByteString.copyFromUtf8("sub-" + i); - subscribers.put(subscriberId); - } - - ByteString subId; - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - while (true) { - subId = subscribers.take(); - - if (logger.isDebugEnabled()) { - logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8()); - } - subscriber.asyncSubscribe(topic, subId, opts, new SubCallback(subId), null); - } - // subscriber.asyncSubscribe(topic, subscriberId, mode, callback, context) - } catch (InterruptedException ie) { - // break - logger.warn("Interrupted : ", ie); - } - } - - }); - sub.start(); - Thread.sleep(2000); - - // start a new bookie server - logger.info("start new bookie server"); - bktb.startUpNewBookieServer(); - - // hope that all the subscriptions will be OK - queue.take(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java deleted file mode 100644 index 77c6fad..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.topics; - -import org.apache.hedwig.util.HedwigSocketAddress; - -import org.junit.Test; -import org.junit.Assert; - -public class TestHubInfo { - - @Test(timeout=60000) - public void testParseHubInfo() throws Exception { - HedwigSocketAddress addr = new HedwigSocketAddress("localhost", 9086, 9087); - HubInfo hubInfo1 = new HubInfo(addr, 9999); - - String strHubInfo1 = hubInfo1.toString(); - HubInfo parsedHubInfo1 = HubInfo.parse(strHubInfo1); - Assert.assertEquals("Hub infos should be same", hubInfo1, parsedHubInfo1); - - HubInfo hubInfo2 = new HubInfo(addr, 0); - HubInfo parsedHubInfo2 = HubInfo.parse("localhost:9086:9087"); - Assert.assertEquals("Hub infos w/o zxid should be same", hubInfo2, parsedHubInfo2); - - // parse empty string - try { - HubInfo.parse(""); - Assert.fail("Should throw InvalidHubInfoException parsing empty string."); - } catch (HubInfo.InvalidHubInfoException ihie) { - } - - // parse corrupted hostname - try { - HubInfo.parse("localhost,a,b,c"); - Assert.fail("Should throw InvalidHubInfoException parsing corrupted hostname."); - } catch (HubInfo.InvalidHubInfoException ihie) { - } - - // parse corrupted string - try { - HubInfo.parse("hostname: localhost:9086:9087"); - Assert.fail("Should throw InvalidHubInfoException parsing corrupted string."); - } catch (HubInfo.InvalidHubInfoException ihie) { - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java deleted file mode 100644 index f14d601..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.topics; - -import org.junit.Test; -import org.junit.Assert; - -public class TestHubLoad { - - @Test(timeout=60000) - public void testParseHubLoad() throws Exception { - HubLoad hubLoad1 = new HubLoad(9999); - - String strHubLoad1 = hubLoad1.toString(); - HubLoad parsedHubLoad1 = HubLoad.parse(strHubLoad1); - Assert.assertEquals("Hub load data should be same", hubLoad1, parsedHubLoad1); - - final int numTopics = 9998; - HubLoad hubLoad2 = new HubLoad(numTopics); - HubLoad parsedHubLoad2 = HubLoad.parse(numTopics + ""); - Assert.assertEquals("Hub load data not protobuf encoded should be same", hubLoad2, parsedHubLoad2); - - // parse empty string - try { - HubLoad.parse(""); - Assert.fail("Should throw InvalidHubLoadException parsing empty string."); - } catch (HubLoad.InvalidHubLoadException ihie) { - } - - // parse corrupted numTopics - try { - HubLoad.parse("9998_x"); - Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data."); - } catch (HubLoad.InvalidHubLoadException ihie) { - } - - // parse corrupted string - try { - HubLoad.parse("hostname: 9998_x"); - Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data."); - } catch (HubLoad.InvalidHubLoadException ihie) { - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java deleted file mode 100644 index c75ff05..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java +++ /dev/null @@ -1,354 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.StubCallback; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.CompositeException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.meta.MetadataManagerFactoryTestCase; -import org.apache.hedwig.server.meta.TopicOwnershipManager; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.hedwig.util.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestMMTopicManager extends MetadataManagerFactoryTestCase { - - private static final Logger LOG = LoggerFactory.getLogger(TestMMTopicManager.class); - - protected MMTopicManager tm; - protected TopicOwnershipManager tom; - - protected class CallbackQueue<T> implements Callback<T> { - SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>(); - - public SynchronousQueue<Either<T, Exception>> getQueue() { - return q; - } - - public Either<T, Exception> take() throws InterruptedException { - return q.take(); - } - - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - LOG.error("got exception: " + exception); - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception)); - } - }).start(); - } - - @Override - public void operationFinished(Object ctx, final T resultOfOperation) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null)); - } - }).start(); - } - } - - protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>(); - protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>(); - protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>(); - - protected ByteString topic = ByteString.copyFromUtf8("topic"); - protected HedwigSocketAddress me; - protected ScheduledExecutorService scheduler; - - public TestMMTopicManager(String metaManagerCls) { - super(metaManagerCls); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - me = conf.getServerAddr(); - scheduler = Executors.newSingleThreadScheduledExecutor(); - tom = metadataManagerFactory.newTopicOwnershipManager(); - tm = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler); - } - - @Override - @After - public void tearDown() throws Exception { - tom.close(); - tm.stop(); - super.tearDown(); - } - - @Test(timeout=60000) - public void testGetOwnerSingle() throws Exception { - tm.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - } - - protected ByteString mkTopic(int i) { - return ByteString.copyFromUtf8(topic.toStringUtf8() + i); - } - - protected <T> T check(Either<T, Exception> ex) throws Exception { - if (ex.left() == null) - throw ex.right(); - else - return ex.left(); - } - - public static class CustomServerConfiguration extends ServerConfiguration { - int port; - - public CustomServerConfiguration(int port) { - this.port = port; - } - - @Override - public int getServerPort() { - return port; - } - } - - @Test(timeout=60000) - public void testGetOwnerMulti() throws Exception { - ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1), - conf2 = new CustomServerConfiguration(conf.getServerPort() + 2); - MMTopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler), - tm2 = new MMTopicManager(conf2, zk, metadataManagerFactory, scheduler); - - tm.getOwner(topic, false, addrCbq, null); - HedwigSocketAddress owner = check(addrCbq.take()); - - for (int i = 0; i < 100; ++i) { - tm.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(owner, check(addrCbq.take())); - - tm1.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(owner, check(addrCbq.take())); - - tm2.getOwner(topic, false, addrCbq, null); - Assert.assertEquals(owner, check(addrCbq.take())); - } - - for (int i = 0; i < 100; ++i) { - if (!owner.equals(me)) - break; - tm.getOwner(mkTopic(i), false, addrCbq, null); - owner = check(addrCbq.take()); - if (i == 99) - Assert.fail("Never chose another owner"); - } - - tm1.stop(); - tm2.stop(); - } - - @Test(timeout=60000) - public void testLoadBalancing() throws Exception { - tm.getOwner(topic, false, addrCbq, null); - - Assert.assertEquals(me, check(addrCbq.take())); - - ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1); - TopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler); - - ByteString topic1 = mkTopic(1); - tm.getOwner(topic1, false, addrCbq, null); - Assert.assertEquals(conf1.getServerAddr(), check(addrCbq.take())); - - tm1.stop(); - } - - class StubOwnershipChangeListener implements TopicOwnershipChangeListener { - boolean failure; - SynchronousQueue<Pair<ByteString, Boolean>> bsQueue; - - public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) { - this.bsQueue = bsQueue; - } - - public void setFailure(boolean failure) { - this.failure = failure; - } - - @Override - public void lostTopic(final ByteString topic) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(bsQueue, Pair.of(topic, false)); - } - }).start(); - } - - public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(bsQueue, Pair.of(topic, true)); - if (failure) { - callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail")); - } else { - callback.operationFinished(ctx, null); - } - } - }).start(); - } - } - - @Test(timeout=60000) - public void testOwnershipChange() throws Exception { - SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>(); - - StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue); - - tm.addTopicOwnershipChangeListener(listener); - - // regular acquire - tm.getOwner(topic, true, addrCbq, null); - Pair<ByteString, Boolean> pair = bsQueue.take(); - Assert.assertEquals(topic, pair.first()); - Assert.assertTrue(pair.second()); - Assert.assertEquals(me, check(addrCbq.take())); - assertOwnershipNodeExists(); - - // topic that I already own - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Assert.assertTrue(bsQueue.isEmpty()); - assertOwnershipNodeExists(); - - // regular release - tm.releaseTopic(topic, cb, null); - pair = bsQueue.take(); - Assert.assertEquals(topic, pair.first()); - Assert.assertFalse(pair.second()); - Assert.assertTrue(queue.take()); - assertOwnershipNodeDoesntExist(); - - // releasing topic that I don't own - tm.releaseTopic(mkTopic(0), cb, null); - Assert.assertTrue(queue.take()); - Assert.assertTrue(bsQueue.isEmpty()); - - // set listener to return error - listener.setFailure(true); - - tm.getOwner(topic, true, addrCbq, null); - pair = bsQueue.take(); - Assert.assertEquals(topic, pair.first()); - Assert.assertTrue(pair.second()); - Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right()) - .getExceptions().iterator().next().getClass()); - Assert.assertFalse(null != tm.topics.getIfPresent(topic)); - Thread.sleep(100); - assertOwnershipNodeDoesntExist(); - - } - - public void assertOwnershipNodeExists() throws Exception { - StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>(); - tom.readOwnerInfo(topic, callback, null); - Versioned<HubInfo> hubInfo = callback.queue.take().left(); - Assert.assertEquals(tm.addr, hubInfo.getValue().getAddress()); - } - - public void assertOwnershipNodeDoesntExist() throws Exception { - StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>(); - tom.readOwnerInfo(topic, callback, null); - Versioned<HubInfo> hubInfo = callback.queue.take().left(); - Assert.assertEquals(null, hubInfo); - } - - @Test(timeout=60000) - public void testZKClientDisconnected() throws Exception { - // First assert ownership of the topic - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - - // Suspend the ZKTopicManager and make sure calls to getOwner error out - tm.isSuspended = true; - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass()); - // Release the topic. This should not error out even if suspended. - tm.releaseTopic(topic, cb, null); - Assert.assertTrue(queue.take()); - assertOwnershipNodeDoesntExist(); - - // Restart the ZKTopicManager and make sure calls to getOwner are okay - tm.isSuspended = false; - tm.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - assertOwnershipNodeExists(); - } - - @Test(timeout=60000) - public void testRetentionAfterAccess() throws Exception { - conf.getConf().setProperty("retention_secs_after_access", "5"); - MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler); - tm1.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Thread.sleep(6000L); - tm1.topics.cleanUp(); - Thread.sleep(2000L); - assertOwnershipNodeDoesntExist(); - tm1.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Thread.sleep(1000L); - tm1.topics.cleanUp(); - Thread.sleep(2000L); - assertOwnershipNodeExists(); - - tm1.stop(); - } - - @Test(timeout=60000) - public void testMaxNumTopics() throws Exception { - conf.getConf().setProperty("max_num_topics", "1"); - MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler); - tm1.getOwner(topic, true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - assertOwnershipNodeExists(); - tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"), - true, addrCbq, null); - Assert.assertEquals(me, check(addrCbq.take())); - Thread.sleep(2000L); - assertOwnershipNodeDoesntExist(); - tm1.stop(); - } - - -}
