Repository: activemq-artemis Updated Branches: refs/heads/master 633b9c75d -> f798178c6
ARTEMIS-1059 option to monitor Paging counters Adding System.property artemis.debug.paging.interval (in seconds) to debug paging counters. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1c88c06a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1c88c06a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1c88c06a Branch: refs/heads/master Commit: 1c88c06abb1d1ac93148bd59c2c9b307df574e83 Parents: 633b9c7 Author: Clebert Suconic <[email protected]> Authored: Wed Mar 22 12:17:20 2017 -0400 Committer: Justin Bertram <[email protected]> Committed: Thu Mar 23 09:35:40 2017 -0500 ---------------------------------------------------------------------- .../artemis/core/paging/PagingManager.java | 4 ++ .../artemis/core/paging/PagingStoreFactory.java | 11 ++++ .../core/paging/impl/PagingManagerImpl.java | 41 ++++++++++++++ .../paging/impl/PagingStoreFactoryDatabase.java | 10 ++++ .../core/paging/impl/PagingStoreFactoryNIO.java | 10 ++++ .../tests/integration/client/ConsumerTest.java | 56 ++++++++++++++++++++ 6 files changed, 132 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index 35d2235..4d472e1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -107,4 +107,8 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository boolean isDiskFull(); + default long getGlobalSize() { + return 0; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java index a90fd44..75799d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -49,4 +50,14 @@ public interface PagingStoreFactory { void injectMonitor(FileStoreMonitor monitor) throws Exception; + default ScheduledExecutorService getScheduledExecutor() { + return null; + } + + default Executor newExecutor() { + return null; + } + + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 8c2e1f2..e036c16 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -31,6 +32,7 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -40,6 +42,8 @@ import org.jboss.logging.Logger; public final class PagingManagerImpl implements PagingManager { + private static final int ARTEMIS_DEBUG_PAGING_INTERVAL = Integer.valueOf(System.getProperty("artemis.debug.paging.interval", "0")); + private static final Logger logger = Logger.getLogger(PagingManagerImpl.class); private volatile boolean started = false; @@ -62,6 +66,8 @@ public final class PagingManagerImpl implements PagingManager { private final AtomicLong globalSizeBytes = new AtomicLong(0); + private final AtomicLong numberOfMessages = new AtomicLong(0); + private final long maxSize; private volatile boolean cleanupEnabled = true; @@ -70,6 +76,8 @@ public final class PagingManagerImpl implements PagingManager { private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>(); + private ActiveMQScheduledComponent scheduledComponent = null; + // Static // -------------------------------------------------------------------------------------------------------------------------- @@ -109,6 +117,13 @@ public final class PagingManagerImpl implements PagingManager { @Override public PagingManagerImpl addSize(int size) { + + if (size > 0) { + numberOfMessages.incrementAndGet(); + } else { + numberOfMessages.decrementAndGet(); + } + long newSize = globalSizeBytes.addAndGet(size); if (newSize < 0) { @@ -121,6 +136,11 @@ public final class PagingManagerImpl implements PagingManager { return this; } + @Override + public long getGlobalSize() { + return globalSizeBytes.get(); + } + protected void checkMemoryRelease() { if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) { Iterator<PagingStore> storeIterator = blockedStored.iterator(); @@ -314,12 +334,28 @@ public final class PagingManagerImpl implements PagingManager { reloadStores(); + if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) { + this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false) { + @Override + public void run() { + debug(); + } + }; + + this.scheduledComponent.start(); + + } + started = true; } finally { unlock(); } } + public void debug() { + logger.info("size = " + globalSizeBytes + " bytes, messages = " + numberOfMessages); + } + @Override public synchronized void stop() throws Exception { if (!started) { @@ -327,6 +363,11 @@ public final class PagingManagerImpl implements PagingManager { } started = false; + if (scheduledComponent != null) { + this.scheduledComponent.stop(); + this.scheduledComponent = null; + } + lock(); try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index 7917165..b274848 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -79,6 +79,16 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { private JDBCSequentialFile directoryList; + @Override + public ScheduledExecutorService getScheduledExecutor() { + return scheduledExecutor; + } + + @Override + public Executor newExecutor() { + return executorFactory.getExecutor(); + } + private boolean started = false; public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index 823baf8..c65b913 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -93,6 +93,16 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { // Public -------------------------------------------------------- @Override + public ScheduledExecutorService getScheduledExecutor() { + return scheduledExecutor; + } + + @Override + public Executor newExecutor() { + return executorFactory.getExecutor(); + } + + @Override public void stop() { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 037385f..4a0ef04 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -308,8 +308,51 @@ public class ConsumerTest extends ActiveMQTestBase { } + public void internalSimpleSend(int protocolSender, int protocolConsumer) throws Throwable { + + ConnectionFactory factorySend = createFactory(protocolSender); + ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer); + + + Connection connection = factorySend.createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(QUEUE.toString()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage msg = session.createTextMessage("hello"); + msg.setIntProperty("mycount", 0); + producer.send(msg); + connection.close(); + + connection = factoryConsume.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(QUEUE.toString()); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(0, message.getIntProperty("mycount")); + Assert.assertEquals("hello", message.getText()); + + Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100); + Assert.assertEquals(0, server.getPagingManager().getGlobalSize()); + + } finally { + connection.close(); + } + } + + public void internalSend(int protocolSender, int protocolConsumer) throws Throwable { + internalSimpleSend(protocolSender, protocolConsumer); + ConnectionFactory factorySend = createFactory(protocolSender); ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer); @@ -414,6 +457,19 @@ public class ConsumerTest extends ActiveMQTestBase { TextMessage msg = (TextMessage) consumer.receive(1000); Assert.assertEquals("testSelectorExampleFromSpecs:2", msg.getText()); + consumer.close(); + + consumer = session.createConsumer(queue); + msg = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(msg); + + Assert.assertNull(consumer.receiveNoWait()); + + Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100); + + + Assert.assertEquals(0, server.getPagingManager().getGlobalSize()); + } finally { connection.close(); }
