http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java deleted file mode 100644 index 08f287c..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.MessageHandler; -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; - -import org.apache.hedwig.client.api.Client; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.util.Callback; - -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.server.common.ServerConfiguration; - -public class MessageBoundedPersistenceTest extends HedwigHubTestBase { - protected static final Logger logger = LoggerFactory.getLogger(MessageBoundedPersistenceTest.class); - - protected class SmallReadAheadServerConfiguration - extends HedwigHubTestBase.HubServerConfiguration { - SmallReadAheadServerConfiguration(int serverPort, int sslServerPort) { - super(serverPort, sslServerPort); - } - public long getMaximumCacheSize() { - return 1; - } - - public int getReadAheadCount() { - return 1; - } - - public int getMessagesConsumedThreadRunInterval() { - return 1000; // run every second - } - } - - protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) { - return new SmallReadAheadServerConfiguration(serverPort, sslServerPort); - } - - private class MessageBoundClientConfiguration extends HubClientConfiguration { - final int messageBound; - - public MessageBoundClientConfiguration(int bound) { - this.messageBound = bound; - } - - public MessageBoundClientConfiguration() { - this(5); - } - - public int getSubscriptionMessageBound() { - return messageBound; - } - } - - private void sendXExpectLastY(Publisher pub, Subscriber sub, - ByteString topic, ByteString subid, - final int X, final int Y) throws Exception { - for (int i = 0; i < X; i++) { - pub.publish(topic, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.ATTACH).build(); - sub.subscribe(topic, subid, opts); - - final AtomicInteger expected = new AtomicInteger(X - Y); - final CountDownLatch latch = new CountDownLatch(1); - sub.startDelivery(topic, subid, new MessageHandler () { - synchronized public void deliver(ByteString topic, ByteString subscriberId, - Message msg, Callback<Void> callback, - Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - - if (value == expected.get()) { - expected.incrementAndGet(); - } else { - // error condition - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - latch.countDown(); - } - if (expected.get() == X) { - latch.countDown(); - } - callback.operationFinished(context, null); - } catch (Exception e) { - logger.error("Received bad message", e); - latch.countDown();// will error on match - } - } - }); - assertTrue("Timed out waiting for messages Y is " + Y - + " expected is currently " + expected.get(), latch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + X, X, expected.get()); - - sub.stopDelivery(topic, subid); - sub.closeSubscription(topic, subid); - } - - @Test(timeout=60000) - public void testBasicBounding() throws Exception { - Client client = new HedwigClient(new MessageBoundClientConfiguration(5)); - Publisher pub = client.getPublisher(); - Subscriber sub = client.getSubscriber(); - - ByteString topic = ByteString.copyFromUtf8("basicBoundingTopic"); - ByteString subid = ByteString.copyFromUtf8("basicBoundingSubId"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - sub.subscribe(topic, subid, opts); - sub.closeSubscription(topic, subid); - - sendXExpectLastY(pub, sub, topic, subid, 1000, 5); - - client.close(); - } - - @Test(timeout=60000) - public void testMultipleSubscribers() throws Exception { - ByteString topic = ByteString.copyFromUtf8("multiSubTopic"); - - Client client = new HedwigClient(new HubClientConfiguration()); - Publisher pub = client.getPublisher(); - Subscriber sub = client.getSubscriber(); - - SubscriptionOptions options5 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(5).build(); - SubscriptionOptions options20 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(20).build(); - SubscriptionOptions optionsUnbounded = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE).build(); - - ByteString subid5 = ByteString.copyFromUtf8("bound5SubId"); - ByteString subid20 = ByteString.copyFromUtf8("bound20SubId"); - ByteString subidUnbounded = ByteString.copyFromUtf8("noboundSubId"); - - sub.subscribe(topic, subid5, options5); - sub.closeSubscription(topic, subid5); - sendXExpectLastY(pub, sub, topic, subid5, 1000, 5); - - sub.subscribe(topic, subid20, options20); - sub.closeSubscription(topic, subid20); - sendXExpectLastY(pub, sub, topic, subid20, 1000, 20); - - sub.subscribe(topic, subidUnbounded, optionsUnbounded); - sub.closeSubscription(topic, subidUnbounded); - - sendXExpectLastY(pub, sub, topic, subidUnbounded, 10000, 10000); - sub.unsubscribe(topic, subidUnbounded); - - sendXExpectLastY(pub, sub, topic, subid20, 1000, 20); - sub.unsubscribe(topic, subid20); - - sendXExpectLastY(pub, sub, topic, subid5, 1000, 5); - sub.unsubscribe(topic, subid5); - - client.close(); - } - - @Test(timeout=60000) - public void testUpdateMessageBound() throws Exception { - ByteString topic = ByteString.copyFromUtf8("UpdateMessageBound"); - - Client client = new HedwigClient(new HubClientConfiguration()); - Publisher pub = client.getPublisher(); - Subscriber sub = client.getSubscriber(); - - SubscriptionOptions options5 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(5).build(); - SubscriptionOptions options20 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(20).build(); - SubscriptionOptions options10 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(10).build(); - - ByteString subid = ByteString.copyFromUtf8("updateSubId"); - - sub.subscribe(topic, subid, options5); - sub.closeSubscription(topic, subid); - sendXExpectLastY(pub, sub, topic, subid, 50, 5); - - // update bound to 20 - sub.subscribe(topic, subid, options20); - sub.closeSubscription(topic, subid); - sendXExpectLastY(pub, sub, topic, subid, 50, 20); - - // update bound to 10 - sub.subscribe(topic, subid, options10); - sub.closeSubscription(topic, subid); - sendXExpectLastY(pub, sub, topic, subid, 50, 10); - - // message bound is not provided, no update - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - sub.subscribe(topic, subid, opts); - sub.closeSubscription(topic, subid); - sendXExpectLastY(pub, sub, topic, subid, 50, 10); - - client.close(); - } - - @Test(timeout=60000) - public void testLedgerGC() throws Exception { - Client client = new HedwigClient(new MessageBoundClientConfiguration()); - Publisher pub = client.getPublisher(); - Subscriber sub = client.getSubscriber(); - - String ledgersPath = "/hedwig/standalone/topics/testGCTopic/ledgers"; - ByteString topic = ByteString.copyFromUtf8("testGCTopic"); - ByteString subid = ByteString.copyFromUtf8("testGCSubId"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - sub.subscribe(topic, subid, opts); - sub.closeSubscription(topic, subid); - - for (int i = 1; i <= 100; i++) { - pub.publish(topic, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - LedgerRanges r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null)); - assertEquals("Should only have 1 ledger yet", 1, r.getRangesList().size()); - long firstLedger = r.getRangesList().get(0).getLedgerId(); - - stopHubServers(); - startHubServers(); - - pub.publish(topic, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(0xdeadbeef))).build()); - - r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null)); - assertEquals("Should have 2 ledgers after restart", 2, r.getRangesList().size()); - - for (int i = 100; i <= 200; i++) { - pub.publish(topic, Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - Thread.sleep(5000); // give GC a chance to happen - - r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null)); - long secondLedger = r.getRangesList().get(0).getLedgerId(); - - assertEquals("Should only have 1 ledger after GC", 1, r.getRangesList().size()); - - // ensure original ledger doesn't exist - String firstLedgerPath = String.format("/ledgers/L%010d", firstLedger); - String secondLedgerPath = String.format("/ledgers/L%010d", secondLedger); - assertNull("Ledger should not exist", bktb.getZooKeeperClient().exists(firstLedgerPath, false)); - assertNotNull("Ledger should exist", bktb.getZooKeeperClient().exists(secondLedgerPath, false)); - - client.close(); - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java deleted file mode 100644 index 827677f..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish; - -public class StubPersistenceManager implements PersistenceManagerWithRangeScan { - Map<ByteString, List<Message>> messages = new HashMap<ByteString, List<Message>>(); - boolean failure = false; - ServiceDownException exception = new ServiceDownException("Asked to fail"); - - public void deliveredUntil(ByteString topic, Long seqId) { - // noop - } - - public void consumedUntil(ByteString topic, Long seqId) { - // noop - } - - public void setMessageBound(ByteString topic, Integer bound) { - // noop - } - - public void clearMessageBound(ByteString topic) { - // noop - } - - public void consumeToBound(ByteString topic) { - // noop - } - - protected static class ArrayListMessageFactory implements Factory<List<Message>> { - static ArrayListMessageFactory instance = new ArrayListMessageFactory(); - - public List<Message> newInstance() { - return new ArrayList<Message>(); - } - } - - public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) { - long seqId = MapMethods.getAfterInsertingIfAbsent(messages, topic, ArrayListMessageFactory.instance).size(); - return MessageSeqId.newBuilder().setLocalComponent(seqId).build(); - } - - public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) { - return seqId + skipAmount; - } - - public void persistMessage(PersistRequest request) { - if (failure) { - request.getCallback().operationFailed(request.getCtx(), exception); - return; - } - - MapMethods.addToMultiMap(messages, request.getTopic(), request.getMessage(), ArrayListMessageFactory.instance); - request.getCallback().operationFinished(request.getCtx(), MessageIdUtils.mergeLocalSeqId(request.getMessage(), - (long) messages.get(request.getTopic()).size()).getMsgId()); - } - - public void scanSingleMessage(ScanRequest request) { - if (failure) { - request.getCallback().scanFailed(request.getCtx(), exception); - return; - } - - long index = request.getStartSeqId() - 1; - List<Message> messageList = messages.get(request.getTopic()); - if (index >= messageList.size()) { - request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES); - return; - } - - Message msg = messageList.get((int) index); - Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, request.getStartSeqId()); - request.getCallback().messageScanned(request.getCtx(), toDeliver); - } - - public void scanMessages(RangeScanRequest request) { - if (failure) { - request.getCallback().scanFailed(request.getCtx(), exception); - return; - } - - long totalSize = 0; - long startSeqId = request.getStartSeqId(); - for (int i = 0; i < request.getMessageLimit(); i++) { - List<Message> messageList = MapMethods.getAfterInsertingIfAbsent(messages, request.getTopic(), - ArrayListMessageFactory.instance); - if (startSeqId + i > messageList.size()) { - request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES); - return; - } - Message msg = messageList.get((int) startSeqId + i - 1); - Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, startSeqId + i); - request.getCallback().messageScanned(request.getCtx(), toDeliver); - - totalSize += toDeliver.getBody().size(); - - if (totalSize > request.getSizeLimit()) { - request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.SIZE_LIMIT_EXCEEDED); - return; - } - } - request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED); - - } - - @Override - public void stop() { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java deleted file mode 100644 index e9fbd08..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; - -public class StubScanCallback implements ScanCallback { - - public static Message END_MESSAGE = Message.newBuilder().setBody(ByteString.EMPTY).build(); - - LinkedBlockingQueue<Either<Message, Exception>> queue = new LinkedBlockingQueue<Either<Message,Exception>>(); - - @Override - public void messageScanned(Object ctx, Message message) { - ConcurrencyUtils.put(queue, Either.of(message, (Exception) null)); - } - - @Override - public void scanFailed(Object ctx, Exception exception) { - ConcurrencyUtils.put(queue, Either.of((Message) null, exception)); - } - - @Override - public void scanFinished(Object ctx, ReasonForFinish reason) { - ConcurrencyUtils.put(queue, Either.of(END_MESSAGE, (Exception) null)); - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java deleted file mode 100644 index d65750b..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java +++ /dev/null @@ -1,798 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; - -import org.apache.hedwig.HelperMethods; -import org.apache.hedwig.StubCallback; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.meta.SubscriptionDataManager; -import org.apache.hedwig.server.meta.TopicOwnershipManager; -import org.apache.hedwig.server.meta.TopicPersistenceManager; -import org.apache.hedwig.server.subscriptions.MMSubscriptionManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.zookeeper.ZooKeeper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestBookKeeperPersistenceManager { - private static final Logger logger = LoggerFactory.getLogger(TestPersistenceManagerBlackBox.class); - - BookKeeperTestBase bktb; - private final int numBookies = 3; - private final long readDelay = 2000L; - private final int maxEntriesPerLedger = 10; - - ServerConfiguration conf; - ScheduledExecutorService scheduler; - - TopicManager tm; - BookkeeperPersistenceManager manager; - PubSubException failureException = null; - TestMetadataManagerFactory metadataManagerFactory; - TopicPersistenceManager tpManager; - MMSubscriptionManager sm; - - boolean removeStartSeqId; - - static class TestMetadataManagerFactory extends MetadataManagerFactory { - - final MetadataManagerFactory factory; - int serviceDownCount = 0; - - TestMetadataManagerFactory(ServerConfiguration conf, ZooKeeper zk) throws Exception { - factory = MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - } - - public void setServiceDownCount(int count) { - this.serviceDownCount = count; - } - - @Override - public int getCurrentVersion() { - return factory.getCurrentVersion(); - } - - @Override - protected MetadataManagerFactory initialize( - ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException { - // do nothing - return factory; - } - - @Override - public void shutdown() throws IOException { - factory.shutdown(); - } - - @Override - public Iterator<ByteString> getTopics() throws IOException { - return factory.getTopics(); - } - - @Override - public TopicPersistenceManager newTopicPersistenceManager() { - final TopicPersistenceManager manager = factory.newTopicPersistenceManager(); - return new TopicPersistenceManager() { - - @Override - public void close() throws IOException { - manager.close(); - } - - @Override - public void readTopicPersistenceInfo(ByteString topic, - Callback<Versioned<LedgerRanges>> callback, Object ctx) { - if (serviceDownCount > 0) { - --serviceDownCount; - callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down")); - return; - } - manager.readTopicPersistenceInfo(topic, callback, ctx); - } - @Override - public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version, - Callback<Version> callback, Object ctx) { - if (serviceDownCount > 0) { - --serviceDownCount; - callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down")); - return; - } - manager.writeTopicPersistenceInfo(topic, ranges, version, callback, ctx); - } - @Override - public void deleteTopicPersistenceInfo(ByteString topic, Version version, - Callback<Void> callback, Object ctx) { - if (serviceDownCount > 0) { - --serviceDownCount; - callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down")); - return; - } - manager.deleteTopicPersistenceInfo(topic, version, callback, ctx); - } - }; - } - - @Override - public SubscriptionDataManager newSubscriptionDataManager() { - final SubscriptionDataManager sdm = factory.newSubscriptionDataManager(); - return new SubscriptionDataManager() { - @Override - public void close() throws IOException { - sdm.close(); - } - - @Override - public void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data, - Callback<Version> callback, Object ctx) { - sdm.createSubscriptionData(topic, subscriberId, data, callback, ctx); - } - - @Override - public boolean isPartialUpdateSupported() { - return sdm.isPartialUpdateSupported(); - } - - @Override - public void updateSubscriptionData(ByteString topic, ByteString subscriberId, - SubscriptionData dataToUpdate, Version version, Callback<Version> callback, Object ctx) { - if (serviceDownCount > 0) { - --serviceDownCount; - callback.operationFailed(ctx, - new PubSubException.ServiceDownException("Metadata Store is down")); - return; - } - sdm.updateSubscriptionData(topic, subscriberId, dataToUpdate, version, callback, ctx); - } - - @Override - public void replaceSubscriptionData(ByteString topic, ByteString subscriberId, - SubscriptionData dataToReplace, Version version, Callback<Version> callback, Object ctx) { - if (serviceDownCount > 0) { - --serviceDownCount; - callback.operationFailed(ctx, - new PubSubException.ServiceDownException("Metadata Store is down")); - return; - } - sdm.replaceSubscriptionData(topic, subscriberId, dataToReplace, version, callback, ctx); - } - - @Override - public void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, - Callback<Void> callback, Object ctx) { - sdm.deleteSubscriptionData(topic, subscriberId, version, callback, ctx); - } - - @Override - public void readSubscriptionData(ByteString topic, ByteString subscriberId, - Callback<Versioned<SubscriptionData>> callback, Object ctx) { - sdm.readSubscriptionData(topic, subscriberId, callback, ctx); - } - - @Override - public void readSubscriptions(ByteString topic, - Callback<Map<ByteString, Versioned<SubscriptionData>>> cb, Object ctx) { - sdm.readSubscriptions(topic, cb, ctx); - } - }; - } - - @Override - public TopicOwnershipManager newTopicOwnershipManager() { - return factory.newTopicOwnershipManager(); - } - - @Override - public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException { - factory.format(cfg, zk); - } - } - - public TestBookKeeperPersistenceManager(boolean removeStartSeqId) { - this.removeStartSeqId = removeStartSeqId; - } - - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { - { true }, { false } - }); - } - - @SuppressWarnings("deprecation") - private void startCluster(long delay) throws Exception { - bktb = new BookKeeperTestBase(numBookies, 0L); - bktb.setUp(); - - conf = new ServerConfiguration() { - @Override - public int getMessagesConsumedThreadRunInterval() { - return 2000; - } - @Override - public int getConsumeInterval() { - return 0; - } - @Override - public long getMaxEntriesPerLedger() { - return maxEntriesPerLedger; - } - }; - org.apache.bookkeeper.conf.ClientConfiguration bkClientConf = - new org.apache.bookkeeper.conf.ClientConfiguration(); - bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999) - .setThrottleValue(3); - conf.addConf(bkClientConf); - - metadataManagerFactory = new TestMetadataManagerFactory(conf, bktb.getZooKeeperClient()); - tpManager = metadataManagerFactory.newTopicPersistenceManager(); - - scheduler = Executors.newScheduledThreadPool(1); - tm = new TrivialOwnAllTopicManager(conf, scheduler); - manager = new BookkeeperPersistenceManager(bktb.bk, metadataManagerFactory, - tm, conf, scheduler); - sm = new MMSubscriptionManager(conf, metadataManagerFactory, tm, manager, null, scheduler); - } - - private void stopCluster() throws Exception { - tm.stop(); - manager.stop(); - sm.stop(); - tpManager.close(); - metadataManagerFactory.shutdown(); - scheduler.shutdown(); - bktb.tearDown(); - } - - @Before - public void setUp() throws Exception { - startCluster(0L); - } - - @After - public void tearDown() throws Exception { - stopCluster(); - } - - class RangeScanVerifier implements ScanCallback { - LinkedList<Message> pubMsgs; - boolean runNextScan = false; - RangeScanRequest nextScan = null; - - public RangeScanVerifier(LinkedList<Message> pubMsgs, RangeScanRequest nextScan) { - this.pubMsgs = pubMsgs; - this.nextScan = nextScan; - } - - @Override - public void messageScanned(Object ctx, Message recvMessage) { - logger.info("Scanned message : {}", recvMessage.getMsgId().getLocalComponent()); - if (null != nextScan && !runNextScan) { - runNextScan = true; - manager.scanMessages(nextScan); - } - - if (pubMsgs.size() == 0) { - return; - } - - Message pubMsg = pubMsgs.removeFirst(); - if (!HelperMethods.areEqual(recvMessage, pubMsg)) { - fail("Scanned message not equal to expected"); - } - } - - @Override - public void scanFailed(Object ctx, Exception exception) { - fail("Failed to scan messages."); - } - - @Override - @SuppressWarnings("unchecked") - public void scanFinished(Object ctx, ReasonForFinish reason) { - LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; - try { - statusQueue.put(pubMsgs.isEmpty()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - } - - private LinkedList<Message> subMessages(List<Message> msgs, int start, int end) { - LinkedList<Message> result = new LinkedList<Message>(); - for (int i=start; i<=end; i++) { - result.add(msgs.get(i)); - } - return result; - } - - @Test(timeout=60000) - public void testScanMessagesOnClosedLedgerAfterDeleteLedger() throws Exception { - scanMessagesAfterDeleteLedgerTest(2); - } - - @Test(timeout=60000) - public void testScanMessagesOnUnclosedLedgerAfterDeleteLedger() throws Exception { - scanMessagesAfterDeleteLedgerTest(1); - } - - private void scanMessagesAfterDeleteLedgerTest(int numLedgers) throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestScanMessagesAfterDeleteLedger"); - - List<Message> msgs = new ArrayList<Message>(); - - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 2)); - - for (int i=0; i<numLedgers; i++) { - releaseTopic(topic); - // acquire topic again to force a new ledger - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 2)); - } - - consumedUntil(topic, 2L); - // Wait until ledger ranges is updated. - Thread.sleep(2000L); - releaseTopic(topic); - - // acquire topic again - acquireTopic(topic); - // scan messages starting from 3 - LinkedBlockingQueue<Boolean> statusQueue = - new LinkedBlockingQueue<Boolean>(); - manager.scanMessages(new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE, - new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue)); - Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS); - assertTrue("Should succeed to scan messages after deleted consumed ledger.", b); - } - - @Test(timeout=60000) - public void testScanMessagesOnEmptyLedgerAfterDeleteLedger() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnEmptyLedgerAfterDeleteLedger"); - - List<Message> msgs = new ArrayList<Message>(); - - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 2)); - releaseTopic(topic); - - // acquire topic again to force a new ledger - acquireTopic(topic); - logger.info("Consumed messages."); - consumedUntil(topic, 2L); - // Wait until ledger ranges is updated. - Thread.sleep(2000L); - logger.info("Released topic with an empty ledger."); - // release topic to force an empty ledger - releaseTopic(topic); - - // publish 2 more messages, these message expected to be id 3 and 4 - acquireTopic(topic); - logger.info("Published more messages."); - msgs.addAll(publishMessages(topic, 2)); - releaseTopic(topic); - - // acquire topic again - acquireTopic(topic); - // scan messages starting from 3 - LinkedBlockingQueue<Boolean> statusQueue = - new LinkedBlockingQueue<Boolean>(); - long startSeqId = removeStartSeqId ? 1 : 3; - manager.scanMessages(new RangeScanRequest(topic, startSeqId, 2, Long.MAX_VALUE, - new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue)); - Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS); - assertTrue("Should succeed to scan messages after deleted consumed ledger.", b); - } - - @Test(timeout=60000) - public void testFailedToDeleteLedger1() throws Exception { - failedToDeleteLedgersTest(1); - } - - @Test(timeout=60000) - public void testFailedToDeleteLedger2() throws Exception { - // succeed to delete second ledger - failedToDeleteLedgersTest(2); - } - - private void failedToDeleteLedgersTest(int numLedgers) throws Exception { - final ByteString topic = ByteString.copyFromUtf8("TestFailedToDeleteLedger"); - final int serviceDownCount = 1; - - List<Message> msgs = new ArrayList<Message>(); - - for (int i=0; i<numLedgers; i++) { - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 2)); - releaseTopic(topic); - } - - // acquire topic again to force a new ledger - acquireTopic(topic); - logger.info("Consumed messages."); - metadataManagerFactory.setServiceDownCount(serviceDownCount); - // failed consumed - consumedUntil(topic, 2L * numLedgers); - // Wait until ledger ranges is updated. - Thread.sleep(2000L); - logger.info("Released topic with an empty ledger."); - // release topic to force an empty ledger - releaseTopic(topic); - - // publish 2 more messages, these message expected to be id 3 and 4 - acquireTopic(topic); - logger.info("Published more messages."); - msgs.addAll(publishMessages(topic, 2)); - releaseTopic(topic); - - // acquire topic again - acquireTopic(topic); - LinkedBlockingQueue<Boolean> statusQueue = - new LinkedBlockingQueue<Boolean>(); - manager.scanMessages(new RangeScanRequest(topic, numLedgers * 2 + 1, 2, Long.MAX_VALUE, - new RangeScanVerifier(subMessages(msgs, numLedgers * 2, numLedgers * 2 + 1), null), statusQueue)); - Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS); - assertTrue("Should succeed to scan messages after deleted consumed ledger.", b); - - // consumed - consumedUntil(topic, (numLedgers + 1) * 2L); - // Wait until ledger ranges is updated. - Thread.sleep(2000L); - - Semaphore latch = new Semaphore(1); - latch.acquire(); - tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() { - @Override - public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) { - if (null == ranges || ranges.getValue().getRangesList().size() > 1) { - failureException = new PubSubException.NoTopicPersistenceInfoException("Invalid persistence info found for topic " + topic.toStringUtf8()); - ((Semaphore)ctx).release(); - return; - } - failureException = null; - ((Semaphore)ctx).release(); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - failureException = exception; - ((Semaphore)ctx).release(); - } - }, latch); - latch.acquire(); - latch.release(); - assertNull("Should not fail with exception.", failureException); - } - - @Test(timeout=60000) - public void testScanMessagesOnTwoLedgers() throws Exception { - stopCluster(); - startCluster(readDelay); - - ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnTwoLedgers"); - - List<Message> msgs = new ArrayList<Message>(); - - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 1)); - releaseTopic(topic); - - // acquire topic again to force a new ledger - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 3)); - - // scan messages - LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); - RangeScanRequest nextScan = new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE, - new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue); - manager.scanMessages(new RangeScanRequest(topic, 1, 2, Long.MAX_VALUE, - new RangeScanVerifier(subMessages(msgs, 0, 1), nextScan), statusQueue)); - Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS); - if (b == null) { - fail("One scan request doesn't finish"); - } - b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS); - if (b == null) { - fail("One scan request doesn't finish"); - } - } - - @Test(timeout=60000) - public void testInconsistentSubscriptionStateAndLedgerRanges1() throws Exception { - // See the comment of inconsistentSubscriptionStateAndLedgerRanges. - // For this case, Step (2) failed to update subscription state metadata, - // but LedgerRanges is updated success. - // Result: scan messages from 1 to 4 take place on ledger L2. - inconsistentSubscriptionStateAndLedgerRanges(1); - } - - @Test(timeout=60000) - public void testInconsistentSubscriptionStateAndLedgerRanges2() throws Exception { - // See the comment of inconsistentSubscriptionStateAndLedgerRanges. - // For this case, step (2) failed to update subscription state metadata, - // step (3) successfully delete L1 but failed to update LedgerRanges. - // Result: scan messages from 1 to 4 falls in L1 and L2, - // but BookKeeper may complain L1 not found. - inconsistentSubscriptionStateAndLedgerRanges(2); - } - - /** - * Since InMemorySubscriptionState and LedgerRanges is maintained - * separately, there may exist such inconsistent state: - * (1). Topic ledgers: L1 [1 ~ 2], L2 [3 ~ ] - * (2). Subscriber consumes to 2 and InMemorySubscriptionState is updated - * successfully but failed when updating subscription state metadata - * (3). AbstractSubscriptionManager#MessagesConsumedTask use - * InMemorySubscriptionState to do garbage collection - * and L1 is delete - * (4). If Hub restarts at this time, old subscription state is read and - * Hub will try to deliver message from 1 - */ - public void inconsistentSubscriptionStateAndLedgerRanges(int failedCount) throws Exception { - final ByteString topic = ByteString.copyFromUtf8("inconsistentSubscriptionStateAndLedgerRanges"); - final ByteString subscriberId = ByteString.copyFromUtf8("subId"); - LinkedList<Message> msgs = new LinkedList<Message>(); - - // make ledger L1 [1 ~ 2] - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 2)); - releaseTopic(topic); - - // acquire topic again to force a new ledger L2 [3 ~ ] - acquireTopic(topic); - msgs.addAll(publishMessages(topic, 2)); - - StubCallback<Void> voidCb = new StubCallback<Void>(); - StubCallback<SubscriptionData> subDataCb = new StubCallback<SubscriptionData>(); - Either<Void, PubSubException> voidResult; - Either<SubscriptionData, PubSubException> subDataResult; - - // prepare for subscription - sm.acquiredTopic(topic, voidCb, null); - voidResult = ConcurrencyUtils.take(voidCb.queue); - assertNull(voidResult.right()); // no exception - - // Do subscription - SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(subscriberId) - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - sm.serveSubscribeRequest(topic, subRequest, MessageSeqId.newBuilder().setLocalComponent(0).build(), subDataCb, - null); - subDataResult = ConcurrencyUtils.take(subDataCb.queue); - assertNotNull(subDataResult.left()); // serveSubscribeRequest success - // and return a SubscriptionData - // object - assertNull(subDataResult.right()); // no exception - - // simulate inconsistent situation between InMemorySubscriptionState and - // LedgerRanges - metadataManagerFactory.setServiceDownCount(failedCount); - sm.setConsumeSeqIdForSubscriber(topic, subscriberId, MessageSeqId.newBuilder().setLocalComponent(2).build(), - voidCb, null); - voidResult = ConcurrencyUtils.take(voidCb.queue); - assertNotNull(voidResult.right()); // update subscription state failed - // and expect a exception - - // wait AbstractSubscriptionManager#MessagesConsumedTask to garbage - // collect ledger L1 - Thread.sleep(conf.getMessagesConsumedThreadRunInterval() * 2); - - // simulate hub restart: read old subscription state metadata and deliver - // messages from 1 - LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); - RangeScanRequest scan = new RangeScanRequest(topic, 1, 4, Long.MAX_VALUE, new RangeScanVerifier(msgs, null), - statusQueue); - manager.scanMessages(scan); - Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS); - if (b == null) { - fail("Scan request doesn't finish"); - } - } - - @Test(timeout=60000) - // Add this test case for BOOKKEEPER-458 - public void testReadWhenTopicChangeLedger() throws Exception { - final ByteString topic = ByteString.copyFromUtf8("testReadWhenTopicChangeLedger"); - LinkedList<Message> msgs = new LinkedList<Message>(); - - // Write maxEntriesPerLedger entries to make topic change ledger - acquireTopic(topic); - msgs.addAll(publishMessages(topic, maxEntriesPerLedger)); - - // Notice, change ledger operation is asynchronous, so we should wait!!! - Thread.sleep(2000); - - // Issue a scan request right start from the new ledger - LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); - RangeScanRequest scan = new RangeScanRequest(topic, maxEntriesPerLedger + 1, 1, Long.MAX_VALUE, - new RangeScanVerifier(msgs, null), statusQueue); - manager.scanMessages(scan); - Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS); - if (b == null) { - fail("Scan request timeout"); - } - assertFalse("Expect none message is scanned on the new created ledger", b); - } - - class TestCallback implements Callback<PubSubProtocol.MessageSeqId> { - - @Override - @SuppressWarnings("unchecked") - public void operationFailed(Object ctx, PubSubException exception) { - LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; - try { - statusQueue.put(false); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - @SuppressWarnings("unchecked") - public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) { - LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; - try { - statusQueue.put(true); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - protected List<Message> publishMessages(ByteString topic, int numMsgs) throws Exception { - List<Message> msgs = HelperMethods.getRandomPublishedMessages(numMsgs, 1024); - LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); - for (Message msg : msgs) { - - try { - manager.persistMessage(new PersistRequest(topic, msg, new TestCallback(), statusQueue)); - // wait a maximum of a minute - Boolean b = statusQueue.poll(60, TimeUnit.SECONDS); - if (b == null) { - throw new RuntimeException("Publish timed out"); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - return msgs; - } - - protected void acquireTopic(ByteString topic) throws Exception { - Semaphore latch = new Semaphore(1); - latch.acquire(); - manager.acquiredTopic(topic, new Callback<Void>() { - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - failureException = null; - ((Semaphore)ctx).release(); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - failureException = exception; - ((Semaphore)ctx).release(); - } - }, latch); - latch.acquire(); - latch.release(); - if (null != failureException) { - throw failureException; - } - } - - protected void releaseTopic(final ByteString topic) throws Exception { - manager.lostTopic(topic); - // backward testing ledger ranges without start seq id - if (removeStartSeqId) { - Semaphore latch = new Semaphore(1); - latch.acquire(); - tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() { - @Override - public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) { - if (null == ranges) { - failureException = new PubSubException.NoTopicPersistenceInfoException("No persistence info found for topic " + topic.toStringUtf8()); - ((Semaphore)ctx).release(); - return; - } - - // build a new ledger ranges w/o start seq id. - LedgerRanges.Builder builder = LedgerRanges.newBuilder(); - final List<LedgerRange> rangesList = ranges.getValue().getRangesList(); - for (LedgerRange range : rangesList) { - LedgerRange.Builder newRangeBuilder = LedgerRange.newBuilder(); - newRangeBuilder.setLedgerId(range.getLedgerId()); - if (range.hasEndSeqIdIncluded()) { - newRangeBuilder.setEndSeqIdIncluded(range.getEndSeqIdIncluded()); - } - builder.addRanges(newRangeBuilder.build()); - } - tpManager.writeTopicPersistenceInfo(topic, builder.build(), ranges.getVersion(), - new Callback<Version>() { - @Override - public void operationFinished(Object ctx, Version newVersion) { - failureException = null; - ((Semaphore)ctx).release(); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - failureException = exception; - ((Semaphore)ctx).release(); - } - }, ctx); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - failureException = exception; - ((Semaphore)ctx).release(); - } - }, latch); - latch.acquire(); - latch.release(); - if (null != failureException) { - throw failureException; - } - } - } - - protected void consumedUntil(ByteString topic, long seqId) throws Exception { - manager.consumedUntil(topic, seqId); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java deleted file mode 100644 index 1f30b5b..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import org.junit.After; -import org.junit.Before; - -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager; - -public class TestBookKeeperPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox { - BookKeeperTestBase bktb; - private final int numBookies = 3; - - MetadataManagerFactory metadataManagerFactory = null; - - @Before - @Override - public void setUp() throws Exception { - // We need to setUp this class first since the super.setUp() method will - // need the BookKeeperTestBase to be instantiated. - bktb = new BookKeeperTestBase(numBookies); - bktb.setUp(); - super.setUp(); - } - - - @After - @Override - public void tearDown() throws Exception { - bktb.tearDown(); - super.tearDown(); - if (null != metadataManagerFactory) { - metadataManagerFactory.shutdown(); - } - } - - @Override - long getLowestSeqId() { - return 1; - } - - @Override - PersistenceManager instantiatePersistenceManager() throws Exception { - ServerConfiguration conf = new ServerConfiguration(); - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - metadataManagerFactory = - MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient()); - - return new BookkeeperPersistenceManager(bktb.bk, metadataManagerFactory, - new TrivialOwnAllTopicManager(conf, scheduler), - conf, scheduler); - } - - @Override - public long getExpectedSeqId(int numPublished) { - return numPublished; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java deleted file mode 100644 index da2b06c..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java +++ /dev/null @@ -1,355 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.util.List; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.apache.hedwig.util.Either; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.HelperMethods; -import org.apache.hedwig.StubCallback; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager; -import org.apache.hedwig.util.ConcurrencyUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - -public class TestBookkeeperPersistenceManagerWhiteBox { - - protected static final Logger logger = - LoggerFactory.getLogger(TestBookkeeperPersistenceManagerWhiteBox.class); - - BookKeeperTestBase bktb; - private final int numBookies = 3; - BookkeeperPersistenceManager bkpm; - MetadataManagerFactory mm; - ServerConfiguration conf; - ScheduledExecutorService scheduler; - TopicManager tm; - ByteString topic = ByteString.copyFromUtf8("topic0"); - - @Before - public void setUp() throws Exception { - bktb = new BookKeeperTestBase(numBookies); - bktb.setUp(); - - conf = new ServerConfiguration(); - scheduler = Executors.newScheduledThreadPool(1); - tm = new TrivialOwnAllTopicManager(conf, scheduler); - - mm = MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient()); - - bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm, conf, scheduler); - } - - @After - public void tearDown() throws Exception { - mm.shutdown(); - bktb.tearDown(); - } - - @Test(timeout=60000) - public void testEmptyDirtyLedger() throws Exception { - - StubCallback<Void> stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - // now abandon, and try another time, the prev ledger should be dirty - - bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), mm, tm, - conf, scheduler); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size()); - } - - @Test(timeout=60000) - public void testNonEmptyDirtyLedger() throws Exception { - - Random r = new Random(); - int NUM_MESSAGES_TO_TEST = 100; - int SIZE_OF_MESSAGES_TO_TEST = 100; - int index = 0; - int numPrevLedgers = 0; - List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, - SIZE_OF_MESSAGES_TO_TEST); - - while (index < messages.size()) { - - StubCallback<Void> stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - assertEquals(numPrevLedgers, bkpm.topicInfos.get(topic).ledgerRanges.size()); - - StubCallback<PubSubProtocol.MessageSeqId> persistCallback = new StubCallback<PubSubProtocol.MessageSeqId>(); - bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null)); - assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent()); - - index++; - // once in every 10 times, give up ledger - if (r.nextInt(10) == 9) { - // should not release topic when the message is last message - // otherwise when we call scan, bookkeeper persistence manager doesn't own the topic - if (index < messages.size()) { - // Make the bkpm lose its memory - bkpm.topicInfos.clear(); - numPrevLedgers++; - } - } - } - - // Lets scan now - StubScanCallback scanCallback = new StubScanCallback(); - bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null)); - for (int i = 0; i < messages.size(); i++) { - Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left(); - assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody())); - assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent()); - } - assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left()); - - } - - static final long maxEntriesPerLedger = 10; - - class ChangeLedgerServerConfiguration extends ServerConfiguration { - @Override - public long getMaxEntriesPerLedger() { - return maxEntriesPerLedger; - } - } - - @Test(timeout=60000) - public void testSyncChangeLedgers() throws Exception { - int NUM_MESSAGES_TO_TEST = 101; - int SIZE_OF_MESSAGES_TO_TEST = 100; - int index = 0; - List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, - SIZE_OF_MESSAGES_TO_TEST); - - bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm, - new ChangeLedgerServerConfiguration(), scheduler); - - // acquire the topic - StubCallback<Void> stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size()); - - while (index < messages.size()) { - logger.debug("Persist message {}", (index + 1)); - StubCallback<MessageSeqId> persistCallback = new StubCallback<MessageSeqId>(); - bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null)); - assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent()); - - index++; - if (index % maxEntriesPerLedger == 1) { - assertEquals(index / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size()); - } - } - assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size()); - - // Lets scan now - StubScanCallback scanCallback = new StubScanCallback(); - bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null)); - for (int i = 0; i < messages.size(); i++) { - Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left(); - assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody())); - assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent()); - } - assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left()); - - // Make the bkpm lose its memory - bkpm.topicInfos.clear(); - - // acquire the topic again - stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1, bkpm.topicInfos.get(topic).ledgerRanges.size()); - } - - class OrderCheckingCallback extends StubCallback<MessageSeqId> { - long curMsgId; - int numMessages; - int numProcessed; - int numSuccess; - int numFailed; - - OrderCheckingCallback(long startMsgId, int numMessages) { - this.curMsgId = startMsgId; - this.numMessages = numMessages; - numProcessed = numSuccess = numFailed = 0; - } - - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - synchronized (this) { - ++numFailed; - ++numProcessed; - if (numProcessed == numMessages) { - MessageSeqId.Builder seqIdBuilder = - MessageSeqId.newBuilder().setLocalComponent(curMsgId); - super.operationFinished(ctx, seqIdBuilder.build()); - } - } - } - - @Override - public void operationFinished(Object ctx, final MessageSeqId seqId) { - synchronized(this) { - long msgId = seqId.getLocalComponent(); - if (msgId == curMsgId) { - ++curMsgId; - } - ++numSuccess; - ++numProcessed; - if (numProcessed == numMessages) { - MessageSeqId.Builder seqIdBuilder = - MessageSeqId.newBuilder().setLocalComponent(curMsgId); - super.operationFinished(ctx, seqIdBuilder.build()); - } - } - } - } - - @Test(timeout=60000) - public void testAsyncChangeLedgers() throws Exception { - int NUM_MESSAGES_TO_TEST = 101; - int SIZE_OF_MESSAGES_TO_TEST = 100; - List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, - SIZE_OF_MESSAGES_TO_TEST); - - bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm, - new ChangeLedgerServerConfiguration(), scheduler); - - // acquire the topic - StubCallback<Void> stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size()); - - OrderCheckingCallback persistCallback = - new OrderCheckingCallback(1, NUM_MESSAGES_TO_TEST); - for (Message message : messages) { - bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null)); - } - assertEquals(NUM_MESSAGES_TO_TEST + 1, - ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent()); - assertEquals(NUM_MESSAGES_TO_TEST, persistCallback.numSuccess); - assertEquals(0, persistCallback.numFailed); - assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger, - bkpm.topicInfos.get(topic).ledgerRanges.size()); - - // ensure the bkpm has the topic before scanning - stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - - // Lets scan now - StubScanCallback scanCallback = new StubScanCallback(); - bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null)); - for (int i = 0; i < messages.size(); i++) { - Either<Message,Exception> e = ConcurrencyUtils.take(scanCallback.queue); - Message scannedMessage = e.left(); - if (scannedMessage == null) { - throw e.right(); - } - - assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody())); - assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent()); - } - assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left()); - - // Make the bkpm lose its memory - bkpm.topicInfos.clear(); - - // acquire the topic again - stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1, - bkpm.topicInfos.get(topic).ledgerRanges.size()); - } - - class ChangeLedgerCallback extends OrderCheckingCallback { - boolean tearDown = false; - ChangeLedgerCallback(long startMsgId, int numMessages) { - super(startMsgId, numMessages); - } - - @Override - public void operationFinished(Object ctx, final MessageSeqId msgId) { - super.operationFinished(ctx, msgId); - // shutdown bookie server when changing ledger - // so following requests should fail - if (msgId.getLocalComponent() >= maxEntriesPerLedger && !tearDown) { - try { - bktb.tearDownOneBookieServer(); - bktb.tearDownOneBookieServer(); - } catch (Exception e) { - logger.error("Failed to tear down bookie server."); - } - tearDown = true; - } - } - } - - @Test(timeout=60000) - public void testChangeLedgerFailure() throws Exception { - int NUM_MESSAGES_TO_TEST = 101; - int SIZE_OF_MESSAGES_TO_TEST = 100; - List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, - SIZE_OF_MESSAGES_TO_TEST); - - bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm, - new ChangeLedgerServerConfiguration(), scheduler); - - // acquire the topic - StubCallback<Void> stubCallback = new StubCallback<Void>(); - bkpm.acquiredTopic(topic, stubCallback, null); - assertNull(ConcurrencyUtils.take(stubCallback.queue).right()); - assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size()); - - ChangeLedgerCallback persistCallback = - new ChangeLedgerCallback(1, NUM_MESSAGES_TO_TEST); - for (Message message : messages) { - bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null)); - } - assertEquals(maxEntriesPerLedger + 1, - ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent()); - assertEquals(maxEntriesPerLedger, persistCallback.numSuccess); - assertEquals(NUM_MESSAGES_TO_TEST - maxEntriesPerLedger, persistCallback.numFailed); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java deleted file mode 100644 index 90c1817..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; - -public class TestDeadlock extends HedwigHubTestBase { - - protected static final Logger logger = LoggerFactory.getLogger(TestDeadlock.class); - - // Client side variables - protected HedwigClient client; - protected Publisher publisher; - protected Subscriber subscriber; - - ByteString topic = ByteString.copyFromUtf8("DeadLockTopic"); - ByteString subscriberId = ByteString.copyFromUtf8("dl"); - - public TestDeadlock() { - super(1); - } - - @Override - @Before - public void setUp() throws Exception { - numBookies = 1; - readDelay = 1000L; // 1s - super.setUp(); - client = new HedwigClient(new HubClientConfiguration()); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - @Override - @After - public void tearDown() throws Exception { - client.close(); - super.tearDown(); - } - - // Test implementation of Callback for async client actions. - static class TestCallback implements Callback<Void> { - private final SynchronousQueue<Boolean> queue; - - public TestCallback(SynchronousQueue<Boolean> queue) { - this.queue = queue; - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - new Thread(new Runnable() { - @Override - public void run() { - if (logger.isDebugEnabled()) - logger.debug("Operation finished!"); - ConcurrencyUtils.put(queue, true); - } - }).start(); - } - - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - new Thread(new Runnable() { - @Override - public void run() { - logger.error("Operation failed!", exception); - ConcurrencyUtils.put(queue, false); - } - }).start(); - } - } - - // Test implementation of subscriber's message handler. - class TestMessageHandler implements MessageHandler { - private final SynchronousQueue<Boolean> consumeQueue; - boolean doAdd = false; - - public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) { - this.consumeQueue = consumeQueue; - } - - public void deliver(ByteString t, ByteString sub, final Message msg, Callback<Void> callback, - Object context) { - if (!doAdd) { - // after receiving first message, we send a publish - // to obtain permit of second ledger - doAdd = true; - new Thread(new Runnable() { - @Override - public void run() { - // publish messages again to obtain permits - logger.info("Start publishing message to obtain permit"); - // it obtains the permit and wait for a response, - // but the response is delayed and readEntries is called - // in the readComplete callback to read entries of the - // same ledger. since there is no permit, it blocks - try { - CountDownLatch latch = new CountDownLatch(1); - sleepBookies(8, latch); - latch.await(); - SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>(); - for (int i=0; i<3; i++) { - publisher.asyncPublish(topic, getMsg(9999), new TestCallback(queue), null); - } - for (int i=0; i<3; i++) { - if (!queue.take()) { - logger.error("Error publishing to topic {}", topic); - ConcurrencyUtils.put(consumeQueue, false); - } - } - } catch (Exception e) { - logger.error("Failed to publish message to obtain permit."); - } - } - }).start(); - } - - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(consumeQueue, true); - } - }).start(); - callback.operationFinished(context, null); - } - } - - // Helper function to generate Messages - protected Message getMsg(int msgNum) { - return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build(); - } - - // Helper function to generate Topics - protected ByteString getTopic(int topicNum) { - return ByteString.copyFromUtf8("DeadLockTopic" + topicNum); - } - - class TestServerConfiguration extends HubServerConfiguration { - public TestServerConfiguration(int serverPort, int sslServerPort) { - super(serverPort, sslServerPort); - } - @Override - public int getBkEnsembleSize() { - return 1; - } - @Override - public int getBkWriteQuorumSize() { - return 1; - } - @Override - public int getBkAckQuorumSize() { - return 1; - } - @Override - public int getReadAheadCount() { - return 4; - } - @Override - public long getMaximumCacheSize() { - return 32; - } - } - - @SuppressWarnings("deprecation") - @Override - protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) { - ServerConfiguration serverConf = new TestServerConfiguration(serverPort, sslServerPort); - - org.apache.bookkeeper.conf.ClientConfiguration bkClientConf = - new org.apache.bookkeeper.conf.ClientConfiguration(); - bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999) - .setThrottleValue(3); - try { - serverConf.addConf(bkClientConf); - } catch (Exception e) { - } - return serverConf; - } - - @Test(timeout=60000) - public void testDeadlock() throws Exception { - int numMessages = 5; - - SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>(); - - // subscribe to topic - logger.info("Setup subscriptions"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subscriberId, opts); - subscriber.closeSubscription(topic, subscriberId); - - // publish 5 messages to form first ledger - for (int i=0; i<numMessages; i++) { - logger.info("Start publishing message {}", i); - publisher.publish(topic, getMsg(i)); - } - - stopHubServers(); - Thread.sleep(1000); - startHubServers(); - - logger.info("Start publishing messages"); - // publish enough messages to second ledger - // so a scan request need to scan over two ledgers, which - // cause readEntries executed in the previous readEntries - for (int i=0; i<numMessages; i++) { - logger.info("Start publishing message {}", i+5); - publisher.publish(topic, getMsg(i)); - } - - logger.info("Start subscribe topics again and receive messages"); - // subscribe to topic - subscriber.subscribe(topic, subscriberId, opts); - subscriber.startDelivery(topic, subscriberId, - new TestMessageHandler(consumeQueue)); - for (int i=0; i<(2*numMessages+3); i++) { - assertTrue(consumeQueue.take()); - } - } - - protected void sleepBookies(final int seconds, final CountDownLatch l) - throws InterruptedException, IOException { - Thread sleeper = new Thread() { - public void run() { - try { - bktb.suspendAllBookieServers(); - l.countDown(); - Thread.sleep(seconds * 1000); - bktb.resumeAllBookieServers(); - } catch (Exception e) { - logger.error("Error suspending thread", e); - } - } - }; - sleeper.start(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java deleted file mode 100644 index 856eab4..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import org.junit.After; -import org.junit.Before; - -public class TestLocalDBPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox { - - @After - @Override - public void tearDown() throws Exception { - super.tearDown(); - ((LocalDBPersistenceManager) persistenceManager).reset(); - } - - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - } - - @Override - long getLowestSeqId() { - return 1; - } - - @Override - PersistenceManager instantiatePersistenceManager() { - return LocalDBPersistenceManager.instance(); - } - - @Override - public long getExpectedSeqId(int numPublished) { - return numPublished; - } - -}
