Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 1fef3dcc4 -> ade0003c3
ARTEMIS-1710 Allow management msgs to exceed global-max-size limit (cherry picked from commit 270b383e80296fb47dba6a719ef1616ddcaab1ef) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ade0003c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ade0003c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ade0003c Branch: refs/heads/2.6.x Commit: ade0003c33ba1cd3200b42e877e6fd0e5a3997e3 Parents: 1fef3dc Author: Francesco Nigro <nigro....@gmail.com> Authored: Sat Nov 3 16:37:26 2018 +0100 Committer: andytaylor <andy.tayl...@gmail.com> Committed: Mon Nov 5 10:05:13 2018 +0000 ---------------------------------------------------------------------- .../artemis/cli/commands/tools/DBOption.java | 4 +- .../amqp/broker/AMQPSessionCallback.java | 8 +- .../core/protocol/openwire/amq/AMQSession.java | 25 ++++-- .../management/impl/AddressControlImpl.java | 28 ++++-- .../core/paging/impl/PagingManagerImpl.java | 18 +++- .../journal/AbstractJournalStorageManager.java | 3 + .../core/postoffice/impl/BindingsImpl.java | 6 +- .../core/postoffice/impl/PostOfficeImpl.java | 6 +- .../artemis/core/server/QueueConfig.java | 8 +- .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../server/impl/PostOfficeJournalLoader.java | 2 +- .../core/server/impl/ScaleDownHandler.java | 13 ++- .../core/server/impl/ServerSessionImpl.java | 4 +- .../core/server/files/FileMoveManagerTest.java | 3 +- .../broker/region/policy/DestinationProxy.java | 15 +++- .../integration/paging/GlobalPagingTest.java | 95 ++++++++++++++++++++ .../replication/ReplicationTest.java | 2 +- .../core/postoffice/impl/BindingsImplTest.java | 2 +- 18 files changed, 203 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java index aa0b47a..2f2d8e9 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java @@ -241,11 +241,11 @@ public class DBOption extends OptionalLocking { storageManager, 1000L, scheduledExecutorService, executorFactory, false, null); - pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository); + pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress()); } else { storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory); PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null); - pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository); + pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 3d8ae5a..61816af 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -486,7 +486,7 @@ public class AMQPSessionCallback implements SessionCallback { try { PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); - if (store.isRejectingMessages()) { + if (store != null && store.isRejectingMessages()) { // We drop pre-settled messages (and abort any associated Tx) if (delivery.remotelySettled()) { if (transaction != null) { @@ -585,7 +585,11 @@ public class AMQPSessionCallback implements SessionCallback { pagingManager.checkMemory(runnable); } else { final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); - store.checkMemory(runnable); + if (store != null) { + store.checkMemory(runnable); + } else { + runnable.run(); + } } } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 0429297..bad9bc5 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -418,9 +418,13 @@ public class AMQSession implements SessionCallback { //non-persistent messages goes here, by default we stop reading from //transport connection.getTransportConnection().setAutoRead(false); - if (!store.checkMemory(enableAutoReadAndTtl)) { - enableAutoReadAndTtl(); - throw new ResourceAllocationException("Queue is full " + address); + if (store != null) { + if (!store.checkMemory(enableAutoReadAndTtl)) { + enableAutoReadAndTtl(); + throw new ResourceAllocationException("Queue is full " + address); + } + } else { + enableAutoReadAndTtl.run(); } getCoreSession().send(coreMsg, false, dest.isTemporary()); @@ -443,7 +447,7 @@ public class AMQSession implements SessionCallback { final AtomicInteger count, final org.apache.activemq.artemis.api.core.Message coreMsg, final SimpleString address) throws ResourceAllocationException { - if (!store.checkMemory(false, () -> { + final Runnable task = () -> { Exception exceptionToSend = null; try { @@ -496,10 +500,15 @@ public class AMQSession implements SessionCallback { }); } } - })) { - this.connection.getContext().setDontSendReponse(false); - connection.enableTtl(); - throw new ResourceAllocationException("Queue is full " + address); + }; + if (store != null) { + if (!store.checkMemory(false, task)) { + this.connection.getContext().setDontSendReponse(false); + connection.enableTtl(); + throw new ResourceAllocationException("Queue is full " + address); + } + } else { + task.run(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 0eb39e0..b24c370 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -201,17 +201,29 @@ public class AddressControlImpl extends AbstractControl implements AddressContro public long getNumberOfBytesPerPage() throws Exception { clearIO(); try { - return pagingManager.getPageStore(addressInfo.getName()).getPageSizeBytes(); + final PagingStore pagingStore = getPagingStore(); + if (pagingStore == null) { + return 0; + } + return pagingStore.getPageSizeBytes(); } finally { blockOnIO(); } } + private PagingStore getPagingStore() throws Exception { + return pagingManager.getPageStore(addressInfo.getName()); + } + @Override public long getAddressSize() throws Exception { clearIO(); try { - return pagingManager.getPageStore(addressInfo.getName()).getAddressSize(); + final PagingStore pagingStore = getPagingStore(); + if (pagingStore == null) { + return 0; + } + return pagingStore.getAddressSize(); } finally { blockOnIO(); } @@ -240,7 +252,11 @@ public class AddressControlImpl extends AbstractControl implements AddressContro public boolean isPaging() throws Exception { clearIO(); try { - return pagingManager.getPageStore(addressInfo.getName()).isPaging(); + final PagingStore pagingStore = getPagingStore(); + if (pagingStore == null) { + return false; + } + return pagingStore.isPaging(); } finally { blockOnIO(); } @@ -250,12 +266,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro public int getNumberOfPages() throws Exception { clearIO(); try { - PagingStore pageStore = pagingManager.getPageStore(addressInfo.getName()); + final PagingStore pageStore = getPagingStore(); - if (!pageStore.isPaging()) { + if (pageStore == null || !pageStore.isPaging()) { return 0; } else { - return pagingManager.getPageStore(addressInfo.getName()).getNumberOfPages(); + return pageStore.getNumberOfPages(); } } finally { blockOnIO(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 07f5ad7..357fda2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -86,6 +86,8 @@ public final class PagingManagerImpl implements PagingManager { private ActiveMQScheduledComponent scheduledComponent = null; + private final SimpleString managementAddress; + // Static // -------------------------------------------------------------------------------------------------------------------------- @@ -105,17 +107,25 @@ public final class PagingManagerImpl implements PagingManager { public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final long maxSize) { + final long maxSize, + final SimpleString managementAddress) { pagingStoreFactory = pagingSPI; this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); this.maxSize = maxSize; this.memoryExecutor = pagingSPI.newExecutor(); + this.managementAddress = managementAddress; } public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<AddressSettings> addressSettingsRepository) { - this(pagingSPI, addressSettingsRepository, -1); + this(pagingSPI, addressSettingsRepository, -1, null); + } + + public PagingManagerImpl(final PagingStoreFactory pagingSPI, + final HierarchicalRepository<AddressSettings> addressSettingsRepository, + final SimpleString managementAddress) { + this(pagingSPI, addressSettingsRepository, -1, managementAddress); } @Override @@ -329,6 +339,9 @@ public final class PagingManagerImpl implements PagingManager { */ @Override public PagingStore getPageStore(final SimpleString storeName) throws Exception { + if (managementAddress != null && storeName.startsWith(managementAddress)) { + return null; + } PagingStore store = stores.get(storeName); if (store != null) { @@ -438,6 +451,7 @@ public final class PagingManagerImpl implements PagingManager { } private PagingStore newStore(final SimpleString address) throws Exception { + assert managementAddress == null || (managementAddress != null && !address.startsWith(managementAddress)); syncLock.readLock().lock(); try { PagingStore store = stores.get(address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 166b35b..e681f07 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1244,6 +1244,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp if (queueInfo != null) { SimpleString address = queueInfo.getAddress(); PagingStore store = pagingManager.getPageStore(address); + if (store == null) { + return null; + } subs = store.getCursorProvider().getSubscription(queueID); pageSubscriptions.put(queueID, subs); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index c20e988..56abddb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -62,13 +61,10 @@ public final class BindingsImpl implements Bindings { private final GroupingHandler groupingHandler; - private final PagingStore pageStore; - private final SimpleString name; - public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, final PagingStore pageStore) { + public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) { this.groupingHandler = groupingHandler; - this.pageStore = pageStore; this.name = name; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 68df53d..3bf3d86 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1200,7 +1200,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) { PagingStore store = pagingManager.getPageStore(entry.getKey()); - if (storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) { + if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) { if (message.isLargeMessage()) { confirmLargeMessageSend(tx, message); } @@ -1564,9 +1564,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public Bindings createBindings(final SimpleString address) throws Exception { + public Bindings createBindings(final SimpleString address) { GroupingHandler groupingHandler = server.getGroupingHandler(); - BindingsImpl bindings = new BindingsImpl(address, groupingHandler, pagingManager.getPageStore(address)); + BindingsImpl bindings = new BindingsImpl(address, groupingHandler); if (groupingHandler != null) { groupingHandler.addListener(bindings); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index 75f859d..6b1c284 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.FilterUtils; import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; public final class QueueConfig { @@ -163,7 +164,12 @@ public final class QueueConfig { final PageSubscription pageSubscription; if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) { try { - pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable); + final PagingStore pageStore = this.pagingManager.getPageStore(address); + if (pageStore != null) { + pageSubscription = pageStore.getCursorProvider().createSubscription(id, filter, durable); + } else { + pageSubscription = null; + } } catch (Exception e) { throw new IllegalStateException(e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 059f6bd..80e4217 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2144,7 +2144,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public PagingManager createPagingManager() throws Exception { - return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize()); + return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getManagementAddress()); } protected PagingStoreFactory getPagingStoreFactory() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index f9ec964..0132818 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -358,7 +358,7 @@ public class PostOfficeJournalLoader implements JournalLoader { // This can't be true! assert (perQueue != null); - if (store.checkPageFileExists(pageId.intValue())) { + if (store != null && store.checkPageFileExists(pageId.intValue())) { // on this case we need to recalculate the records Page pg = store.createPage(pageId.intValue()); pg.open(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index 02fe1bf..7585165 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -154,7 +154,9 @@ public class ScaleDownHandler { Transaction tx = new TransactionImpl(storageManager); - pageStore.disableCleanup(); + if (pageStore != null) { + pageStore.disableCleanup(); + } try { @@ -240,8 +242,10 @@ public class ScaleDownHandler { return messageCount; } finally { - pageStore.enableCleanup(); - pageStore.getCursorProvider().scheduleCleanup(); + if (pageStore != null) { + pageStore.enableCleanup(); + pageStore.getCursorProvider().scheduleCleanup(); + } } } @@ -556,6 +560,9 @@ public class ScaleDownHandler { public boolean lookup(MessageReference reference) throws Exception { if (reference.isPaged()) { + if (store == null) { + return false; + } PageSubscription subscription = store.getCursorProvider().getSubscription(queue.getID()); if (subscription.contains((PagedReference) reference)) { return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 2e3e2f9..252da69 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1481,7 +1481,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString addr = removePrefix(address); PagingStore store = server.getPagingManager().getPageStore(addr); - if (!store.checkMemory(new Runnable() { + if (store == null) { + callback.sendProducerCreditsMessage(credits, address); + } else if (!store.checkMemory(new Runnable() { @Override public void run() { callback.sendProducerCreditsMessage(credits, address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java index f47c827..bf8cfb2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java @@ -28,6 +28,7 @@ import java.io.PrintWriter; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; @@ -304,7 +305,7 @@ public class FileMoveManagerTest { PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null); - PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1); + PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress()); managerImpl.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java index 3f6c252..a9acce4 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.broker.ConnectionContext; @@ -153,7 +154,11 @@ public class DestinationProxy implements Destination { @Override public long getUsage() { try { - return server.getPagingManager().getPageStore(view.getAddress()).getAddressSize(); + final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress()); + if (pageStore == null) { + return 0; + } + return pageStore.getAddressSize(); } catch (Exception e) { throw new RuntimeException(e); } @@ -221,9 +226,13 @@ public class DestinationProxy implements Destination { @Override public int getPercentUsage() { - long total = 0; + final long total; try { - total = server.getPagingManager().getPageStore(view.getAddress()).getMaxSize(); + final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress()); + if (pageStore == null) { + return 0; + } + total = pageStore.getMaxSize(); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java index 4c77532..d4cbdd3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java @@ -19,15 +19,23 @@ package org.apache.activemq.artemis.tests.integration.paging; import java.nio.ByteBuffer; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientRequestor; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; @@ -192,4 +200,91 @@ public class GlobalPagingTest extends PagingTest { session.commit(); } + @Test + public void testManagementAddressCannotPageOrChangeGlobalSize() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + final ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, -1); + + try { + final SimpleString managementAddress = server.getConfiguration().getManagementAddress(); + server.getConfiguration().setGlobalMaxSize(1); + server.start(); + + final ServerLocator locator = createInVMNonHALocator() + .setBlockOnNonDurableSend(true) + .setBlockOnDurableSend(true) + .setBlockOnAcknowledge(true); + + try (ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true)) { + + session.start(); + + if (server.locateQueue(managementAddress) == null) { + + session.createQueue(managementAddress, managementAddress, null, true); + } + + final Queue managementQueue = server.locateQueue(managementAddress); + + Assert.assertNull(managementQueue.getPageSubscription()); + + Assert.assertNull(server.getPagingManager().getPageStore(managementAddress)); + + final SimpleString address = SimpleString.toSimpleString("queue"); + + if (server.locateQueue(address) == null) { + + session.createQueue(address, address, null, true); + } + + final CountDownLatch startSendMessages = new CountDownLatch(1); + + final PagingManager pagingManager = server.getPagingManager(); + + final long globalSize = pagingManager.getGlobalSize(); + + final Thread globalSizeChecker = new Thread(() -> { + startSendMessages.countDown(); + while (!Thread.currentThread().isInterrupted()) { + Assert.assertEquals(globalSize, pagingManager.getGlobalSize()); + } + }); + + globalSizeChecker.start(); + + try (ClientRequestor requestor = new ClientRequestor(session, managementAddress)) { + + ClientMessage message = session.createMessage(false); + + ManagementHelper.putAttribute(message, "queue." + address.toString(), "messageCount"); + + Assert.assertTrue("bodySize = " + message.getBodySize() + " must be > of globalMaxSize = " + server.getConfiguration().getGlobalMaxSize(), message.getBodySize() > server.getConfiguration().getGlobalMaxSize()); + + startSendMessages.await(); + + for (int i = 0; i < 100; i++) { + try { + ClientMessage reply = requestor.request(message); + Assert.assertEquals(0L, ManagementHelper.getResult(reply)); + } catch (ActiveMQAddressFullException e) { + Assert.fail(e.getMessage()); + return; + } + } + + } finally { + globalSizeChecker.interrupt(); + } + } + + } finally { + server.stop(true); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 07c02c3..c27ea68 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -598,7 +598,7 @@ public final class ReplicationTest extends ActiveMQTestBase { final ExecutorFactory executorFactory, final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception { - PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository); + PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress()); paging.start(); return paging; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index 830e61f..40202e5 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -70,7 +70,7 @@ public class BindingsImplTest extends ActiveMQTestBase { private void internalTest(final boolean route) throws Exception { final FakeBinding fake = new FakeBinding(new SimpleString("a")); - final Bindings bind = new BindingsImpl(null, null, null); + final Bindings bind = new BindingsImpl(null, null); bind.addBinding(fake); bind.addBinding(new FakeBinding(new SimpleString("a"))); bind.addBinding(new FakeBinding(new SimpleString("a")));