Repository: activemq Updated Branches: refs/heads/trunk a0c42a61d -> f19add11d
add info log message for a queue purge event Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f19add11 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f19add11 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f19add11 Branch: refs/heads/trunk Commit: f19add11deb07ce31c5f9bfa5aeccf6960b22f12 Parents: a0c42a6 Author: gtully <gary.tu...@gmail.com> Authored: Wed Sep 17 15:57:55 2014 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Wed Sep 17 15:57:55 2014 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 5 ++- .../activemq/broker/region/QueuePurgeTest.java | 36 +++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f19add11/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index c7f768e..c4d49bd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1213,6 +1213,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List<MessageReference> list = null; + long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { doPageIn(true, false); // signal no expiry processing needed. pagedInMessagesLock.readLock().lock(); @@ -1234,7 +1235,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); if (this.destinationStatistics.getMessages().getCount() > 0) { - LOG.warn("{} after purge complete, message count stats report: {}", getActiveMQDestination().getQualifiedName(), this.destinationStatistics.getMessages().getCount()); + LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); + } else { + LOG.info("{} purged of {} messages", getActiveMQDestination().getQualifiedName(), originalMessageCount); } gc(); this.destinationStatistics.getMessages().setCount(0); http://git-wip-us.apache.org/repos/asf/activemq/blob/f19add11/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index a03e7bc..c017e87 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region; import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -38,6 +39,10 @@ import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,10 +94,39 @@ public class QueuePurgeTest extends CombinationTestSupport { createProducerAndSendMessages(NUM_TO_SEND); QueueViewMBean proxy = getProxyToQueueViewMBean(); LOG.info("purging.."); - proxy.purge(); + + org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.region.Queue.class); + final AtomicBoolean gotPurgeLogMessage = new AtomicBoolean(false); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getMessage() instanceof String) { + String message = (String) event.getMessage(); + if (message.contains("purged of " + NUM_TO_SEND +" messages")) { + LOG.info("Received a log message: {} ", event.getMessage()); + gotPurgeLogMessage.set(true); + } + } + } + }; + + Level level = log4jLogger.getLevel(); + log4jLogger.setLevel(Level.INFO); + log4jLogger.addAppender(appender); + try { + + proxy.purge(); + + } finally { + log4jLogger.setLevel(level); + log4jLogger.removeAppender(appender); + } + assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0, proxy.getQueueSize()); assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled()); + assertTrue("got expected info purge log message", gotPurgeLogMessage.get()); } public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {