https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - store has messages must be aware of pending, also kahadb setBatch for async sends. Additional tests and tidy up of cursor sync with store to reflect async/sync additions
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9c2b1d25 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9c2b1d25 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9c2b1d25 Branch: refs/heads/trunk Commit: 9c2b1d257288fb85138a37e30e1216251ca13eaf Parents: 243db1c Author: gtully <gary.tu...@gmail.com> Authored: Thu Oct 16 23:32:55 2014 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Thu Oct 16 23:35:18 2014 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 19 +- .../region/cursors/AbstractStoreCursor.java | 95 +-- .../region/cursors/QueueStorePrefetch.java | 8 +- .../activemq/store/ProxyMessageStore.java | 5 + .../activemq/store/kahadb/KahaDBStore.java | 8 + .../activemq/store/kahadb/MessageDatabase.java | 9 +- activemq-unit-tests/pom.xml | 1 + .../cursors/StoreQueueCursorOrderTest.java | 517 +++++++++++++++ .../bugs/AMQ5266StarvedConsumerTest.java | 641 +++++++++++++++++++ 9 files changed, 1249 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 6df48da..21d7522 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -771,19 +771,24 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index candidate = indexOrderedCursorUpdates.peek(); } } - for (MessageContext messageContext : orderedUpdates) { - if 
(!cursorAdd(messageContext.message)) { - // cursor suppressed a duplicate - messageContext.duplicate = true; + messagesLock.writeLock().lock(); + try { + for (MessageContext messageContext : orderedUpdates) { + if (!messages.addMessageLast(messageContext.message)) { + // cursor suppressed a duplicate + messageContext.duplicate = true; + } + if (messageContext.onCompletion != null) { + messageContext.onCompletion.run(); + } } + } finally { + messagesLock.writeLock().unlock(); } } finally { sendLock.unlock(); } for (MessageContext messageContext : orderedUpdates) { - if (messageContext.onCompletion != null) { - messageContext.onCompletion.run(); - } if (!messageContext.duplicate) { messageSent(messageContext.context, messageContext.message); } http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 19864b7..c4bf985 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -20,6 +20,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.ListIterator; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -90,6 +92,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public synchronized boolean recoverMessage(Message message, boolean cached) 
throws Exception { boolean recovered = false; + storeHasMessages = true; if (recordUniqueId(message.getMessageId())) { if (!cached) { message.setRegionDestination(regionDestination); @@ -101,12 +104,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i batchList.addMessageLast(message); clearIterator(true); recovered = true; - storeHasMessages = true; } else if (!cached) { // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart if (message.isRecievedByDFBridge()) { // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true - LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); + if (LOG.isTraceEnabled()) { + LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); + } } else { LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); duplicate(message); @@ -201,7 +205,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i boolean disableCache = false; if (hasSpace()) { if (!isCacheEnabled() && size==0 && isStarted() && useCache) { - LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); + if (LOG.isTraceEnabled()) { + LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); + } setCacheEnabled(true); } if (isCacheEnabled()) { @@ -217,64 +223,48 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } if (disableCache && isCacheEnabled()) { - LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), 
node.getMessageId().getFutureOrSequenceLong()); + if (LOG.isTraceEnabled()) { + LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); + } + syncWithStore(node.getMessage()); setCacheEnabled(false); - syncWithStore(); } this.storeHasMessages = true; size++; return true; } - private void syncWithStore() throws Exception { + private void syncWithStore(Message currentAdd) throws Exception { + pruneLastCached(); if (lastCachedIds[SYNC_ADD] == null) { - // only async adds, lets wait on the potential last add and reset from there + // possibly only async adds, lets wait on the potential last add and reset from there for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { - MessageId lastStored = it.previous(); - Object futureOrLong = lastStored.getFutureOrSequenceLong(); + MessageId lastPending = it.previous(); + Object futureOrLong = lastPending.getFutureOrSequenceLong(); if (futureOrLong instanceof Future) { Future future = (Future) futureOrLong; if (future.isCancelled()) { continue; - } else { - try { - future.get(); - setLastCachedId(ASYNC_ADD, lastStored); - } catch (Exception ignored) {} } + try { + future.get(5, TimeUnit.SECONDS); + setLastCachedId(ASYNC_ADD, lastPending); + } catch (TimeoutException potentialDeadlock) { + LOG.warn("{} timed out waiting for async add", this, potentialDeadlock); + } catch (Exception cancelledOrTimeOutOrErrorWorstCaseWeReplay) {cancelledOrTimeOutOrErrorWorstCaseWeReplay.printStackTrace();} + } else { + setLastCachedId(ASYNC_ADD, lastPending); } + break; } if (lastCachedIds[ASYNC_ADD] != null) { - setBatch(lastCachedIds[ASYNC_ADD]); - } - } else { - // mix of async and sync - async can exceed sync only if next in sequence - for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { - MessageId candidate = it.next(); - final Object futureOrLong = candidate.getFutureOrSequenceLong(); - if 
(futureOrLong instanceof Future) { - Future future = (Future) futureOrLong; - if (future.isCancelled()) { - it.remove(); - } else { - try { - future.get(); - long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong(); - if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) { - setLastCachedId(SYNC_ADD, candidate); - } else { - // out of sequence, revert to sync state - LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size()); - break; - } - } catch (Exception ignored) {} - } + // ensure we don't skip current possibly sync add b/c we waited on the future + if (currentAdd.isRecievedByDFBridge() || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) { + setBatch(lastCachedIds[ASYNC_ADD]); } } - if (lastCachedIds[SYNC_ADD] != null) { - setBatch(lastCachedIds[SYNC_ADD]); - } - + } else { + setBatch(lastCachedIds[SYNC_ADD]); } // cleanup lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null; @@ -282,7 +272,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } private void trackLastCached(MessageReference node) { - if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) { + if (node.getMessageId().getFutureOrSequenceLong() instanceof Future || node.getMessage().isRecievedByDFBridge()) { pruneLastCached(); pendingCachedIds.add(node.getMessageId()); } else { @@ -305,6 +295,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } else { // complete setLastCachedId(ASYNC_ADD, candidate); + + // keep lock step with sync adds while order is preserved + if (lastCachedIds[SYNC_ADD] != null) { + long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong(); + if (Long.compare((Long)futureOrLong, next) == 0) { + setLastCachedId(SYNC_ADD, candidate); + } else { + // out of 
sequence, revert to sync state + if (LOG.isDebugEnabled()) { + LOG.debug("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size()); + } + } + } it.remove(); } } @@ -374,13 +377,17 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i this.batchResetNeeded = false; } if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) { + // avoid repeated trips to the store if there is nothing of interest + this.storeHasMessages = false; try { doFillBatch(); } catch (Exception e) { LOG.error("{} - Failed to fill batch", this, e); throw new RuntimeException(e); } - this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace; + if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) { + this.storeHasMessages = true; + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index 94dc817..9fb73c5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -94,7 +94,9 @@ class QueueStorePrefetch extends AbstractStoreCursor { @Override protected void setBatch(MessageId messageId) throws Exception { - LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator()); + if (LOG.isTraceEnabled()) { + LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator()); + } 
store.setBatch(messageId); batchResetNeeded = false; } @@ -109,4 +111,8 @@ class QueueStorePrefetch extends AbstractStoreCursor { } } + @Override + public String toString(){ + return super.toString() + ",store=" + store; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 8c747e8..901c769 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -165,4 +165,9 @@ public class ProxyMessageStore implements MessageStore { public void registerIndexListener(IndexListener indexListener) { delegate.registerIndexListener(indexListener); } + + @Override + public String toString() { + return delegate.toString(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index eb5d1c4..a18071b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -665,6 +665,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { StoredDestination sd = getStoredDestination(dest, tx); Long location = sd.messageIdIndex.get(tx, key); if (location != null) { + Long pending = sd.orderIndex.minPendingAdd(); + if (pending != 
null) { + location = Math.min(location, pending-1); + } sd.orderIndex.setBatch(tx, location); } else { LOG.warn("{} {} setBatch failed, location for {} not found in messageId index for {}", this, dest.getName(), identity.getFutureOrSequenceLong(), identity); @@ -714,6 +718,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { this.localDestinationSemaphore.release(); } + @Override + public String toString(){ + return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); + } } class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 554f1d3..4de5f16 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1767,7 +1767,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // StoredDestination related implementation methods. 
// ///////////////////////////////////////////////////////////////// - private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); + protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); static class MessageKeys { final String messageId; @@ -1886,6 +1886,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void trackPendingAddComplete(Long seq) { orderIndex.trackPendingAddComplete(seq); } + + @Override + public String toString() { + return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); + } } protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { @@ -2337,7 +2342,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return 0; } - private String key(KahaDestination destination) { + protected String key(KahaDestination destination) { return destination.getType().getNumber() + ":" + destination.getName(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 1333412..4735144 100755 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -1033,6 +1033,7 @@ <exclude>org/apache/activemq/store/kahadb/disk/index/HashIndexTest.*</exclude> <exclude>org/apache/activemq/store/kahadb/disk/index/ListIndexTest.*</exclude> <exclude>org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.*</exclude> + <exclude>org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.*</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java 
---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java new file mode 100644 index 0000000..f8fab10 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -0,0 +1,517 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.broker.region.cursors; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.usage.SystemUsage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StoreQueueCursorOrderTest { + private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorOrderTest.class); + + ActiveMQQueue destination = new ActiveMQQueue("queue-" + + StoreQueueCursorOrderTest.class.getSimpleName()); + BrokerService brokerService; + + final static String mesageIdRoot = "11111:22222:0:"; + final int messageBytesSize = 1024; + final String text = new String(new byte[messageBytesSize]); + + @Before + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.setUseJmx(false); + brokerService.deleteAllMessages(); + brokerService.start(); + } + + protected BrokerService createBroker() throws Exception { + return new BrokerService(); + } + + @After + public void tearDown() 
throws Exception { + brokerService.stop(); + } + + @Test + public void tesBlockedFuture() throws Exception { + final int count = 2; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + FutureTask<Long> future = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + } + }, 2l) {}; + msg.getMessageId().setFutureOrSequenceLong(future); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // second message will flip the cache but will be stored before the future task + msg = getMessage(1); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + 
underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception { + final int count = 2; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRef = msg; + FutureTask<Long> future = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef.getMessageId().setFutureOrSequenceLong(1l); + } + }, 1l) {}; + msg.getMessageId().setFutureOrSequenceLong(future); + Executors.newSingleThreadExecutor().submit(future); + underTest.addMessageLast(msg); + + 
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // second message will flip the cache but will be stored before the future task + msg = getMessage(1); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception { + final int count = 2; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", 
underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRef = msg; + FutureTask<Long> future = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef.getMessageId().setFutureOrSequenceLong(0l); + } + }, 0l) {}; + msg.getMessageId().setFutureOrSequenceLong(future); + Executors.newSingleThreadExecutor().submit(future); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // second message will flip the cache but will be stored before the future task + msg = getMessage(1); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRe2f = msg; + FutureTask<Long> future2 = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRe2f.getMessageId().setFutureOrSequenceLong(1l); + } + }, 1l) {}; + msg.getMessageId().setFutureOrSequenceLong(future2); + Executors.newSingleThreadExecutor().submit(future2); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch set", 1l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testSetBatchWithFuture() throws Exception { + final int count = 4; + final Message[] messages = new 
Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6)); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRef = msg; + FutureTask<Long> future0 = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef.getMessageId().setFutureOrSequenceLong(0l); + } + }, 0l) {}; + msg.getMessageId().setFutureOrSequenceLong(future0); + underTest.addMessageLast(msg); + Executors.newSingleThreadExecutor().submit(future0); + + + msg = getMessage(1); + messages[3] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRef1 = msg; + FutureTask<Long> future1 = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef1.getMessageId().setFutureOrSequenceLong(3l); + } + }, 3l) {}; + msg.getMessageId().setFutureOrSequenceLong(future1); + underTest.addMessageLast(msg); + + + msg = getMessage(2); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + 
msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // out of order future + Executors.newSingleThreadExecutor().submit(future1); + + // sync add to flip cache + msg = getMessage(3); + messages[2] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(3l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(count); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testSetBatch() throws Exception { + final int count = 3; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5); + underTest.setSystemUsage(systemUsage); + 
underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + + ActiveMQTextMessage msg = getMessage(0); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(0l); + underTest.addMessageLast(msg); + + msg = getMessage(1); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + msg = getMessage(2); + messages[2] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(2l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + private ActiveMQTextMessage getMessage(int i) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + MessageId id = new MessageId(mesageIdRoot + i); + id.setBrokerSequenceId(i); + id.setProducerSequenceId(i); + message.setMessageId(id); + message.setDestination(destination); + message.setPersistent(true); + message.setResponseRequired(true); + message.setText("Msg:" + i + " " + text); + assertEquals(message.getMessageId().getProducerSequenceId(), i); + return message; + } + + class TestMessageStore extends 
AbstractMessageStore { + final Message[] messages; + public AtomicLong batch = new AtomicLong(); + + public TestMessageStore(Message[] messages, ActiveMQDestination dest) { + super(dest); + this.messages = messages; + } + + @Override + public void addMessage(ConnectionContext context, Message message) throws IOException { + + } + + @Override + public Message getMessage(MessageId identity) throws IOException { + return null; + } + + @Override + public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { + + } + + @Override + public void removeAllMessages(ConnectionContext context) throws IOException { + + } + + @Override + public void recover(MessageRecoveryListener container) throws Exception { + + } + + @Override + public int getMessageCount() throws IOException { + return 0; + } + + @Override + public void resetBatching() { + + } + @Override + public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { + for (int i=batch.intValue();i<messages.length;i++) { + LOG.info("recovered index:" + i); + listener.recoverMessage(messages[i]); + } + } + + @Override + public void setBatch(MessageId message) { + batch.set((Long)message.getFutureOrSequenceLong()); + batch.incrementAndGet(); + } + + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java new file mode 100644 index 0000000..300bec1 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java @@ -0,0 +1,641 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import 
org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; + +/* + * pause producers if consumers stall and verify broker drained before resume + */ +@RunWith(Parameterized.class) +public class AMQ5266StarvedConsumerTest { + static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class); + String activemqURL; + BrokerService brokerService; + private EmbeddedDataSource dataSource; + + public int messageSize = 1000; + + @Parameterized.Parameter(0) + public int publisherMessagesPerThread = 1000; + + @Parameterized.Parameter(1) + public int publisherThreadCount = 20; + + @Parameterized.Parameter(2) + public int consumerThreadsPerQueue = 5; + + @Parameterized.Parameter(3) + public int destMemoryLimit = 50 * 1024; + + @Parameterized.Parameter(4) + public boolean useCache = true; + + @Parameterized.Parameter(5) + public boolean useDefaultStore = false; + + @Parameterized.Parameter(6) + public boolean optimizeDispatch = false; + private AtomicBoolean didNotReceive = new AtomicBoolean(false); + + @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") + public static Iterable<Object[]> parameters() { + return Arrays.asList(new Object[][]{ + {1000, 40, 5, 1024*1024, false, false, true}, + }); + } + + public int consumerBatchSize = 5; + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + + dataSource = new EmbeddedDataSource(); + dataSource.setDatabaseName("target/derbyDb"); + dataSource.setCreateDatabase("create"); + + JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); + jdbcPersistenceAdapter.setDataSource(dataSource); + jdbcPersistenceAdapter.setUseLock(false); + + 
if (!useDefaultStore) { + brokerService.setPersistenceAdapter(jdbcPersistenceAdapter); + } else { + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true); + } + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setUseJmx(false); + brokerService.setAdvisorySupport(false); + + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract! + defaultEntry.setMaxAuditDepth(publisherThreadCount); + defaultEntry.setEnableAudit(true); + defaultEntry.setUseCache(useCache); + defaultEntry.setMaxPageSize(1000); + defaultEntry.setOptimizedDispatch(optimizeDispatch); + defaultEntry.setMemoryLimit(destMemoryLimit); + defaultEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); + + brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024); + + TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + activemqURL = transportConnector.getPublishableConnectString(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + try { + dataSource.setShutdownDatabase("shutdown"); + dataSource.getConnection(); + } catch (Exception ignored) {} + } + + CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() { + @Override + public void run() { + // wait for queue size to go to zero + try { + while (((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount() > 0) { + LOG.info("Total messageCount: " + ((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + 
TimeUnit.SECONDS.sleep(5); + } + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + @Test(timeout = 30 * 60 * 1000) + public void test() throws Exception { + + String activemqQueues = "activemq,activemq2,activemq3,activemq4";//,activemq5,activemq6,activemq7,activemq8,activemq9"; + + int consumerWaitForConsumption = 5 * 60 * 1000; + + ExportQueuePublisher publisher = null; + ExportQueueConsumer consumer = null; + + LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified."); + LOG.info("\nBuilding Publisher..."); + + publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount); + + LOG.info("Building Consumer..."); + + consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount); + + + LOG.info("Starting Publisher..."); + + publisher.start(); + + LOG.info("Starting Consumer..."); + + consumer.start(); + + int distinctPublishedCount = 0; + + + LOG.info("Waiting For Publisher Completion..."); + + publisher.waitForCompletion(); + + List publishedIds = publisher.getIDs(); + distinctPublishedCount = new TreeSet(publishedIds).size(); + + LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount); + + + long endWait = System.currentTimeMillis() + consumerWaitForConsumption; + while (!consumer.completed() && System.currentTimeMillis() < endWait) { + try { + int secs = (int) (endWait - System.currentTimeMillis()) / 1000; + LOG.info("Waiting For Consumer Completion. 
Time left: " + secs + " secs"); + if (!useDefaultStore) { + DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); + } + Thread.sleep(10000); + } catch (Exception e) { + } + } + + LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down."); + + consumer.shutdown(); + + TimeUnit.SECONDS.sleep(2); + LOG.info("DB Contents START"); + if (!useDefaultStore) { + DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); + } + LOG.info("DB Contents END"); + + LOG.info("Consumer Stats:"); + + for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) { + + List<String> idList = entry.getValue(); + + int distinctConsumed = new TreeSet<String>(idList).size(); + + StringBuilder sb = new StringBuilder(); + sb.append(" Queue: " + entry.getKey() + + " -> Total Messages Consumed: " + idList.size() + + ", Distinct IDs Consumed: " + distinctConsumed); + + int diff = distinctPublishedCount - distinctConsumed; + sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) "); + LOG.info(sb.toString()); + + assertEquals("expect to get all messages!", 0, diff); + + } + } + + public class ExportQueuePublisher { + + private final String amqUser = ActiveMQConnection.DEFAULT_USER; + private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; + private ActiveMQConnectionFactory connectionFactory = null; + private String activemqURL = null; + private String activemqQueues = null; + // Collection of distinct IDs that the publisher has published. + // After a message is published, its UUID will be written to this list for tracking. + // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs. 
+ //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>()); + private List<String> ids = Collections.synchronizedList(new ArrayList<String>()); + private List<PublisherThread> threads; + + public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception { + + this.activemqURL = activemqURL; + this.activemqQueues = activemqQueues; + + threads = new ArrayList<PublisherThread>(); + + // Build the threads and tell them how many messages to publish + for (int i = 0; i < threadCount; i++) { + PublisherThread pt = new PublisherThread(messagesPerThread); + threads.add(pt); + } + } + + public List<String> getIDs() { + return ids; + } + + // Kick off threads + public void start() throws Exception { + + for (PublisherThread pt : threads) { + pt.start(); + } + } + + // Wait for threads to complete. They will complete once they've published all of their messages. + public void waitForCompletion() throws Exception { + + for (PublisherThread pt : threads) { + pt.join(); + pt.close(); + } + } + + private Session newSession(QueueConnection queueConnection) throws Exception { + return queueConnection.createSession(true, Session.SESSION_TRANSACTED); + } + + private synchronized QueueConnection newQueueConnection() throws Exception { + + if (connectionFactory == null) { + connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); + connectionFactory.setWatchTopicAdvisories(false); + } + + // Set the redelivery count to -1 (infinite), or else messages will start dropping + // after the queue has had a certain number of failures (default is 6) + RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); + policy.setMaximumRedeliveries(-1); + + QueueConnection amqConnection = connectionFactory.createQueueConnection(); + amqConnection.start(); + return amqConnection; + } + + private class PublisherThread extends Thread { + + private int count; + private QueueConnection 
qc; + private Session session; + private MessageProducer mp; + private Queue q; + + private PublisherThread(int count) throws Exception { + + this.count = count; + + // Each Thread has its own Connection and Session, so no sync worries + qc = newQueueConnection(); + session = newSession(qc); + + // In our code, when publishing to multiple queues, + // we're using composite destinations like below + q = new ActiveMQQueue(activemqQueues); + mp = session.createProducer(null); + } + + public void run() { + + try { + + // Loop until we've published enough messages + while (count-- > 0) { + + TextMessage tm = session.createTextMessage(getMessageText()); + String id = UUID.randomUUID().toString(); + tm.setStringProperty("KEY", id); + ids.add(id); // keep track of the key to compare against consumer + + mp.send(q, tm); + session.commit(); + + if (didNotReceive.get()) { + globalProducerHalt.await(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + // Called by waitForCompletion + public void close() { + + try { + mp.close(); + } catch (Exception e) { + } + + try { + session.close(); + } catch (Exception e) { + } + + try { + qc.close(); + } catch (Exception e) { + } + } + } + + } + + String messageText; + private String getMessageText() { + + if (messageText == null) { + + synchronized (this) { + + if (messageText == null) { + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i++) { + sb.append("X"); + } + messageText = sb.toString(); + } + } + } + + return messageText; + } + + + public class ExportQueueConsumer { + + private final String amqUser = ActiveMQConnection.DEFAULT_USER; + private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; + private final int totalToExpect; + private ActiveMQConnectionFactory connectionFactory = null; + private String activemqURL = null; + private String activemqQueues = null; + private String[] queues = null; + // Map of IDs that were consumed, keyed by queue name. 
+ // We'll compare these against what was published to know if any got stuck or dropped. + private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>(); + private Map<String, List<ConsumerThread>> threads; + + public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception { + + this.activemqURL = activemqURL; + this.activemqQueues = activemqQueues; + this.totalToExpect = totalToExpect; + + queues = this.activemqQueues.split(","); + + for (int i = 0; i < queues.length; i++) { + queues[i] = queues[i].trim(); + } + + threads = new HashMap<String, List<ConsumerThread>>(); + + // For each queue, create a list of threads and set up the list of ids + for (String q : queues) { + + List<ConsumerThread> list = new ArrayList<ConsumerThread>(); + + idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>())); + + for (int i = 0; i < threadsPerQueue; i++) { + list.add(new ConsumerThread(q, batchSize)); + } + + threads.put(q, list); + } + } + + public Map<String, List<String>> getIDs() { + return idsByQueue; + } + + // Start the threads + public void start() throws Exception { + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.start(); + } + } + } + + // Tell the threads to stop + // Then wait for them to stop + public void shutdown() throws Exception { + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.shutdown(); + } + } + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.join(); + } + } + } + + private Session newSession(QueueConnection queueConnection) throws Exception { + return queueConnection.createSession(true, Session.SESSION_TRANSACTED); + } + + private synchronized QueueConnection newQueueConnection() throws Exception { + + if (connectionFactory == null) { + connectionFactory = new 
ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); + connectionFactory.setWatchTopicAdvisories(false); + } + + // Set the redelivery count to -1 (infinite), or else messages will start dropping + // after the queue has had a certain number of failures (default is 6) + RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); + policy.setMaximumRedeliveries(-1); + + QueueConnection amqConnection = connectionFactory.createQueueConnection(); + amqConnection.start(); + return amqConnection; + } + + public boolean completed() { + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + if (ct.isAlive()) { + LOG.info("thread for {} is still alive.", ct.qName); + return false; + } + } + } + return true; + } + + private class ConsumerThread extends Thread { + + private int batchSize; + private QueueConnection qc; + private Session session; + private MessageConsumer mc; + private List<String> idList; + private boolean shutdown = false; + private String qName; + + private ConsumerThread(String queueName, int batchSize) throws Exception { + + this.batchSize = batchSize; + + // Each thread has its own connection and session + qName = queueName; + qc = newQueueConnection(); + session = newSession(qc); + Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize); + mc = session.createConsumer(q); + + idList = idsByQueue.get(queueName); + } + + public void run() { + + try { + + int count = 0; + + // Keep reading as long as it hasn't been told to shutdown + while (!shutdown) { + + if (idList.size() >= totalToExpect) { + LOG.info("Got {} for q: {}", +idList.size(), qName); + session.commit(); + break; + } + Message m = mc.receive(4000); + + if (m != null) { + + // We received a non-null message, add the ID to our list + + idList.add(m.getStringProperty("KEY")); + + count++; + + // If we've reached our batch size, commit the batch and reset the count + + if (count == batchSize) { + session.commit(); + 
count = 0; + } + } else { + + // We didn't receive anything this time, commit any current batch and reset the count + + session.commit(); + count = 0; + + // Sleep a little before trying to read after not getting a message + + try { + if (idList.size() < totalToExpect) { + LOG.info("did not receive on {}, current count: {}", qName, idList.size()); + didNotReceive.set(true); + } + //sleep(3000); + } catch (Exception e) { + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + + // Once we exit, close everything + close(); + } + } + + public void shutdown() { + shutdown = true; + } + + public void close() { + + try { + mc.close(); + } catch (Exception e) { + } + + try { + session.close(); + } catch (Exception e) { + } + + try { + qc.close(); + } catch (Exception e) { + + } + } + } + } +}