This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 764db34e9b ARTEMIS-3178 Page Limitting (max messages and max bytes)
764db34e9b is described below
commit 764db34e9ba453b23b7c8b7154ab9ba710099874
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Jan 25 09:59:06 2023 -0500
ARTEMIS-3178 Page Limitting (max messages and max bytes)
I am adding three attributes to Address-settings:
* page-limit-bytes: Number of bytes. We will convert this metric into max
number of pages internally by dividing max-bytes / page-size. It will allow a
max based on an estimate.
* page-limit-messages: Number of messages
* page-full-message-policy: fail or drop
We will now allow paging, until these max values and then fail or drop
messages.
Once these values are retracted, the address will remain full until a
period where cleanup is kicked in by paging. So these values may have a certain
delay on being applied, but they should always be cleared once cleanup happened.
---
.../core/settings/impl/PageFullMessagePolicy.java | 21 ++
.../artemis/core/config/impl/Validators.java | 13 +
.../deployers/impl/FileConfigurationParser.java | 20 +
.../activemq/artemis/core/paging/PagingStore.java | 15 +
.../core/paging/cursor/PageCursorProvider.java | 3 +-
.../paging/cursor/impl/PageCursorProviderImpl.java | 37 +-
.../cursor/impl/PageSubscriptionCounterImpl.java | 20 +-
.../artemis/core/paging/impl/PagingStoreImpl.java | 143 +++++++
.../artemis/core/server/ActiveMQMessageBundle.java | 3 +
.../artemis/core/server/ActiveMQServerLogger.java | 19 +
.../core/settings/impl/AddressSettings.java | 110 +++++-
.../resources/schema/artemis-configuration.xsd | 35 +-
.../core/config/impl/ConfigurationImplTest.java | 128 +++++++
.../config/impl/FileConfigurationParserTest.java | 19 +
.../artemis/tests/util/ActiveMQTestBase.java | 35 ++
docs/user-manual/en/paging.md | 14 +
.../artemis/jms/example/PagingExample.java | 175 ++++++---
.../src/main/resources/activemq/server0/broker.xml | 26 ++
.../tests/integration/paging/PagingLimitTest.java | 416 +++++++++++++++++++++
.../storage/PersistMultiThreadTest.java | 32 ++
20 files changed, 1219 insertions(+), 65 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java
new file mode 100644
index 0000000000..8da0dd38fd
--- /dev/null
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.settings.impl;
+
+public enum PageFullMessagePolicy {
+ DROP, FAIL
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index 3aa5af5ef7..a09f75d616 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
@@ -195,6 +196,18 @@ public final class Validators {
}
};
+ public static final Validator PAGE_FULL_MESSAGE_POLICY_TYPE = new
Validator() {
+ @Override
+ public void validate(final String name, final Object value) {
+ String val = (String) value;
+ if (val == null ||
+ !val.equals(PageFullMessagePolicy.DROP.toString()) &&
+ !val.equals(PageFullMessagePolicy.FAIL.toString())) {
+ throw
ActiveMQMessageBundle.BUNDLE.invalidAddressFullPolicyType(val);
+ }
+ }
+ };
+
public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT =
new Validator() {
@Override
public void validate(final String name, final Object value) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 3bdb0ca706..8461db483c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -98,6 +98,7 @@ import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
@@ -218,6 +219,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME =
"address-full-policy";
+ private static final String PAGE_FULL_MESSAGE_POLICY_NODE_NAME =
"page-full-policy";
+
private static final String MAX_READ_PAGE_BYTES_NODE_NAME =
"max-read-page-bytes";
private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME =
"max-read-page-messages";
@@ -226,6 +229,10 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME =
"page-max-cache-size";
+ private static final String PAGE_LIMIT_BYTES_NODE_NAME = "page-limit-bytes";
+
+ private static final String PAGE_LIMIT_MESSAGES_NODE_NAME =
"page-limit-messages";
+
private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME =
"message-counter-history-day-limit";
private static final String LVQ_NODE_NAME = "last-value-queue";
@@ -1281,6 +1288,14 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
ActiveMQServerLogger.LOGGER.pageMaxSizeUsed();
}
addressSettings.setPageCacheMaxSize(XMLUtil.parseInt(child));
+ } else if (PAGE_LIMIT_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
+ long pageLimitBytes =
ByteUtil.convertTextBytes(getTrimmedTextContent(child));
+
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_BYTES_NODE_NAME,
pageLimitBytes);
+ addressSettings.setPageLimitBytes(pageLimitBytes);
+ } else if (PAGE_LIMIT_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
+ long pageLimitMessages =
ByteUtil.convertTextBytes(getTrimmedTextContent(child));
+
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_MESSAGES_NODE_NAME,
pageLimitMessages);
+ addressSettings.setPageLimitMessages(pageLimitMessages);
} else if
(MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child));
} else if
(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
@@ -1288,6 +1303,11 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME,
value);
AddressFullMessagePolicy policy =
Enum.valueOf(AddressFullMessagePolicy.class, value);
addressSettings.setAddressFullMessagePolicy(policy);
+ } else if (PAGE_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name))
{
+ String value = getTrimmedTextContent(child);
+
Validators.PAGE_FULL_MESSAGE_POLICY_TYPE.validate(PAGE_FULL_MESSAGE_POLICY_NODE_NAME,
value);
+ PageFullMessagePolicy policy =
Enum.valueOf(PageFullMessagePolicy.class, value);
+ addressSettings.setPageFullMessagePolicy(policy);
} else if (LVQ_NODE_NAME.equalsIgnoreCase(name) ||
DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child));
} else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index da680fce6b..d582b0d712 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
@@ -30,6 +31,7 @@ import
org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
AddressFullMessagePolicy getAddressFullMessagePolicy();
+ PageFullMessagePolicy getPageFullMessagePolicy();
+
+ Long getPageLimitMessages();
+
+ Long getPageLimitBytes();
+
+ /** Callback to be used by a counter when the Page is full for that counter
*/
+ void pageFull(PageSubscription subscription);
+
+ boolean isPageFull();
+
+ void checkPageLimit(long numberOfMessages);
+
long getFirstPage();
int getPageSizeBytes();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index d3f25e21b5..b42049fe55 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;
+import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.filter.Filter;
@@ -46,7 +47,7 @@ public interface PageCursorProvider {
void flushExecutors();
- void scheduleCleanup();
+ Future<Boolean> scheduleCleanup();
void disableCleanup();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 24d65970f8..dea4cd5fbc 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,12 +38,14 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class PageCursorProviderImpl implements PageCursorProvider {
@@ -179,11 +182,14 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
}
@Override
- public void scheduleCleanup() {
+ public Future<Boolean> scheduleCleanup() {
+ final SimpleFutureImpl<Boolean> future = new SimpleFutureImpl<>();
if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
- // Scheduled cleanup was already scheduled before.. never mind!
- // or we have cleanup disabled
- return;
+ // Scheduled cleanup was already scheduled before.
+ // On that case just flush the executor returning the future.set(true)
+ // after any previous scheduled cleanup is finished.
+ pagingStore.execute(() -> future.set(true));
+ return future;
}
scheduledCleanup.incrementAndGet();
@@ -199,9 +205,12 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
} finally {
storageManager.clearContext();
scheduledCleanup.decrementAndGet();
+ future.set(true);
}
}
});
+
+ return future;
}
/**
@@ -241,6 +250,22 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
scheduleCleanup();
}
+ private long getNumberOfMessagesOnSubscriptions() {
+ AtomicLong largerCounter = new AtomicLong();
+ activeCursors.forEach((id, sub) -> {
+ long value = sub.getCounter().getValue();
+ if (value > largerCounter.get()) {
+ largerCounter.set(value);
+ }
+ });
+
+ return largerCounter.get();
+ }
+
+ void checkClearPageLimit() {
+ pagingStore.checkPageLimit(getNumberOfMessagesOnSubscriptions());
+ }
+
protected void cleanup() {
if (!countersRebuilt) {
@@ -299,6 +324,10 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
// Then we do some check on eventual pages that can be already
removed but they are away from the streaming
cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList,
minPage, firstPage);
+ if (pagingStore.isPageFull()) {
+ checkClearPageLimit();
+ }
+
assert pagingStore.getNumberOfPages() >= 0;
if (pagingStore.getNumberOfPages() == 0 ||
pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null ||
pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index f580be47f5..41d0255a21 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -20,6 +20,7 @@ import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -56,6 +57,8 @@ public class PageSubscriptionCounterImpl extends
BasePagingCounter {
private PageSubscription subscription;
+ private PagingStore pagingStore;
+
private final StorageManager storage;
private volatile long value;
@@ -187,11 +190,16 @@ public class PageSubscriptionCounterImpl extends
BasePagingCounter {
if (logger.isTraceEnabled()) {
logger.trace("process subscription={} add={}, size={}",
subscriptionID, add, size);
}
- valueUpdater.addAndGet(this, add);
+ long value = valueUpdater.addAndGet(this, add);
persistentSizeUpdater.addAndGet(this, size);
if (add > 0) {
addedUpdater.addAndGet(this, add);
addedPersistentSizeUpdater.addAndGet(this, size);
+
+ /// we could have pagingStore null on tests, so we need to validate
if pagingStore != null before anything...
+ if (pagingStore != null && pagingStore.getPageFullMessagePolicy() !=
null && !pagingStore.isPageFull()) {
+ checkAdd(value);
+ }
}
if (isRebuilding()) {
@@ -200,6 +208,15 @@ public class PageSubscriptionCounterImpl extends
BasePagingCounter {
}
}
+ private void checkAdd(long numberOfMessages) {
+ Long pageLimitMessages = pagingStore.getPageLimitMessages();
+ if (pageLimitMessages != null) {
+ if (numberOfMessages >= pageLimitMessages.longValue()) {
+ pagingStore.pageFull(this.subscription);
+ }
+ }
+ }
+
@Override
public void delete() throws Exception {
Transaction tx = new TransactionImpl(storage);
@@ -420,6 +437,7 @@ public class PageSubscriptionCounterImpl extends
BasePagingCounter {
@Override
public PageSubscriptionCounter setSubscription(PageSubscription
subscription) {
this.subscription = subscription;
+ this.pagingStore = subscription.getPagingStore();
return this;
}
}
\ No newline at end of file
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 81d10b9691..137d18a21b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -52,6 +53,7 @@ import
org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@@ -99,6 +101,16 @@ public class PagingStoreImpl implements PagingStore {
private long maxMessages;
+ private volatile boolean pageFull;
+
+ private Long pageLimitBytes;
+
+ private Long estimatedMaxPages;
+
+ private Long pageLimitMessages;
+
+ private PageFullMessagePolicy pageFullMessagePolicy;
+
private int pageSize;
private volatile AddressFullMessagePolicy addressFullMessagePolicy;
@@ -225,6 +237,40 @@ public class PagingStoreImpl implements PagingStore {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+ pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+ pageLimitBytes = addressSettings.getPageLimitBytes();
+
+ if (pageLimitBytes != null && pageLimitBytes.longValue() < 0) {
+ logger.debug("address {} had pageLimitBytes<0, setting it as null",
address);
+ pageLimitBytes = null;
+ }
+
+ pageLimitMessages = addressSettings.getPageLimitMessages();
+
+ if (pageLimitMessages != null && pageLimitMessages.longValue() < 0) {
+ logger.debug("address {} had pageLimitMessages<0, setting it as
null", address);
+ pageLimitMessages = null;
+ }
+
+
+ if (pageLimitBytes == null && pageLimitMessages == null &&
pageFullMessagePolicy != null) {
+ ActiveMQServerLogger.LOGGER.noPageLimitsSet(address,
pageFullMessagePolicy);
+ this.pageFullMessagePolicy = null;
+ }
+
+ if (pageLimitBytes != null && pageLimitMessages != null &&
pageFullMessagePolicy == null) {
+ ActiveMQServerLogger.LOGGER.noPagefullPolicySet(address,
pageLimitBytes, pageLimitMessages);
+ this.pageFullMessagePolicy = null;
+ this.pageLimitMessages = null;
+ this.pageLimitBytes = null;
+ }
+
+ if (pageLimitBytes != null && pageSize > 0) {
+ estimatedMaxPages = pageLimitBytes / pageSize;
+ logger.debug("Address {} should not allow more than {} pages",
storeName, estimatedMaxPages);
+ }
}
@Override
@@ -232,6 +278,79 @@ public class PagingStoreImpl implements PagingStore {
return "PagingStoreImpl(" + this.address + ")";
}
+ @Override
+ public PageFullMessagePolicy getPageFullMessagePolicy() {
+ return pageFullMessagePolicy;
+ }
+
+ @Override
+ public Long getPageLimitMessages() {
+ return pageLimitMessages;
+ }
+
+ @Override
+ public Long getPageLimitBytes() {
+ return pageLimitBytes;
+ }
+
+ @Override
+ public void pageFull(PageSubscription subscription) {
+ this.pageFull = true;
+ try {
+
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(),
subscription.getQueue().getAddress(), pageLimitMessages,
subscription.getCounter().getValue());
+ } catch (Throwable e) {
+ // I don't think subscription would ever have a null queue. I'm being
cautious here for tests
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isPageFull() {
+ return pageFull;
+ }
+
+ private boolean isBelowPageLimitBytes() {
+ if (estimatedMaxPages != null) {
+ return (numberOfPages <= estimatedMaxPages.longValue());
+ } else {
+ return true;
+ }
+ }
+
+ private void checkNumberOfPages() {
+ if (!isBelowPageLimitBytes()) {
+ this.pageFull = true;
+ ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName,
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+ }
+ }
+
+ @Override
+ public void checkPageLimit(long numberOfMessages) {
+ boolean pageMessageMessagesClear = true;
+ Long pageLimitMessages = getPageLimitMessages();
+
+ if (pageLimitMessages != null) {
+ if (logger.isDebugEnabled()) { // gate to avoid boxing of
numberOfMessages
+ logger.debug("Address {} has {} messages on the larger queue",
storeName, numberOfMessages);
+ }
+
+ pageMessageMessagesClear = (numberOfMessages <
pageLimitMessages.longValue());
+ }
+
+ boolean pageMessageBytesClear = isBelowPageLimitBytes();
+
+ if (pageMessageBytesClear && pageMessageMessagesClear) {
+ pageLimitReleased();
+ }
+ }
+
+ private void pageLimitReleased() {
+ if (pageFull) {
+ ActiveMQServerLogger.LOGGER.pageFree(getAddress());
+ this.pageFull = false;
+ }
+ }
+
@Override
public boolean lock(long timeout) {
if (timeout == -1) {
@@ -480,6 +599,8 @@ public class PagingStoreImpl implements PagingStore {
numberOfPages = files.size();
+ checkNumberOfPages();
+
for (String fileName : files) {
final int fileId =
PagingStoreImpl.getPageIdFromFileName(fileName);
@@ -556,6 +677,7 @@ public class PagingStoreImpl implements PagingStore {
if (isPaging) {
paging = false;
ActiveMQServerLogger.LOGGER.pageStoreStop(storeName,
getPageInfo());
+ pageLimitReleased();
}
this.cursorProvider.onPageModeCleared();
} finally {
@@ -1029,6 +1151,25 @@ public class PagingStoreImpl implements PagingStore {
lock.readLock().unlock();
}
+ if (pageFull) {
+ if (message.isLargeMessage()) {
+ ((LargeServerMessage) message).deleteFile();
+ }
+
+ if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) {
+ throw
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+ }
+
+ if (!printedDropMessagesWarning) {
+ printedDropMessagesWarning = true;
+ ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName,
getPageInfo());
+ }
+
+ // we are in page mode, if we got to this point, we are dropping the
message while still paging
+ // this needs to return true as it is paging
+ return true;
+ }
+
return writePage(message, tx, listCtx);
}
@@ -1282,6 +1423,8 @@ public class PagingStoreImpl implements PagingStore {
try {
numberOfPages++;
+ checkNumberOfPages();
+
final long newPageId = currentPageId + 1;
if (logger.isTraceEnabled()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 2c2f632d93..05625fd3af 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -527,4 +527,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229245, value = "Management controller is busy with another
task. Please try again")
ActiveMQTimeoutException managementBusy();
+
+ @Message(id = 229246, value = "Invalid page full message policy type {}")
+ IllegalArgumentException invalidPageFullPolicyType(String val);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index eaec474036..03426d92b8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1563,4 +1563,23 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224119, value = "Unable to refresh security settings: {}",
level = LogMessage.Level.WARN)
void unableToRefreshSecuritySettings(String exceptionMessage);
+
+ @LogMessage(id = 224120, value = "Queue {} on Address {} has more messages
than configured page limit. PageLimitMesages={} while currentValue={}", level =
LogMessage.Level.WARN)
+ void pageFull(SimpleString queue, SimpleString address, Object
pageLImitMessage, Object currentValue);
+
+ @LogMessage(id = 224121, value = "Queue {} on Address {} is out of page
limit now. We will issue a cleanup to check other queues.", level =
LogMessage.Level.WARN)
+ void pageFree(SimpleString queue, SimpleString address);
+
+ @LogMessage(id = 224122, value = "Address {} number of messages is under
page limit again, and it should be allowed to page again.", level =
LogMessage.Level.INFO)
+ void pageFree(SimpleString address);
+
+ @LogMessage(id = 224123, value = "Address {} has more pages than allowed.
System currently has {} pages, while the estimated max number of pages is {},
based on the limitPageBytes ({}) / page-size ({})", level =
LogMessage.Level.WARN)
+ void pageFullMaxBytes(SimpleString address, long pages, long maxPages, long
limitBytes, long bytes);
+
+ @LogMessage(id = 224124, value = "Address {} has a pageFullPolicy set as {}
but there are not page-limit-bytes or page-limit-messages set. Page full
configuration being ignored on this address.", level = LogMessage.Level.WARN)
+ void noPageLimitsSet(Object address, Object policy);
+
+ @LogMessage(id = 224125, value = "Address {} has page-limit-bytes={},
page-limit-messages={} and no page-full-policy set. Page full configuration
being ignored on this address", level = LogMessage.Level.WARN)
+ void noPagefullPolicySet(Object address, Object limitBytes, Object
limitMessages);
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 01ede20416..c06ff5bcf0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -147,6 +147,12 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
private Integer maxReadPageMessages = null;
+ private Long pageLimitBytes = null;
+
+ private Long pageLimitMessages = null;
+
+ private PageFullMessagePolicy pageFullMessagePolicy = null;
+
private Long maxSizeMessages = null;
private Integer pageSizeBytes = null;
@@ -289,6 +295,9 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
this.maxSizeMessages = other.maxSizeMessages;
this.maxReadPageMessages = other.maxReadPageMessages;
this.maxReadPageBytes = other.maxReadPageBytes;
+ this.pageLimitBytes = other.pageLimitBytes;
+ this.pageLimitMessages = other.pageLimitMessages;
+ this.pageFullMessagePolicy = other.pageFullMessagePolicy;
this.pageSizeBytes = other.pageSizeBytes;
this.pageMaxCache = other.pageMaxCache;
this.dropMessagesWhenFull = other.dropMessagesWhenFull;
@@ -644,6 +653,33 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
return this;
}
+ public Long getPageLimitBytes() {
+ return pageLimitBytes;
+ }
+
+ public AddressSettings setPageLimitBytes(Long pageLimitBytes) {
+ this.pageLimitBytes = pageLimitBytes;
+ return this;
+ }
+
+ public Long getPageLimitMessages() {
+ return pageLimitMessages;
+ }
+
+ public AddressSettings setPageLimitMessages(Long pageLimitMessages) {
+ this.pageLimitMessages = pageLimitMessages;
+ return this;
+ }
+
+ public PageFullMessagePolicy getPageFullMessagePolicy() {
+ return this.pageFullMessagePolicy;
+ }
+
+ public AddressSettings setPageFullMessagePolicy(PageFullMessagePolicy
policy) {
+ this.pageFullMessagePolicy = policy;
+ return this;
+ }
+
public int getMaxReadPageBytes() {
return maxReadPageBytes != null ? maxReadPageBytes : 2 *
getPageSizeBytes();
}
@@ -1223,6 +1259,15 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
if (enableIngressTimestamp == null) {
enableIngressTimestamp = merged.enableIngressTimestamp;
}
+ if (pageFullMessagePolicy == null) {
+ pageFullMessagePolicy = merged.pageFullMessagePolicy;
+ }
+ if (pageLimitBytes == null) {
+ pageLimitBytes = merged.pageLimitBytes;
+ }
+ if (pageLimitMessages == null) {
+ pageLimitMessages = merged.pageLimitMessages;
+ }
}
@Override
@@ -1472,6 +1517,24 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
maxReadPageMessages = BufferHelper.readNullableInteger(buffer);
}
+
+ if (buffer.readableBytes() > 0) {
+ pageLimitBytes = BufferHelper.readNullableLong(buffer);
+ }
+
+ if (buffer.readableBytes() > 0) {
+ pageLimitMessages = BufferHelper.readNullableLong(buffer);
+ }
+
+ if (buffer.readableBytes() > 0) {
+ policyStr = buffer.readNullableSimpleString();
+
+ if (policyStr != null) {
+ pageFullMessagePolicy =
PageFullMessagePolicy.valueOf(policyStr.toString());
+ } else {
+ pageFullMessagePolicy = null;
+ }
+ }
}
@Override
@@ -1542,7 +1605,10 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) +
BufferHelper.sizeOfNullableLong(maxSizeMessages) +
BufferHelper.sizeOfNullableInteger(maxReadPageMessages) +
- BufferHelper.sizeOfNullableInteger(maxReadPageBytes);
+ BufferHelper.sizeOfNullableInteger(maxReadPageBytes) +
+ BufferHelper.sizeOfNullableLong(pageLimitBytes) +
+ BufferHelper.sizeOfNullableLong(pageLimitMessages) +
+ BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null
? pageFullMessagePolicy.toString() : null);
}
@Override
@@ -1682,6 +1748,13 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableInteger(buffer, maxReadPageBytes);
BufferHelper.writeNullableInteger(buffer, maxReadPageMessages);
+
+ BufferHelper.writeNullableLong(buffer, pageLimitBytes);
+
+ BufferHelper.writeNullableLong(buffer, pageLimitMessages);
+
+ buffer.writeNullableSimpleString(pageFullMessagePolicy != null ? new
SimpleString(pageFullMessagePolicy.toString()) : null);
+
}
/* (non-Javadoc)
@@ -1758,6 +1831,10 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
result = prime * result + ((slowConsumerThresholdMeasurementUnit ==
null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
result = prime * result + ((enableIngressTimestamp == null) ? 0 :
enableIngressTimestamp.hashCode());
result = prime * result + ((maxSizeMessages == null) ? 0 :
maxSizeMessages.hashCode());
+ result = prime * result + ((pageLimitBytes == null) ? 0 :
pageLimitBytes.hashCode());
+ result = prime * result + ((pageLimitMessages == null) ? 0 :
pageLimitMessages.hashCode());
+ result = prime * result + ((pageFullMessagePolicy == null) ? 0 :
pageFullMessagePolicy.hashCode());
+
return result;
}
@@ -2130,6 +2207,31 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
} else if (!maxSizeMessages.equals(other.maxSizeMessages))
return false;
+ if (pageLimitBytes == null) {
+ if (other.pageLimitBytes != null) {
+ return false;
+ }
+ } else if (!pageLimitBytes.equals(other.pageLimitBytes)) {
+ return false;
+ }
+
+ if (pageLimitMessages == null) {
+ if (other.pageLimitMessages != null) {
+ return false;
+ }
+ } else if (!pageLimitMessages.equals(other.pageLimitMessages)) {
+ return false;
+ }
+
+ if (pageFullMessagePolicy == null) {
+ if (other.pageFullMessagePolicy != null) {
+ return false;
+ }
+ } else if (!pageFullMessagePolicy.equals(other.pageFullMessagePolicy)) {
+ return false;
+ }
+
+
return true;
}
@@ -2267,6 +2369,12 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
enableMetrics +
", enableIngressTime=" +
enableIngressTimestamp +
+ ", pageLimitBytes=" +
+ pageLimitBytes +
+ ", pageLimitMessages=" +
+ pageLimitMessages +
+ ", pageFullMessagePolicy=" +
+ pageFullMessagePolicy +
"]";
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index fdb30c7c24..a6953148c1 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3757,8 +3757,25 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="page-limit-bytes" type="xsd:string" maxOccurs="1"
minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ After the address enters into page mode, this attribute will
configure how many pages can be written into page before activating the
page-full-policy.
+ Supports byte notation like "K", "Mb", "MiB", "GB", etc.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="page-limit-messages" type="xsd:long" maxOccurs="1"
minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ After the address enters into page mode, this attribute will
configure how many messages can be written into page before activating the
page-full-policy.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="max-size-bytes-reject-threshold" type="xsd:long"
default="-1" maxOccurs="1"
- minOccurs="0">
+ minOccurs="0">
<xsd:annotation>
<xsd:documentation>
used with the address full BLOCK policy, the maximum size
(in bytes) an address can reach before
@@ -3819,6 +3836,20 @@
</xsd:simpleType>
</xsd:element>
+ <xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ After entering page mode, a second limit will be set by
page-limit-bytes and/or page-limit-messages. The page-full-policy will
configure what to do when that limit is reached.
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="DROP"/>
+ <xsd:enumeration value="FAIL"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:element>
+
<xsd:element name="message-counter-history-day-limit" type="xsd:int"
default="0" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
@@ -4315,7 +4346,7 @@
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
</xsd:element>
-
+
<xsd:complexType name="connector-serviceType">
<xsd:sequence>
<xsd:element maxOccurs="1" minOccurs="1" name="factory-class"
type="xsd:string">
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 581cc78ea8..1f846cc1eb 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -30,6 +30,7 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
@@ -50,6 +51,12 @@ import
org.apache.activemq.artemis.core.config.federation.FederationPolicySet;
import
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.security.Role;
import
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
@@ -58,10 +65,12 @@ import
org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServer
import org.apache.activemq.artemis.core.server.routing.KeyType;
import
org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.commons.lang3.ClassUtils;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@@ -1004,6 +1013,125 @@ public class ConfigurationImplTest extends
ActiveMQTestBase {
Assert.assertEquals(SimpleString.toSimpleString("moreImportant"),
configuration.getAddressSettings().get("Name.With.Dots").getExpiryAddress());
}
+ @Test
+ public void testAddressSettingsPageLimit() throws Throwable {
+ ConfigurationImpl configuration = new ConfigurationImpl();
+
+ Properties properties = new Properties();
+
+ String randomString = RandomUtil.randomString();
+
+ properties.put("addressSettings.#.expiryAddress", randomString);
+ properties.put("addressSettings.#.pageLimitMessages", "300");
+ properties.put("addressSettings.#.pageLimitBytes", "300000");
+ properties.put("addressSettings.#.pageFullMessagePolicy", "DROP");
+
+ configuration.parsePrefixedProperties(properties, null);
+
+ Assert.assertEquals(1, configuration.getAddressSettings().size());
+ Assert.assertEquals(SimpleString.toSimpleString(randomString),
configuration.getAddressSettings().get("#").getExpiryAddress());
+ Assert.assertEquals(300L,
configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
+ Assert.assertEquals(300000L,
configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
+ Assert.assertEquals("DROP",
configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
+
+ PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"),
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class),
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class),
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"),
configuration.getAddressSettings().get("#"), null, null, true);
+
+ Assert.assertEquals(300L, storeImpl.getPageLimitMessages().longValue());
+ Assert.assertEquals(300000L, storeImpl.getPageLimitBytes().longValue());
+ Assert.assertEquals("DROP",
storeImpl.getPageFullMessagePolicy().toString());
+ }
+
+ @Test
+ public void testAddressSettingsPageLimitInvalidConfiguration1() throws
Throwable {
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+ ConfigurationImpl configuration = new ConfigurationImpl();
+
+ Properties properties = new Properties();
+
+ String randomString = RandomUtil.randomString();
+
+ properties.put("addressSettings.#.expiryAddress", randomString);
+ properties.put("addressSettings.#.pageLimitMessages", "300");
+ properties.put("addressSettings.#.pageLimitBytes", "300000");
+ //properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); //
removing the pageFull on purpose
+
+ configuration.parsePrefixedProperties(properties, null);
+
+ Assert.assertEquals(1, configuration.getAddressSettings().size());
+ Assert.assertEquals(SimpleString.toSimpleString(randomString),
configuration.getAddressSettings().get("#").getExpiryAddress());
+ Assert.assertEquals(300L,
configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
+ Assert.assertEquals(300000L,
configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
+ Assert.assertEquals(null,
configuration.getAddressSettings().get("#").getPageFullMessagePolicy());
+
+ PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"),
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class),
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class),
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"),
configuration.getAddressSettings().get("#"), null, null, true);
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224125"));
+
+ Assert.assertEquals(null, storeImpl.getPageLimitMessages());
+ Assert.assertEquals(null, storeImpl.getPageLimitBytes());
+ Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
+ }
+
+ @Test
+ public void testAddressSettingsPageLimitInvalidConfiguration2() throws
Throwable {
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+ ConfigurationImpl configuration = new ConfigurationImpl();
+
+ Properties properties = new Properties();
+
+ String randomString = RandomUtil.randomString();
+
+ properties.put("addressSettings.#.expiryAddress", randomString);
+ //properties.put("addressSettings.#.pageLimitMessages", "300"); //
removing this on purpose
+ //properties.put("addressSettings.#.pageLimitBytes", "300000"); //
removing this on purpose
+ properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); //
keeping this on purpose
+
+ configuration.parsePrefixedProperties(properties, null);
+
+ Assert.assertEquals(1, configuration.getAddressSettings().size());
+ Assert.assertEquals(SimpleString.toSimpleString(randomString),
configuration.getAddressSettings().get("#").getExpiryAddress());
+ Assert.assertEquals(null,
configuration.getAddressSettings().get("#").getPageLimitMessages());
+ Assert.assertEquals(null,
configuration.getAddressSettings().get("#").getPageLimitBytes());
+ Assert.assertEquals("DROP",
configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
+
+ PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"),
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class),
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class),
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"),
configuration.getAddressSettings().get("#"), null, null, true);
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224124"));
+
+ Assert.assertEquals(null, storeImpl.getPageLimitMessages());
+ Assert.assertEquals(null, storeImpl.getPageLimitBytes());
+ Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
+ }
+
+
+ @Test
+ public void testAddressSettingsPageLimitInvalidConfiguration3() throws
Throwable {
+ ConfigurationImpl configuration = new ConfigurationImpl();
+
+ Properties properties = new Properties();
+
+ String randomString = RandomUtil.randomString();
+
+ properties.put("addressSettings.#.expiryAddress", randomString);
+ properties.put("addressSettings.#.pageLimitMessages", "-1"); // -1 on
purpose, to make it null on final parsing
+ properties.put("addressSettings.#.pageLimitBytes", "-1"); // -1 on
purpose, to make it null on final parsing
+ properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); //
keeping this on purpose
+
+ configuration.parsePrefixedProperties(properties, null);
+
+ Assert.assertEquals(1, configuration.getAddressSettings().size());
+ Assert.assertEquals(SimpleString.toSimpleString(randomString),
configuration.getAddressSettings().get("#").getExpiryAddress());
+ Assert.assertEquals(-1L,
configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
+ Assert.assertEquals(-1L,
configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
+ Assert.assertEquals("DROP",
configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
+
+ PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"),
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class),
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class),
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"),
configuration.getAddressSettings().get("#"), null, null, true);
+
+ Assert.assertEquals(null, storeImpl.getPageLimitMessages());
+ Assert.assertEquals(null, storeImpl.getPageLimitBytes());
+ Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
+ }
+
@Test
public void testDivertViaProperties() throws Exception {
ConfigurationImpl configuration = new ConfigurationImpl();
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 6cf7fda451..1cf36fca22 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -393,6 +393,25 @@ public class FileConfigurationParserTest extends
ActiveMQTestBase {
AddressSettings settings = configuration.getAddressSettings().get("foo");
Assert.assertEquals(1024, settings.getMaxReadPageBytes());
Assert.assertEquals(33, settings.getMaxReadPageMessages());
+ Assert.assertNull(settings.getPageLimitBytes());
+ Assert.assertNull(settings.getPageLimitMessages());
+ Assert.assertNull(settings.getPageFullMessagePolicy());
+ }
+
+ @Test
+ public void testParsePageLimitSettings() throws Exception {
+ String configStr = "<configuration><address-settings>" + "\n" +
"<address-setting match=\"foo\">" + "\n" +
"<max-read-page-bytes>1k</max-read-page-bytes><page-limit-bytes>2k</page-limit-bytes><page-limit-messages>337</page-limit-messages><page-full-policy>FAIL</page-full-policy><max-read-page-messages>33</max-read-page-messages>.\n"
+ "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
+
+ FileConfigurationParser parser = new FileConfigurationParser();
+ ByteArrayInputStream input = new
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+ Configuration configuration = parser.parseMainConfig(input);
+ AddressSettings settings = configuration.getAddressSettings().get("foo");
+ Assert.assertEquals(1024, settings.getMaxReadPageBytes());
+ Assert.assertEquals(33, settings.getMaxReadPageMessages());
+ Assert.assertEquals(2048L, settings.getPageLimitBytes().longValue());
+ Assert.assertEquals(337L, settings.getPageLimitMessages().longValue());
+ Assert.assertEquals("FAIL",
settings.getPageFullMessagePolicy().toString());
}
@Test
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index db80b028ab..3a19d098dc 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -134,6 +134,7 @@ import
org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation;
import
org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@@ -1505,12 +1506,28 @@ public abstract class ActiveMQTestBase extends Assert {
return createServer(realFiles, configuration, pageSize, maxAddressSize,
null, null, settings);
}
+
+
+ protected final ActiveMQServer createServer(final boolean realFiles,
+ final Configuration
configuration,
+ final int pageSize,
+ final long maxAddressSize,
+ final Integer
maxReadPageMessages,
+ final Integer maxReadPageBytes,
+ final Map<String,
AddressSettings> settings) {
+ return createServer(realFiles, configuration, pageSize, maxAddressSize,
maxReadPageMessages, maxReadPageBytes, null, null, null, settings);
+
+ }
+
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer maxReadPageMessages,
final Integer maxReadPageBytes,
+ final Long pageLimitBytes,
+ final Long pageLimitMessages,
+ final String pageLimitPolicy,
final Map<String, AddressSettings>
settings) {
ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
@@ -1522,6 +1539,15 @@ public abstract class ActiveMQTestBase extends Assert {
if (maxReadPageMessages != null) {
setting.getValue().setMaxReadPageMessages(maxReadPageMessages.intValue());
}
+ if (pageLimitBytes != null) {
+ setting.getValue().setPageLimitBytes(pageLimitBytes);
+ }
+ if (pageLimitMessages != null) {
+ setting.getValue().setPageLimitMessages(pageLimitMessages);
+ }
+ if (pageLimitPolicy != null) {
+
setting.getValue().setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy));
+ }
server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
}
}
@@ -1533,6 +1559,15 @@ public abstract class ActiveMQTestBase extends Assert {
if (maxReadPageMessages != null) {
defaultSetting.setMaxReadPageMessages(maxReadPageMessages.intValue());
}
+ if (pageLimitBytes != null) {
+ defaultSetting.setPageLimitBytes(pageLimitBytes);
+ }
+ if (pageLimitMessages != null) {
+ defaultSetting.setPageLimitMessages(pageLimitMessages);
+ }
+ if (pageLimitPolicy != null) {
+
defaultSetting.setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy));
+ }
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
diff --git a/docs/user-manual/en/paging.md b/docs/user-manual/en/paging.md
index 8b33e26958..b1fe0dcc38 100644
--- a/docs/user-manual/en/paging.md
+++ b/docs/user-manual/en/paging.md
@@ -74,6 +74,9 @@ Configuration is done at the address settings in `broker.xml`.
<max-size-messages>1000</max-size-messages>
<page-size-bytes>10485760</page-size-bytes>
<address-full-policy>PAGE</address-full-policy>
+ <page-limit-bytes>10G</page-limit-bytes>
+ <page-limit-messages>1000000</page-limit-messages>
+ <page-full-policy>FAIL</page-full-policy>
</address-setting>
</address-settings>
```
@@ -98,6 +101,9 @@ Property Name|Description|Default
`address-full-policy`|This must be set to `PAGE` for paging to enable. If the
value is `PAGE` then further messages will be paged to disk. If the value is
`DROP` then further messages will be silently dropped. If the value is `FAIL`
then the messages will be dropped and the client message producers will receive
an exception. If the value is `BLOCK` then client message producers will block
when they try and send further messages.|`PAGE`
`max-read-page-messages` | how many message can be read from paging into the
Queue whenever more messages are needed. The system wtill stop reading if
`max-read-page-bytes hits the limit first. | -1
`max-read-page-bytes` | how much memory the messages read from paging can take
on the Queue whenever more messages are needed. The system will stop reading if
`max-read-page-messages` hits the limit first. | 2 * page-size-bytes
+`page-limit-bytes` | After entering page mode, how much data would the system
allow incoming. Notice this will be internally converted as number of pages. |
+`page-limit-messages` | After entering page mode, how many messages would the
system allow incoming on paging. |
+`page-full-policy` | Valid results are DROP or FAIL. This tells what to do if
the system is reaching `page-limit-bytes` or `page-limit-messages` after paging
|
### max-size-bytes and max-size-messages simultaneous usage
@@ -205,6 +211,14 @@ The system should keep at least one paged file in memory
caching ahead reading m
Also every active subscription could keep one paged file in memory.
So, if your system has too many queues it is recommended to minimize the
page-size.
+## Page Limits and Page Full Policy
+
+Since version `2.28.0` is possible to configure limits on how much data is
paged. This is to avoid a single destination using the entire disk in case
their consumers are gone.
+
+You can configure either `page-limit-bytes` or `page-limit-messages`, along
with `page-full-policy` on the address settings limiting how much data will be
recorded in paging.
+
+If you configure `page-full-policy` as DROP, messages will be simplify dropped
while the clients will not get any exceptions, while if you configured FAIL the
producers will receive a JMS Exception for the error condition.
+
## Example
See the [Paging Example](examples.md#paging) which shows how to use paging
with
diff --git
a/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
b/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
index 165fae2739..ad97020b94 100644
---
a/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
+++
b/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
@@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -32,92 +33,157 @@ import javax.naming.InitialContext;
public class PagingExample {
public static void main(final String[] args) throws Exception {
- Connection connection = null;
+ // simple routing showing how paging should work
+ simplePaging();
+ // simple routing showing what happens when paging enters into page-full
+ pageFullLimit();
+ }
+
+
+ public static void pageFullLimit() throws Exception {
+ InitialContext initialContext = null;
+ try {
+ // Create an initial context to perform the JNDI lookup.
+ initialContext = new InitialContext();
+
+ // Perform a lookup on the Connection Factory
+ ConnectionFactory cf = (ConnectionFactory)
initialContext.lookup("ConnectionFactory");
+
+ // Create a JMS Connection
+ try (Connection connection = cf.createConnection()) {
+
+ // Create a JMS Session
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+
+ // lookup the queue
+ Queue queue = session.createQueue("pagingQueueLimited");
+
+ // Create a JMS Message Producer for pageQueueAddress
+ MessageProducer pageMessageProducer =
session.createProducer(queue);
+
+ // We don't need persistent messages in order to use paging. (This
step is optional)
+ pageMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ // Create a Binary Bytes Message with 10K arbitrary bytes
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[10 * 1024]);
+
+ try {
+ // Send messages to the queue until the address is full
+ for (int i = 0; i < 2000; i++) {
+ pageMessageProducer.send(message);
+ if (i > 0 && i % 100 == 0) {
+ // batch commit on the sends
+ session.commit();
+ }
+ }
+
+ throw new RuntimeException("Example was supposed to get a page
full exception. Check your example configuration or report a bug");
+ } catch (JMSException e) {
+ System.out.println("The producer has thrown an expected
exception " + e);
+ }
+ session.commit();
+ }
+ } finally {
+ // And finally, always remember to close your JMS connections after
use, in a finally block. Closing a JMS
+ // connection will automatically close all of its sessions,
consumers, producer and browser objects
+
+ if (initialContext != null) {
+ initialContext.close();
+ }
+
+ }
+ }
+
+
+ public static void simplePaging() throws Exception {
InitialContext initialContext = null;
try {
- // Step 1. Create an initial context to perform the JNDI lookup.
+ // Create an initial context to perform the JNDI lookup.
initialContext = new InitialContext();
- // Step 2. Perform a lookup on the Connection Factory
+ // Perform a lookup on the Connection Factory
ConnectionFactory cf = (ConnectionFactory)
initialContext.lookup("ConnectionFactory");
- // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is
configured to hold a very limited number
+ // We look-up the JMS queue object from JNDI. pagingQueue is
configured to hold a very limited number
// of bytes in memory
Queue pageQueue = (Queue) initialContext.lookup("queue/pagingQueue");
- // Step 4. Lookup for a JMS Queue
+ // Lookup for a JMS Queue
Queue queue = (Queue) initialContext.lookup("queue/exampleQueue");
- // Step 5. Create a JMS Connection
- connection = cf.createConnection();
+ // Create a JMS Connection
+ try (Connection connection = cf.createConnection()) {
- // Step 6. Create a JMS Session
- Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ // Create a JMS Session
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
- // Step 7. Create a JMS Message Producer for pageQueueAddress
- MessageProducer pageMessageProducer =
session.createProducer(pageQueue);
+ // Create a JMS Message Producer for pageQueueAddress
+ MessageProducer pageMessageProducer =
session.createProducer(pageQueue);
- // Step 8. We don't need persistent messages in order to use paging.
(This step is optional)
- pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // We don't need persistent messages in order to use paging. (This
step is optional)
+ pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(new byte[10 * 1024]);
+ // Create a Binary Bytes Message with 10K arbitrary bytes
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[10 * 1024]);
- // Step 10. Send only 20 messages to the Queue. This will be already
enough for pagingQueue. Look at
- // ./paging/config/activemq-queues.xml for the config.
- for (int i = 0; i < 20; i++) {
- pageMessageProducer.send(message);
- }
+ // Send only 20 messages to the Queue. This will be already enough
for pagingQueue. Look at
+ // ./paging/config/activemq-queues.xml for the config.
+ for (int i = 0; i < 20; i++) {
+ pageMessageProducer.send(message);
+ }
- // Step 11. Create a JMS Message Producer
- MessageProducer messageProducer = session.createProducer(queue);
+ // Create a JMS Message Producer
+ MessageProducer messageProducer = session.createProducer(queue);
- // Step 12. We don't need persistent messages in order to use paging.
(This step is optional)
- messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // We don't need persistent messages in order to use paging. (This
step is optional)
+ messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // Step 13. Send the message for about 1K, which should be over the
memory limit imposed by the server
- for (int i = 0; i < 1000; i++) {
- messageProducer.send(message);
- }
+ // Send the message for about 1K, which should be over the memory
limit imposed by the server
+ for (int i = 0; i < 1000; i++) {
+ messageProducer.send(message);
+ }
- // Step 14. if you pause this example here, you will see several
files under ./build/data/paging
- // Thread.sleep(30000); // if you want to just our of curiosity, you
can sleep here and inspect the created
- // files just for
+ // if you pause this example here, you will see several files
under ./build/data/paging
+ // Thread.sleep(30000); // if you want to just our of curiosity,
you can sleep here and inspect the created
+ // files just for
- // Step 15. Create a JMS Message Consumer
- MessageConsumer messageConsumer = session.createConsumer(queue);
+ // Create a JMS Message Consumer
+ MessageConsumer messageConsumer = session.createConsumer(queue);
- // Step 16. Start the JMS Connection. This step will activate the
subscribers to receive messages.
- connection.start();
+ // Start the JMS Connection. This step will activate the
subscribers to receive messages.
+ connection.start();
- // Step 17. Receive the messages. It's important to ACK for messages
as ActiveMQ Artemis will not read messages from
- // paging
- // until messages are ACKed
+ // Receive the messages. It's important to ACK for messages as
ActiveMQ Artemis will not read messages from
+ // paging
+ // until messages are ACKed
- for (int i = 0; i < 1000; i++) {
- message = (BytesMessage) messageConsumer.receive(3000);
+ for (int i = 0; i < 1000; i++) {
+ message = (BytesMessage) messageConsumer.receive(3000);
- if (i % 100 == 0) {
- System.out.println("Received " + i + " messages");
- message.acknowledge();
+ if (i % 100 == 0) {
+ System.out.println("Received " + i + " messages");
+ message.acknowledge();
+ }
}
- }
- message.acknowledge();
+ message.acknowledge();
- // Step 18. Receive the messages from the Queue names pageQueue.
Create the proper consumer for that
- messageConsumer.close();
- messageConsumer = session.createConsumer(pageQueue);
+ // Receive the messages from the Queue names pageQueue. Create the
proper consumer for that
+ messageConsumer.close();
+ messageConsumer = session.createConsumer(pageQueue);
- for (int i = 0; i < 20; i++) {
- message = (BytesMessage) messageConsumer.receive(1000);
+ for (int i = 0; i < 20; i++) {
+ message = (BytesMessage) messageConsumer.receive(1000);
- System.out.println("Received message " + i + " from pageQueue");
+ System.out.println("Received message " + i + " from pageQueue");
- message.acknowledge();
+ message.acknowledge();
+ }
}
+
} finally {
// And finally, always remember to close your JMS connections after
use, in a finally block. Closing a JMS
// connection will automatically close all of its sessions,
consumers, producer and browser objects
@@ -126,9 +192,6 @@ public class PagingExample {
initialContext.close();
}
- if (connection != null) {
- connection.close();
- }
}
}
}
diff --git
a/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
b/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
index 9144c901f2..a4dfcd3d0e 100644
---
a/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
+++
b/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
@@ -52,6 +52,16 @@ under the License.
<permission roles="guest" type="send"/>
</security-setting>
+ <!--security for example pagingQueueLimited-->
+ <security-setting match="pagingQueueLimited">
+ <permission roles="guest" type="createDurableQueue"/>
+ <permission roles="guest" type="deleteDurableQueue"/>
+ <permission roles="guest" type="createNonDurableQueue"/>
+ <permission roles="guest" type="deleteNonDurableQueue"/>
+ <permission roles="guest" type="consume"/>
+ <permission roles="guest" type="send"/>
+ </security-setting>
+
<security-setting match="pagingQueue">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
@@ -68,6 +78,17 @@ under the License.
<page-size-bytes>20000</page-size-bytes>
</address-setting>
+ <address-setting match="pagingQueueLimited">
+ <!-- what to do after max-size-messages is reached. We start
paging at 100 messages -->
+ <address-full-policy>PAGE</address-full-policy>
+ <!-- how soon we should enter into paging -->
+ <max-size-messages>100</max-size-messages>
+ <!-- how many messages we should accept into paging before
failing. We start failing after we recorded 1000 messages on paging. -->
+ <page-limit-messages>1000</page-limit-messages>
+ <!-- what to do after page-limit is used -->
+ <page-full-policy>FAIL</page-full-policy>
+ </address-setting>
+
<address-setting match="exampleQueue">
<max-size-bytes>10Mb</max-size-bytes>
<page-size-bytes>1Mb</page-size-bytes>
@@ -84,6 +105,11 @@ under the License.
<queue name="pagingQueue"/>
</anycast>
</address>
+ <address name="pagingQueueLimited">
+ <anycast>
+ <queue name="pagingQueueLimited"/>
+ </anycast>
+ </address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java
new file mode 100644
index 0000000000..a3e1bf04d1
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PagingLimitTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ ActiveMQServer server;
+
+ @Test
+ public void testPageLimitMessageCoreFail() throws Exception {
+ testPageLimitMessage("CORE", false);
+ }
+
+ @Test
+ public void testPageLimitAMQPFail() throws Exception {
+ testPageLimitMessage("AMQP", false);
+ }
+
+ @Test
+ public void testPageLimitMessagesOpenWireFail() throws Exception {
+ testPageLimitMessage("OPENWIRE", false);
+ }
+
+ @Test
+ public void testPageLimitMessageCoreDrop() throws Exception {
+ testPageLimitMessage("CORE", false);
+ }
+
+ @Test
+ public void testPageLimitAMQPDrop() throws Exception {
+ testPageLimitMessage("AMQP", false);
+ }
+
+ @Test
+ public void testPageLimitMessagesOpenWireDrop() throws Exception {
+ testPageLimitMessage("OPENWIRE", false);
+ }
+
+ public void testPageLimitMessage(String protocol, boolean drop) throws
Exception {
+
+ String queueNameTX = getName() + "_TX";
+ String queueNameNonTX = getName() + "_NONTX";
+
+ Configuration config = createDefaultConfig(true);
+
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
+
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, null,
300L, drop ? "DROP" : "FAIL", null);
+ server.start();
+
+ server.addAddressInfo(new
AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST));
+ server.addAddressInfo(new
AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST));
+
+ Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null);
+ Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null);
+
+ testPageLimitMessageFailInternal(queueNameTX, protocol, true, drop);
+ testPageLimitMessageFailInternal(queueNameNonTX, protocol, false, drop);
+
+ }
+
+ private void testPageLimitMessageFailInternal(String queueName,
+ String protocol,
+ boolean transacted,
+ boolean drop) throws
Exception {
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+ org.apache.activemq.artemis.core.server.Queue serverQueue =
server.locateQueue(queueName);
+ Assert.assertNotNull(serverQueue);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(transacted, transacted ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+ connection.start();
+
+ for (int i = 0; i < 100; i++) {
+ TextMessage message = session.createTextMessage("initial " + i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ if (transacted) {
+ session.commit();
+ Assert.assertTrue(serverQueue.getPagingStore().isPaging());
+ }
+
+ for (int i = 0; i < 300; i++) {
+ if (i == 200) {
+ // the initial sent has to be consumed on transaction as we
need a sync on the consumer for AMQP
+ try (MessageConsumer consumer = session.createConsumer(queue)) {
+ for (int initI = 0; initI < 100; initI++) {
+ TextMessage recMessage = (TextMessage)
consumer.receive(1000);
+ Assert.assertEquals("initial " + initI,
recMessage.getText());
+ }
+ }
+ if (transacted) {
+ session.commit();
+ }
+ Wait.assertEquals(200L, serverQueue::getMessageCount);
+ }
+
+ try {
+ TextMessage message = session.createTextMessage("hello world "
+ i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ if (i % 100 == 0) {
+ logger.info("sent " + i);
+ }
+ if (transacted) {
+ if (i % 100 == 0 && i > 0) {
+ session.commit();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ Assert.fail("Exception happened at " + i);
+ }
+ }
+ if (transacted) {
+ session.commit();
+ }
+
+ try {
+ producer.send(session.createTextMessage("should not complete"));
+ if (transacted) {
+ session.commit();
+ }
+ if (!drop) {
+ Assert.fail("an Exception was expected");
+ }
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224120"));
+ } catch (JMSException e) {
+ logger.debug("Expected exception, ok!", e);
+ }
+
+
+ Assert.assertTrue(serverQueue.getPagingStore().isPaging());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ for (int i = 0; i < 150; i++) { // we will consume half of the
messages
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("hello world " + i, message.getText());
+ Assert.assertEquals(i, message.getIntProperty("i"));
+ if (transacted) {
+ if (i % 100 == 0 && i > 0) {
+ session.commit();
+ }
+ }
+ }
+ if (transacted) {
+ session.commit();
+ }
+ Future<Boolean> cleanupDone =
serverQueue.getPagingStore().getCursorProvider().scheduleCleanup();
+
+ Assert.assertTrue(cleanupDone.get(30, TimeUnit.SECONDS));
+
+
+
+ for (int i = 300; i < 450; i++) {
+ try {
+ TextMessage message = session.createTextMessage("hello world "
+ i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ if (i % 100 == 0) {
+ logger.info("sent " + i);
+ }
+ if (transacted) {
+ if (i % 10 == 0 && i > 0) {
+ session.commit();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ Assert.fail("Exception happened at " + i);
+ }
+ }
+ if (transacted) {
+ session.commit();
+ }
+ AssertionLoggerHandler.clear();
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
+
+
+ try {
+ producer.send(session.createTextMessage("should not complete"));
+ if (transacted) {
+ session.commit();
+ }
+ if (!drop) {
+ Assert.fail("an Exception was expected");
+ } else {
+
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
+ }
+ } catch (JMSException e) {
+ logger.debug("Expected exception, ok!", e);
+ }
+
+ for (int i = 150; i < 450; i++) { // we will consume half of the
messages
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("hello world " + i, message.getText());
+ Assert.assertEquals(i, message.getIntProperty("i"));
+ if (transacted) {
+ if (i % 100 == 0 && i > 0) {
+ session.commit();
+ }
+ }
+ }
+
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+
+ }
+
+
+ @Test
+ public void testPageLimitBytesAMQP() throws Exception {
+ testPageLimitBytes("AMQP");
+ }
+
+ @Test
+ public void testPageLimitBytesCore() throws Exception {
+ testPageLimitBytes("CORE");
+ }
+
+ @Test
+ public void testPageLimitBytesOpenWire() throws Exception {
+ testPageLimitBytes("OPENWIRE");
+ }
+
+ public void testPageLimitBytes(String protocol) throws Exception {
+
+ String queueNameTX = getName() + "_TX";
+ String queueNameNonTX = getName() + "_NONTX";
+
+ Configuration config = createDefaultConfig(true);
+
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
+
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1,
(long)(PAGE_MAX * 10), null, "FAIL", null);
+ server.start();
+
+ server.addAddressInfo(new
AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST));
+ server.addAddressInfo(new
AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST));
+
+ Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null);
+ Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null);
+
+ testPageLimitBytesFailInternal(queueNameTX, protocol, true);
+ testPageLimitBytesFailInternal(queueNameNonTX, protocol, false);
+
+ }
+
+
+
+ private void testPageLimitBytesFailInternal(String queueName,
+ String protocol,
+ boolean transacted) throws
Exception {
+ org.apache.activemq.artemis.core.server.Queue serverQueue =
server.locateQueue(queueName);
+ Assert.assertNotNull(serverQueue);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(transacted, transacted ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+ connection.start();
+
+ int successfullSends = 0;
+ boolean failed = false;
+
+ for (int i = 0; i < 1000; i++) {
+ try {
+ TextMessage message = session.createTextMessage("hello world "
+ i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ if (transacted) {
+ session.commit();
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ failed = true;
+ break;
+ }
+ successfullSends++;
+ }
+
+ Wait.assertEquals(successfullSends, serverQueue::getMessageCount);
+ Assert.assertTrue(failed);
+
+ int reads = successfullSends / 2;
+
+ connection.start();
+ try (MessageConsumer consumer = session.createConsumer(queue)) {
+ for (int i = 0; i < reads; i++) { // we will consume half of the
messages
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("hello world " + i, message.getText());
+ Assert.assertEquals(i, message.getIntProperty("i"));
+ if (transacted) {
+ if (i % 100 == 0 && i > 0) {
+ session.commit();
+ }
+ }
+ }
+ if (transacted) {
+ session.commit();
+ }
+ }
+
+ failed = false;
+
+ int originalSuccess = successfullSends;
+
+ Future<Boolean> result =
serverQueue.getPagingStore().getCursorProvider().scheduleCleanup();
+ Assert.assertTrue(result.get(10, TimeUnit.SECONDS));
+
+ for (int i = successfullSends; i < 1000; i++) {
+ try {
+ TextMessage message = session.createTextMessage("hello world "
+ i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ if (transacted) {
+ session.commit();
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ failed = true;
+ break;
+ }
+ successfullSends++;
+ }
+
+ Assert.assertTrue(failed);
+ Assert.assertTrue(successfullSends > originalSuccess);
+
+ try (MessageConsumer consumer = session.createConsumer(queue)) {
+ for (int i = reads; i < successfullSends; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("hello world " + i, message.getText());
+ Assert.assertEquals(i, message.getIntProperty("i"));
+ if (transacted) {
+ if (i % 100 == 0 && i > 0) {
+ session.commit();
+ }
+ }
+ }
+ if (transacted) {
+ session.commit();
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+
+
+ }
+
+ }
+
+
+
+}
\ No newline at end of file
diff --git
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 5297e6dcf0..4317ceaea1 100644
---
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -30,6 +30,7 @@ import
org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@@ -245,6 +247,36 @@ public class PersistMultiThreadTest extends
ActiveMQTestBase {
class FakePagingStore implements PagingStore {
+ @Override
+ public PageFullMessagePolicy getPageFullMessagePolicy() {
+ return null;
+ }
+
+ @Override
+ public Long getPageLimitMessages() {
+ return null;
+ }
+
+ @Override
+ public Long getPageLimitBytes() {
+ return null;
+ }
+
+ @Override
+ public void pageFull(PageSubscription subscription) {
+
+ }
+
+ @Override
+ public boolean isPageFull() {
+ return false;
+ }
+
+ @Override
+ public void checkPageLimit(long numberOfMessages) {
+
+ }
+
@Override
public void counterSnapshot() {
}