Repository: activemq Updated Branches: refs/heads/master 57795bafc -> a0ba0bf4c
[AMQ-6667] gate cursor cache enablement on a single pending send and tidy up setbatch to always check outstanding async future list. Fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0ba0bf4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0ba0bf4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0ba0bf4 Branch: refs/heads/master Commit: a0ba0bf4c6beaf50ce5e021ef5e4d493119bb1ef Parents: 57795ba Author: gtully <[email protected]> Authored: Wed May 3 11:36:06 2017 +0100 Committer: gtully <[email protected]> Committed: Wed May 3 11:36:06 2017 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 13 +- .../region/cursors/AbstractStoreCursor.java | 91 +++-- .../region/cursors/QueueStorePrefetch.java | 7 + .../cursors/StoreQueueCursorOrderTest.java | 12 +- .../activemq/bugs/DuplicateFromStoreTest.java | 345 +++++++++++++++++++ 5 files changed, 430 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 3ead89d..c6241b0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -38,6 +38,7 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -114,6 +115,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // Messages that are paged in but have not yet been targeted at a subscription private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList(); + private AtomicInteger pendingSends = new AtomicInteger(0); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); @@ -149,7 +151,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index private final Object iteratingMutex = new Object(); - + // gate on enabling cursor cache to ensure no outstanding sync + // send before async sends resume + public boolean singlePendingSend() { + return pendingSends.get() <= 1; + } class TimeoutMessage implements Delayed { @@ -825,6 +831,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index ListenableFuture<Object> result = null; producerExchange.incrementSend(); + pendingSends.incrementAndGet(); do { checkUsage(context, producerExchange, message); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); @@ -845,6 +852,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // we may have a store in inconsistent state, so reset the cursor // before restarting normal broker operations resetNeeded = true; + pendingSends.decrementAndGet(); throw e; } } @@ -1837,6 +1845,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } final void messageSent(final ConnectionContext context, final Message msg) throws Exception { + pendingSends.decrementAndGet(); destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); destinationStatistics.getMessageSize().addSize(msg.getSize()); @@ -1983,7 +1992,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // store should have trapped duplicate in it's index, or cursor audit trapped insert // or producerBrokerExchange suppressed send. // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id - LOG.warn("{}, duplicate message {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessage()); + LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong()); if (store != null) { ConnectionContext connectionContext = createConnectionContext(); dropMessage(ref); http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 0295b33..aef7528 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -232,12 +232,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception { boolean disableCache = false; if (hasSpace()) { - if (!isCacheEnabled() && size==0 && isStarted() && useCache) { - if (LOG.isTraceEnabled()) { - LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); - } - setCacheEnabled(true); - } if (isCacheEnabled()) { if (recoverMessage(node.getMessage(),true)) { trackLastCached(node); @@ -261,41 +255,68 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return true; } + @Override + public synchronized boolean isCacheEnabled() { + return super.isCacheEnabled() || enableCacheNow(); + } + + protected boolean enableCacheNow() { + boolean result = false; + if (canEnableCash()) { + setCacheEnabled(true); + result = true; + if (LOG.isTraceEnabled()) { + LOG.trace("{} enabling cache on empty store", this); + } + } + return result; + } + + protected boolean canEnableCash() { + return useCache && size==0 && hasSpace() && isStarted(); + } + private void syncWithStore(Message currentAdd) throws Exception { pruneLastCached(); - if (lastCachedIds[SYNC_ADD] == null) { - // possibly only async adds, lets wait on the potential last add and reset from there - for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { - MessageId lastPending = it.previous(); - Object futureOrLong = lastPending.getFutureOrSequenceLong(); - if (futureOrLong instanceof Future) { - Future future = (Future) futureOrLong; - if (future.isCancelled()) { - continue; - } - try { - future.get(5, TimeUnit.SECONDS); - setLastCachedId(ASYNC_ADD, lastPending); - } catch (CancellationException ok) { - continue; - } catch (TimeoutException potentialDeadlock) { - LOG.debug("{} timed out waiting for async add", this, potentialDeadlock); - } catch (Exception worstCaseWeReplay) { - LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay); - } - } else { + for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { + MessageId lastPending = it.previous(); + Object futureOrLong = lastPending.getFutureOrSequenceLong(); + if (futureOrLong instanceof Future) { + Future future = (Future) futureOrLong; + if (future.isCancelled()) { + continue; + } + try { + future.get(5, TimeUnit.SECONDS); setLastCachedId(ASYNC_ADD, lastPending); + } catch (CancellationException ok) { + continue; + } catch (TimeoutException potentialDeadlock) { + LOG.debug("{} timed out waiting for async add", this, potentialDeadlock); + } catch (Exception worstCaseWeReplay) { + LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay); } - break; + } else { + setLastCachedId(ASYNC_ADD, lastPending); } - if (lastCachedIds[ASYNC_ADD] != null) { - // ensure we don't skip current possibly sync add b/c we waited on the future - if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) { - setBatch(lastCachedIds[ASYNC_ADD]); + break; + } + + MessageId candidate = lastCachedIds[ASYNC_ADD]; + if (candidate != null) { + // ensure we don't skip current possibly sync add b/c we waited on the future + if (!isAsync(currentAdd) && Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) < 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("no set batch from async:" + candidate.getFutureOrSequenceLong() + " >= than current: " + currentAdd.getMessageId().getFutureOrSequenceLong() + ", " + this); } + candidate = null; } - } else { - setBatch(lastCachedIds[SYNC_ADD]); + } + if (candidate == null) { + candidate = lastCachedIds[SYNC_ADD]; + } + if (candidate != null) { + setBatch(candidate); } // cleanup lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null; @@ -355,6 +376,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } else if (candidateOrSequenceLong != null && Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) { lastCachedIds[index] = candidate; + } if (LOG.isTraceEnabled()) { + LOG.trace("no set last cached[" + index + "] current:" + lastCacheFutureOrSequenceLong + " <= than candidate: " + candidateOrSequenceLong+ ", " + this); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index b10b2e2..1a13a59 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; class QueueStorePrefetch extends AbstractStoreCursor { private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class); private final MessageStore store; + private final Queue queue; private final Broker broker; /** @@ -46,6 +47,7 @@ class QueueStorePrefetch extends AbstractStoreCursor { */ public QueueStorePrefetch(Queue queue, Broker broker) { super(queue); + this.queue = queue; this.store = queue.getMessageStore(); this.broker = broker; @@ -88,6 +90,11 @@ class QueueStorePrefetch extends AbstractStoreCursor { } @Override + protected boolean canEnableCash() { + return super.canEnableCash() && queue.singlePendingSend(); + } + + @Override protected synchronized boolean isStoreEmpty() { try { return this.store.isEmpty(); http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java index 90b8428..5a1ab90 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -184,7 +184,7 @@ public class StoreQueueCursorOrderTest { msg = getMessage(1); messages[0] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(1l); + msg.getMessageId().setFutureOrSequenceLong(0l); underTest.addMessageLast(msg); @@ -354,7 +354,7 @@ public class StoreQueueCursorOrderTest { msg = getMessage(3); messages[2] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(3l); + msg.getMessageId().setFutureOrSequenceLong(2l); underTest.addMessageLast(msg); @@ -375,6 +375,14 @@ public class StoreQueueCursorOrderTest { } underTest.release(); assertEquals(count, dequeueCount); + + msg = getMessage(4); + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(4l); + underTest.addMessageLast(msg); + + assertTrue("cache enabled on empty store", underTest.isCacheEnabled()); + } @Test http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java new file mode 100644 index 0000000..4f4004c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.StoreUsage; +import org.apache.activemq.usage.SystemUsage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +public class DuplicateFromStoreTest { + static Logger LOG = LoggerFactory.getLogger(DuplicateFromStoreTest.class); + String activemqURL; + BrokerService broker; + + protected final static String DESTNAME = "TEST"; + protected final static int NUM_PRODUCERS = 100; + protected final static int NUM_CONSUMERS = 20; + + protected final static int NUM_MSGS = 40000; + protected final static int CONSUMER_SLEEP = 0; + protected final static int PRODUCER_SLEEP = 10; + + public static CountDownLatch producersFinished = new CountDownLatch(NUM_PRODUCERS); + public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS ); + + public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS); + public AtomicInteger totalReceived = new AtomicInteger(0); + + public int messageSize = 16*1000; + + + @Before + public void startBroker() throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.addConnector("tcp://0.0.0.0:0"); + + // Create <policyEntry> + PolicyEntry policy = new PolicyEntry(); + ActiveMQDestination dest = new ActiveMQQueue(">"); + policy.setDestination(dest); + policy.setMemoryLimit(10 * 1024 * 1024); // 10 MB + policy.setExpireMessagesPeriod(0); + policy.setEnableAudit(false); // allow any duplicates from the store to bubble up to the q impl + PolicyMap policies = new PolicyMap(); + policies.put(dest, policy); + broker.setDestinationPolicy(policies); + + // configure <systemUsage> + MemoryUsage memoryUsage = new MemoryUsage(); + memoryUsage.setPercentOfJvmHeap(70); + + StoreUsage storeUsage = new StoreUsage(); + storeUsage.setLimit(8 * 1024 * 1024 * 1024); // 8 gb + + SystemUsage memoryManager = new SystemUsage(); + memoryManager.setMemoryUsage(memoryUsage); + memoryManager.setStoreUsage(storeUsage); + broker.setSystemUsage(memoryManager); + + // configure KahaDB persistence + PersistenceAdapter kahadb = new KahaDBStore(); + ((KahaDBStore) kahadb).setConcurrentStoreAndDispatchQueues(true); + broker.setPersistenceAdapter(kahadb); + + // start broker + broker.start(); + broker.waitUntilStarted(); + + activemqURL = broker.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + @Test + public void testDuplicateMessage() throws Exception { + LOG.info("Testing for duplicate messages."); + + //create producer and consumer threads + ExecutorService producers = Executors.newFixedThreadPool(NUM_PRODUCERS); + ExecutorService consumers = Executors.newFixedThreadPool(NUM_CONSUMERS); + + createOpenwireClients(producers, consumers); + + LOG.info("All producers and consumers got started. Awaiting their termination"); + producersFinished.await(100, TimeUnit.MINUTES); + LOG.info("All producers have terminated."); + + consumersFinished.await(100, TimeUnit.MINUTES); + LOG.info("All consumers have terminated."); + + producers.shutdownNow(); + consumers.shutdownNow(); + + assertEquals("no messages pending, i.e. dlq empty", 0l, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + + // validate cache can be enabled if disabled + + } + + + protected void createOpenwireClients(ExecutorService producers, ExecutorService consumers) { + for (int i = 0; i < NUM_CONSUMERS; i++) { + LOG.trace("Creating consumer for destination " + DESTNAME); + Consumer consumer = new Consumer(DESTNAME, false); + consumers.submit(consumer); + // wait for consumer to signal it has fully initialized + synchronized(consumer.init) { + try { + consumer.init.wait(); + } catch (InterruptedException e) { + LOG.error(e.toString(), e); + } + } + } + + for (int i = 0; i < NUM_PRODUCERS; i++) { + LOG.trace("Creating producer for destination " + DESTNAME ); + Producer producer = new Producer(DESTNAME, false, 0); + producers.submit(producer); + } + } + + class Producer implements Runnable { + + Logger log = LOG; + protected String destName = "TEST"; + protected boolean isTopicDest = false; + + + public Producer(String dest, boolean isTopic, int ttl) { + this.destName = dest; + this.isTopicDest = isTopic; + } + + + /** + * Connect to broker and constantly send messages + */ + public void run() { + + Connection connection = null; + Session session = null; + MessageProducer producer = null; + + try { + ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL); + connection = amq.createConnection(); + + connection.setExceptionListener(new javax.jms.ExceptionListener() { + public void onException(javax.jms.JMSException e) { + e.printStackTrace(); + } + }); + connection.start(); + + // Create a Session + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination; + if (isTopicDest) { + // Create the destination (Topic or Queue) + destination = session.createTopic(destName); + } else { + destination = session.createQueue(destName); + } + // Create a MessageProducer from the Session to the Topic or Queue + producer = session.createProducer(destination); + + // Create message + long counter = 0; + //enlarge msg to 16 kb + int msgSize = 16 * 1024; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.setLength(msgSize + 15); + stringBuilder.append("Message: "); + stringBuilder.append(counter); + for (int j = 0; j < (msgSize / 10); j++) { + stringBuilder.append("XXXXXXXXXX"); + } + String text = stringBuilder.toString(); + TextMessage message = session.createTextMessage(text); + + // send message + while (totalMessagesToSend.decrementAndGet() >= 0) { + producer.send(message); + log.debug("Sent message: " + counter); + counter++; + + if ((counter % 10000) == 0) + log.info("sent " + counter + " messages"); + + Thread.sleep(PRODUCER_SLEEP); + } + } catch (Exception ex) { + log.error(ex.getMessage()); + return; + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (Exception ignored) { + } finally { + producersFinished.countDown(); + } + } + log.debug("Closing producer for " + destName); + } + } + + class Consumer implements Runnable { + + public Object init = new Object(); + protected String queueName = "TEST"; + boolean isTopic = false; + + Logger log = LOG; + + public Consumer(String destName, boolean topic) { + this.isTopic = topic; + this.queueName = destName; + } + + /** + * connect to broker and receive messages + */ + public void run() { + + Connection connection = null; + Session session = null; + MessageConsumer consumer = null; + + try { + ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL); + connection = amq.createConnection(); + connection.setExceptionListener(new javax.jms.ExceptionListener() { + public void onException(javax.jms.JMSException e) { + e.printStackTrace(); + } + }); + connection.start(); + // Create a Session + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + // Create the destination (Topic or Queue) + Destination destination = null; + if (isTopic) + destination = session.createTopic(queueName); + else + destination = session.createQueue(queueName); + + //Create a MessageConsumer from the Session to the Topic or Queue + consumer = session.createConsumer(destination); + + synchronized (init) { + init.notifyAll(); + } + + // Wait for a message + long counter = 0; + while (totalReceived.get() < NUM_MSGS) { + Message message2 = consumer.receive(5000); + + if (message2 instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message2; + String text = textMessage.getText(); + log.debug("Received: " + text.substring(0, 50)); + } else { + if (totalReceived.get() < NUM_MSGS) { + log.error("Received message of unsupported type. Expecting TextMessage. " + message2); + } + break; + } + if (message2 != null) { + counter++; + totalReceived.incrementAndGet(); + if ((counter % 10000) == 0) + log.info("received " + counter + " messages"); + + Thread.sleep(CONSUMER_SLEEP); + } + } + } catch (Exception e) { + log.error("Error in Consumer: " + e.getMessage()); + return; + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (Exception ignored) { + } finally { + consumersFinished.countDown(); + } + } + } + } +}
