This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 764db34e9b ARTEMIS-3178 Page Limitting (max messages and max bytes)
764db34e9b is described below

commit 764db34e9ba453b23b7c8b7154ab9ba710099874
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Jan 25 09:59:06 2023 -0500

    ARTEMIS-3178 Page Limitting (max messages and max bytes)
    
    I am adding three attributes to Address-settings:
    
    * page-limit-bytes: Number of bytes. We will convert this metric into max 
number of pages internally by dividing max-bytes / page-size. It will allow a 
max based on an estimate.
    * page-limit-messages: Number of messages
    * page-full-message-policy: fail or drop
    
    We will now allow paging, until these max values and then fail or drop 
messages.
    
    Once these values are retracted, the address will remain full until a 
period where cleanup is kicked in by paging. So these values may have a certain 
delay on being applied, but they should always be cleared once cleanup happened.
---
 .../core/settings/impl/PageFullMessagePolicy.java  |  21 ++
 .../artemis/core/config/impl/Validators.java       |  13 +
 .../deployers/impl/FileConfigurationParser.java    |  20 +
 .../activemq/artemis/core/paging/PagingStore.java  |  15 +
 .../core/paging/cursor/PageCursorProvider.java     |   3 +-
 .../paging/cursor/impl/PageCursorProviderImpl.java |  37 +-
 .../cursor/impl/PageSubscriptionCounterImpl.java   |  20 +-
 .../artemis/core/paging/impl/PagingStoreImpl.java  | 143 +++++++
 .../artemis/core/server/ActiveMQMessageBundle.java |   3 +
 .../artemis/core/server/ActiveMQServerLogger.java  |  19 +
 .../core/settings/impl/AddressSettings.java        | 110 +++++-
 .../resources/schema/artemis-configuration.xsd     |  35 +-
 .../core/config/impl/ConfigurationImplTest.java    | 128 +++++++
 .../config/impl/FileConfigurationParserTest.java   |  19 +
 .../artemis/tests/util/ActiveMQTestBase.java       |  35 ++
 docs/user-manual/en/paging.md                      |  14 +
 .../artemis/jms/example/PagingExample.java         | 175 ++++++---
 .../src/main/resources/activemq/server0/broker.xml |  26 ++
 .../tests/integration/paging/PagingLimitTest.java  | 416 +++++++++++++++++++++
 .../storage/PersistMultiThreadTest.java            |  32 ++
 20 files changed, 1219 insertions(+), 65 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java
new file mode 100644
index 0000000000..8da0dd38fd
--- /dev/null
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.settings.impl;
+
+public enum PageFullMessagePolicy {
+   DROP, FAIL
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index 3aa5af5ef7..a09f75d616 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import 
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
 
@@ -195,6 +196,18 @@ public final class Validators {
       }
    };
 
+   public static final Validator PAGE_FULL_MESSAGE_POLICY_TYPE = new 
Validator() {
+      @Override
+      public void validate(final String name, final Object value) {
+         String val = (String) value;
+         if (val == null ||
+            !val.equals(PageFullMessagePolicy.DROP.toString()) &&
+            !val.equals(PageFullMessagePolicy.FAIL.toString())) {
+            throw 
ActiveMQMessageBundle.BUNDLE.invalidAddressFullPolicyType(val);
+         }
+      }
+   };
+
    public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = 
new Validator() {
       @Override
       public void validate(final String name, final Object value) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 3bdb0ca706..8461db483c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -98,6 +98,7 @@ import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import 
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
@@ -218,6 +219,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
    private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME = 
"address-full-policy";
 
+   private static final String PAGE_FULL_MESSAGE_POLICY_NODE_NAME = 
"page-full-policy";
+
    private static final String MAX_READ_PAGE_BYTES_NODE_NAME = 
"max-read-page-bytes";
 
    private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = 
"max-read-page-messages";
@@ -226,6 +229,10 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
    private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = 
"page-max-cache-size";
 
+   private static final String PAGE_LIMIT_BYTES_NODE_NAME = "page-limit-bytes";
+
+   private static final String PAGE_LIMIT_MESSAGES_NODE_NAME = 
"page-limit-messages";
+
    private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = 
"message-counter-history-day-limit";
 
    private static final String LVQ_NODE_NAME = "last-value-queue";
@@ -1281,6 +1288,14 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
                ActiveMQServerLogger.LOGGER.pageMaxSizeUsed();
             }
             addressSettings.setPageCacheMaxSize(XMLUtil.parseInt(child));
+         } else if (PAGE_LIMIT_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
+            long pageLimitBytes = 
ByteUtil.convertTextBytes(getTrimmedTextContent(child));
+            
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_BYTES_NODE_NAME, 
pageLimitBytes);
+            addressSettings.setPageLimitBytes(pageLimitBytes);
+         } else if (PAGE_LIMIT_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
+            long pageLimitMessages = 
ByteUtil.convertTextBytes(getTrimmedTextContent(child));
+            
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_MESSAGES_NODE_NAME, 
pageLimitMessages);
+            addressSettings.setPageLimitMessages(pageLimitMessages);
          } else if 
(MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(name)) {
             
addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child));
          } else if 
(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
@@ -1288,6 +1303,11 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
             
Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME,
 value);
             AddressFullMessagePolicy policy = 
Enum.valueOf(AddressFullMessagePolicy.class, value);
             addressSettings.setAddressFullMessagePolicy(policy);
+         } else if (PAGE_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) 
{
+            String value = getTrimmedTextContent(child);
+            
Validators.PAGE_FULL_MESSAGE_POLICY_TYPE.validate(PAGE_FULL_MESSAGE_POLICY_NODE_NAME,
 value);
+            PageFullMessagePolicy policy = 
Enum.valueOf(PageFullMessagePolicy.class, value);
+            addressSettings.setPageFullMessagePolicy(policy);
          } else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || 
DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
             
addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child));
          } else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index da680fce6b..d582b0d712 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessageListener;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
@@ -30,6 +31,7 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
 
    AddressFullMessagePolicy getAddressFullMessagePolicy();
 
+   PageFullMessagePolicy getPageFullMessagePolicy();
+
+   Long getPageLimitMessages();
+
+   Long getPageLimitBytes();
+
+   /** Callback to be used by a counter when the Page is full for that counter 
*/
+   void pageFull(PageSubscription subscription);
+
+   boolean isPageFull();
+
+   void checkPageLimit(long numberOfMessages);
+
    long getFirstPage();
 
    int getPageSizeBytes();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index d3f25e21b5..b42049fe55 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
+import java.util.concurrent.Future;
 import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -46,7 +47,7 @@ public interface PageCursorProvider {
 
    void flushExecutors();
 
-   void scheduleCleanup();
+   Future<Boolean> scheduleCleanup();
 
    void disableCleanup();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 24d65970f8..dea4cd5fbc 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -37,12 +38,14 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.ArtemisCloseable;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
 import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
 import org.apache.activemq.artemis.utils.collections.LinkedList;
 import org.apache.activemq.artemis.utils.collections.LongHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 public class PageCursorProviderImpl implements PageCursorProvider {
@@ -179,11 +182,14 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
    }
 
    @Override
-   public void scheduleCleanup() {
+   public Future<Boolean> scheduleCleanup() {
+      final SimpleFutureImpl<Boolean> future = new SimpleFutureImpl<>();
       if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
-         // Scheduled cleanup was already scheduled before.. never mind!
-         // or we have cleanup disabled
-         return;
+         // Scheduled cleanup was already scheduled before.
+         // On that case just flush the executor returning the future.set(true)
+         // after any previous scheduled cleanup is finished.
+         pagingStore.execute(() -> future.set(true));
+         return future;
       }
 
       scheduledCleanup.incrementAndGet();
@@ -199,9 +205,12 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
             } finally {
                storageManager.clearContext();
                scheduledCleanup.decrementAndGet();
+               future.set(true);
             }
          }
       });
+
+      return future;
    }
 
    /**
@@ -241,6 +250,22 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
       scheduleCleanup();
    }
 
+   private long getNumberOfMessagesOnSubscriptions() {
+      AtomicLong largerCounter = new AtomicLong();
+      activeCursors.forEach((id, sub) -> {
+         long value = sub.getCounter().getValue();
+         if (value > largerCounter.get()) {
+            largerCounter.set(value);
+         }
+      });
+
+      return largerCounter.get();
+   }
+
+   void checkClearPageLimit() {
+      pagingStore.checkPageLimit(getNumberOfMessagesOnSubscriptions());
+   }
+
    protected void cleanup() {
 
       if (!countersRebuilt) {
@@ -299,6 +324,10 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
                // Then we do some check on eventual pages that can be already 
removed but they are away from the streaming
                cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, 
minPage, firstPage);
 
+               if (pagingStore.isPageFull()) {
+                  checkClearPageLimit();
+               }
+
                assert pagingStore.getNumberOfPages() >= 0;
 
                if (pagingStore.getNumberOfPages() == 0 || 
pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || 
pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index f580be47f5..41d0255a21 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -20,6 +20,7 @@ import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -56,6 +57,8 @@ public class PageSubscriptionCounterImpl extends 
BasePagingCounter {
 
    private PageSubscription subscription;
 
+   private PagingStore pagingStore;
+
    private final StorageManager storage;
 
    private volatile long value;
@@ -187,11 +190,16 @@ public class PageSubscriptionCounterImpl extends 
BasePagingCounter {
       if (logger.isTraceEnabled()) {
          logger.trace("process subscription={} add={}, size={}", 
subscriptionID, add, size);
       }
-      valueUpdater.addAndGet(this, add);
+      long value = valueUpdater.addAndGet(this, add);
       persistentSizeUpdater.addAndGet(this, size);
       if (add > 0) {
          addedUpdater.addAndGet(this, add);
          addedPersistentSizeUpdater.addAndGet(this, size);
+
+         /// we could have pagingStore null on tests, so we need to validate 
if pagingStore != null before anything...
+         if (pagingStore != null && pagingStore.getPageFullMessagePolicy() != 
null && !pagingStore.isPageFull()) {
+            checkAdd(value);
+         }
       }
 
       if (isRebuilding()) {
@@ -200,6 +208,15 @@ public class PageSubscriptionCounterImpl extends 
BasePagingCounter {
       }
    }
 
+   private void checkAdd(long numberOfMessages) {
+      Long pageLimitMessages = pagingStore.getPageLimitMessages();
+      if (pageLimitMessages != null) {
+         if (numberOfMessages >= pageLimitMessages.longValue()) {
+            pagingStore.pageFull(this.subscription);
+         }
+      }
+   }
+
    @Override
    public void delete() throws Exception {
       Transaction tx = new TransactionImpl(storage);
@@ -420,6 +437,7 @@ public class PageSubscriptionCounterImpl extends 
BasePagingCounter {
    @Override
    public PageSubscriptionCounter setSubscription(PageSubscription 
subscription) {
       this.subscription = subscription;
+      this.pagingStore = subscription.getPagingStore();
       return this;
    }
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 81d10b9691..137d18a21b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -52,6 +53,7 @@ import 
org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@@ -99,6 +101,16 @@ public class PagingStoreImpl implements PagingStore {
 
    private long maxMessages;
 
+   private volatile boolean pageFull;
+
+   private Long pageLimitBytes;
+
+   private Long estimatedMaxPages;
+
+   private Long pageLimitMessages;
+
+   private PageFullMessagePolicy pageFullMessagePolicy;
+
    private int pageSize;
 
    private volatile AddressFullMessagePolicy addressFullMessagePolicy;
@@ -225,6 +237,40 @@ public class PagingStoreImpl implements PagingStore {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      if (pageLimitBytes != null && pageLimitBytes.longValue() < 0) {
+         logger.debug("address {} had pageLimitBytes<0, setting it as null", 
address);
+         pageLimitBytes = null;
+      }
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitMessages != null && pageLimitMessages.longValue() < 0) {
+         logger.debug("address {} had pageLimitMessages<0, setting it as 
null", address);
+         pageLimitMessages = null;
+      }
+
+
+      if (pageLimitBytes == null && pageLimitMessages == null && 
pageFullMessagePolicy != null) {
+         ActiveMQServerLogger.LOGGER.noPageLimitsSet(address, 
pageFullMessagePolicy);
+         this.pageFullMessagePolicy = null;
+      }
+
+      if (pageLimitBytes != null && pageLimitMessages != null && 
pageFullMessagePolicy == null) {
+         ActiveMQServerLogger.LOGGER.noPagefullPolicySet(address, 
pageLimitBytes, pageLimitMessages);
+         this.pageFullMessagePolicy = null;
+         this.pageLimitMessages = null;
+         this.pageLimitBytes = null;
+      }
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", 
storeName, estimatedMaxPages);
+      }
    }
 
    @Override
@@ -232,6 +278,79 @@ public class PagingStoreImpl implements PagingStore {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), 
subscription.getQueue().getAddress(), pageLimitMessages, 
subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being 
cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBelowPageLimitBytes() {
+      if (estimatedMaxPages != null) {
+         return (numberOfPages <= estimatedMaxPages.longValue());
+      }  else {
+         return true;
+      }
+   }
+
+   private void checkNumberOfPages() {
+      if (!isBelowPageLimitBytes()) {
+         this.pageFull = true;
+         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, 
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      }
+   }
+
+   @Override
+   public void checkPageLimit(long numberOfMessages) {
+      boolean pageMessageMessagesClear = true;
+      Long pageLimitMessages = getPageLimitMessages();
+
+      if (pageLimitMessages != null) {
+         if (logger.isDebugEnabled()) { // gate to avoid boxing of 
numberOfMessages
+            logger.debug("Address {} has {} messages on the larger queue", 
storeName, numberOfMessages);
+         }
+
+         pageMessageMessagesClear = (numberOfMessages < 
pageLimitMessages.longValue());
+      }
+
+      boolean pageMessageBytesClear = isBelowPageLimitBytes();
+
+      if (pageMessageBytesClear && pageMessageMessagesClear) {
+         pageLimitReleased();
+      }
+   }
+
+   private void pageLimitReleased() {
+      if (pageFull) {
+         ActiveMQServerLogger.LOGGER.pageFree(getAddress());
+         this.pageFull = false;
+      }
+   }
+
    @Override
    public boolean lock(long timeout) {
       if (timeout == -1) {
@@ -480,6 +599,8 @@ public class PagingStoreImpl implements PagingStore {
 
                numberOfPages = files.size();
 
+               checkNumberOfPages();
+
                for (String fileName : files) {
                   final int fileId = 
PagingStoreImpl.getPageIdFromFileName(fileName);
 
@@ -556,6 +677,7 @@ public class PagingStoreImpl implements PagingStore {
          if (isPaging) {
             paging = false;
             ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, 
getPageInfo());
+            pageLimitReleased();
          }
          this.cursorProvider.onPageModeCleared();
       } finally {
@@ -1029,6 +1151,25 @@ public class PagingStoreImpl implements PagingStore {
          lock.readLock().unlock();
       }
 
+      if (pageFull) {
+         if (message.isLargeMessage()) {
+            ((LargeServerMessage) message).deleteFile();
+         }
+
+         if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) {
+            throw 
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+         }
+
+         if (!printedDropMessagesWarning) {
+            printedDropMessagesWarning = true;
+            ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, 
getPageInfo());
+         }
+
+         // we are in page mode, if we got to this point, we are dropping the 
message while still paging
+         // this needs to return true as it is paging
+         return true;
+      }
+
       return writePage(message, tx, listCtx);
    }
 
@@ -1282,6 +1423,8 @@ public class PagingStoreImpl implements PagingStore {
       try {
          numberOfPages++;
 
+         checkNumberOfPages();
+
          final long newPageId = currentPageId + 1;
 
          if (logger.isTraceEnabled()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 2c2f632d93..05625fd3af 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -527,4 +527,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 229245, value = "Management controller is busy with another 
task. Please try again")
    ActiveMQTimeoutException managementBusy();
+
+   @Message(id = 229246, value = "Invalid page full message policy type {}")
+   IllegalArgumentException invalidPageFullPolicyType(String val);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index eaec474036..03426d92b8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1563,4 +1563,23 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224119, value = "Unable to refresh security settings: {}", 
level = LogMessage.Level.WARN)
    void unableToRefreshSecuritySettings(String exceptionMessage);
+
+   @LogMessage(id = 224120, value = "Queue {} on Address {} has more messages 
than configured page limit. PageLimitMesages={} while currentValue={}", level = 
LogMessage.Level.WARN)
+   void pageFull(SimpleString queue, SimpleString address, Object 
pageLImitMessage, Object currentValue);
+
+   @LogMessage(id = 224121, value = "Queue {} on Address {} is out of page 
limit now. We will issue a cleanup to check other queues.", level = 
LogMessage.Level.WARN)
+   void pageFree(SimpleString queue, SimpleString address);
+
+   @LogMessage(id = 224122, value = "Address {} number of messages is under 
page limit again, and it should be allowed to page again.", level = 
LogMessage.Level.INFO)
+   void pageFree(SimpleString address);
+
+   @LogMessage(id = 224123, value = "Address {} has more pages than allowed. 
System currently has {} pages, while the estimated max number of pages is {}, 
based on the limitPageBytes ({}) / page-size ({})", level = 
LogMessage.Level.WARN)
+   void pageFullMaxBytes(SimpleString address, long pages, long maxPages, long 
limitBytes, long bytes);
+
+   @LogMessage(id = 224124, value = "Address {} has a pageFullPolicy set as {} 
but there are not page-limit-bytes or page-limit-messages set. Page full 
configuration being ignored on this address.", level = LogMessage.Level.WARN)
+   void noPageLimitsSet(Object address, Object policy);
+
+   @LogMessage(id = 224125, value = "Address {} has page-limit-bytes={}, 
page-limit-messages={} and no page-full-policy set. Page full configuration 
being ignored on this address", level = LogMessage.Level.WARN)
+   void noPagefullPolicySet(Object address, Object limitBytes, Object 
limitMessages);
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 01ede20416..c06ff5bcf0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -147,6 +147,12 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
 
    private Integer maxReadPageMessages = null;
 
+   private Long pageLimitBytes = null;
+
+   private Long pageLimitMessages = null;
+
+   private PageFullMessagePolicy pageFullMessagePolicy = null;
+
    private Long maxSizeMessages = null;
 
    private Integer pageSizeBytes = null;
@@ -289,6 +295,9 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       this.maxSizeMessages = other.maxSizeMessages;
       this.maxReadPageMessages = other.maxReadPageMessages;
       this.maxReadPageBytes = other.maxReadPageBytes;
+      this.pageLimitBytes = other.pageLimitBytes;
+      this.pageLimitMessages = other.pageLimitMessages;
+      this.pageFullMessagePolicy = other.pageFullMessagePolicy;
       this.pageSizeBytes = other.pageSizeBytes;
       this.pageMaxCache = other.pageMaxCache;
       this.dropMessagesWhenFull = other.dropMessagesWhenFull;
@@ -644,6 +653,33 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   public AddressSettings setPageLimitBytes(Long pageLimitBytes) {
+      this.pageLimitBytes = pageLimitBytes;
+      return this;
+   }
+
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   public AddressSettings setPageLimitMessages(Long pageLimitMessages) {
+      this.pageLimitMessages = pageLimitMessages;
+      return this;
+   }
+
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return this.pageFullMessagePolicy;
+   }
+
+   public AddressSettings setPageFullMessagePolicy(PageFullMessagePolicy 
policy) {
+      this.pageFullMessagePolicy = policy;
+      return this;
+   }
+
    public int getMaxReadPageBytes() {
       return maxReadPageBytes != null ? maxReadPageBytes : 2 * 
getPageSizeBytes();
    }
@@ -1223,6 +1259,15 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       if (enableIngressTimestamp == null) {
          enableIngressTimestamp = merged.enableIngressTimestamp;
       }
+      if (pageFullMessagePolicy == null) {
+         pageFullMessagePolicy = merged.pageFullMessagePolicy;
+      }
+      if (pageLimitBytes == null) {
+         pageLimitBytes = merged.pageLimitBytes;
+      }
+      if (pageLimitMessages == null) {
+         pageLimitMessages = merged.pageLimitMessages;
+      }
    }
 
    @Override
@@ -1472,6 +1517,24 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          maxReadPageMessages = BufferHelper.readNullableInteger(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         pageLimitBytes = BufferHelper.readNullableLong(buffer);
+      }
+
+      if (buffer.readableBytes() > 0) {
+         pageLimitMessages = BufferHelper.readNullableLong(buffer);
+      }
+
+      if (buffer.readableBytes() > 0) {
+         policyStr = buffer.readNullableSimpleString();
+
+         if (policyStr != null) {
+            pageFullMessagePolicy = 
PageFullMessagePolicy.valueOf(policyStr.toString());
+         } else {
+            pageFullMessagePolicy = null;
+         }
+      }
    }
 
    @Override
@@ -1542,7 +1605,10 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) +
          BufferHelper.sizeOfNullableLong(maxSizeMessages) +
          BufferHelper.sizeOfNullableInteger(maxReadPageMessages) +
-         BufferHelper.sizeOfNullableInteger(maxReadPageBytes);
+         BufferHelper.sizeOfNullableInteger(maxReadPageBytes) +
+         BufferHelper.sizeOfNullableLong(pageLimitBytes) +
+         BufferHelper.sizeOfNullableLong(pageLimitMessages) +
+         BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null 
? pageFullMessagePolicy.toString() : null);
    }
 
    @Override
@@ -1682,6 +1748,13 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       BufferHelper.writeNullableInteger(buffer, maxReadPageBytes);
 
       BufferHelper.writeNullableInteger(buffer, maxReadPageMessages);
+
+      BufferHelper.writeNullableLong(buffer, pageLimitBytes);
+
+      BufferHelper.writeNullableLong(buffer, pageLimitMessages);
+
+      buffer.writeNullableSimpleString(pageFullMessagePolicy != null ? new 
SimpleString(pageFullMessagePolicy.toString()) : null);
+
    }
 
    /* (non-Javadoc)
@@ -1758,6 +1831,10 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       result = prime * result + ((slowConsumerThresholdMeasurementUnit == 
null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
       result = prime * result + ((enableIngressTimestamp == null) ? 0 : 
enableIngressTimestamp.hashCode());
       result = prime * result + ((maxSizeMessages == null) ? 0 : 
maxSizeMessages.hashCode());
+      result = prime * result + ((pageLimitBytes == null) ? 0 : 
pageLimitBytes.hashCode());
+      result = prime * result + ((pageLimitMessages == null) ? 0 : 
pageLimitMessages.hashCode());
+      result = prime * result + ((pageFullMessagePolicy == null) ? 0 : 
pageFullMessagePolicy.hashCode());
+
       return result;
    }
 
@@ -2130,6 +2207,31 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       } else if (!maxSizeMessages.equals(other.maxSizeMessages))
          return false;
 
+      if (pageLimitBytes == null) {
+         if (other.pageLimitBytes != null) {
+            return false;
+         }
+      } else if (!pageLimitBytes.equals(other.pageLimitBytes)) {
+         return false;
+      }
+
+      if (pageLimitMessages == null) {
+         if (other.pageLimitMessages != null) {
+            return false;
+         }
+      } else if (!pageLimitMessages.equals(other.pageLimitMessages)) {
+         return false;
+      }
+
+      if (pageFullMessagePolicy == null) {
+         if (other.pageFullMessagePolicy != null) {
+            return false;
+         }
+      } else if (!pageFullMessagePolicy.equals(other.pageFullMessagePolicy)) {
+         return false;
+      }
+
+
       return true;
    }
 
@@ -2267,6 +2369,12 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
          enableMetrics +
          ", enableIngressTime=" +
          enableIngressTimestamp +
+         ", pageLimitBytes=" +
+         pageLimitBytes +
+         ", pageLimitMessages=" +
+         pageLimitMessages +
+         ", pageFullMessagePolicy=" +
+         pageFullMessagePolicy +
          "]";
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index fdb30c7c24..a6953148c1 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3757,8 +3757,25 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="page-limit-bytes" type="xsd:string" maxOccurs="1" 
minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  After the address enters into page mode, this attribute will 
configure how many pages can be written into page before activating the 
page-full-policy.
+                  Supports byte notation like "K", "Mb", "MiB", "GB", etc.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
+         <xsd:element name="page-limit-messages" type="xsd:long" maxOccurs="1" 
minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  After the address enters into page mode, this attribute will 
configure how many messages can be written into page before activating the 
page-full-policy.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" 
default="-1" maxOccurs="1"
-                        minOccurs="0">
+                      minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
                   used with the address full BLOCK policy, the maximum size 
(in bytes) an address can reach before
@@ -3819,6 +3836,20 @@
             </xsd:simpleType>
          </xsd:element>
 
+         <xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  After entering page mode, a second limit will be set by 
page-limit-bytes and/or page-limit-messages. The page-full-policy will 
configure what to do when that limit is reached.
+               </xsd:documentation>
+            </xsd:annotation>
+            <xsd:simpleType>
+               <xsd:restriction base="xsd:string">
+                  <xsd:enumeration value="DROP"/>
+                  <xsd:enumeration value="FAIL"/>
+               </xsd:restriction>
+            </xsd:simpleType>
+         </xsd:element>
+
          <xsd:element name="message-counter-history-day-limit" type="xsd:int" 
default="0" maxOccurs="1"
                         minOccurs="0">
             <xsd:annotation>
@@ -4315,7 +4346,7 @@
          <xsd:attributeGroup ref="xml:specialAttrs"/>
       </xsd:complexType>
    </xsd:element>
-   
+
    <xsd:complexType name="connector-serviceType">
       <xsd:sequence>
          <xsd:element maxOccurs="1" minOccurs="1" name="factory-class" 
type="xsd:string">
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 581cc78ea8..1f846cc1eb 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -30,6 +30,7 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
@@ -50,6 +51,12 @@ import 
org.apache.activemq.artemis.core.config.federation.FederationPolicySet;
 import 
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.security.Role;
 import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.JournalType;
@@ -58,10 +65,12 @@ import 
org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServer
 import org.apache.activemq.artemis.core.server.routing.KeyType;
 import 
org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 import org.apache.commons.lang3.ClassUtils;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
@@ -1004,6 +1013,125 @@ public class ConfigurationImplTest extends 
ActiveMQTestBase {
       Assert.assertEquals(SimpleString.toSimpleString("moreImportant"), 
configuration.getAddressSettings().get("Name.With.Dots").getExpiryAddress());
    }
 
+   @Test
+   public void testAddressSettingsPageLimit() throws Throwable {
+      ConfigurationImpl configuration = new ConfigurationImpl();
+
+      Properties properties = new Properties();
+
+      String randomString = RandomUtil.randomString();
+
+      properties.put("addressSettings.#.expiryAddress", randomString);
+      properties.put("addressSettings.#.pageLimitMessages", "300");
+      properties.put("addressSettings.#.pageLimitBytes", "300000");
+      properties.put("addressSettings.#.pageFullMessagePolicy", "DROP");
+
+      configuration.parsePrefixedProperties(properties, null);
+
+      Assert.assertEquals(1, configuration.getAddressSettings().size());
+      Assert.assertEquals(SimpleString.toSimpleString(randomString), 
configuration.getAddressSettings().get("#").getExpiryAddress());
+      Assert.assertEquals(300L, 
configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
+      Assert.assertEquals(300000L, 
configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
+      Assert.assertEquals("DROP", 
configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
+
+      PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), 
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), 
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), 
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), 
configuration.getAddressSettings().get("#"), null, null, true);
+
+      Assert.assertEquals(300L, storeImpl.getPageLimitMessages().longValue());
+      Assert.assertEquals(300000L, storeImpl.getPageLimitBytes().longValue());
+      Assert.assertEquals("DROP", 
storeImpl.getPageFullMessagePolicy().toString());
+   }
+
+   @Test
+   public void testAddressSettingsPageLimitInvalidConfiguration1() throws 
Throwable {
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+      ConfigurationImpl configuration = new ConfigurationImpl();
+
+      Properties properties = new Properties();
+
+      String randomString = RandomUtil.randomString();
+
+      properties.put("addressSettings.#.expiryAddress", randomString);
+      properties.put("addressSettings.#.pageLimitMessages", "300");
+      properties.put("addressSettings.#.pageLimitBytes", "300000");
+      //properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // 
removing the pageFull on purpose
+
+      configuration.parsePrefixedProperties(properties, null);
+
+      Assert.assertEquals(1, configuration.getAddressSettings().size());
+      Assert.assertEquals(SimpleString.toSimpleString(randomString), 
configuration.getAddressSettings().get("#").getExpiryAddress());
+      Assert.assertEquals(300L, 
configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
+      Assert.assertEquals(300000L, 
configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
+      Assert.assertEquals(null, 
configuration.getAddressSettings().get("#").getPageFullMessagePolicy());
+
+      PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), 
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), 
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), 
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), 
configuration.getAddressSettings().get("#"), null, null, true);
+      Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224125"));
+
+      Assert.assertEquals(null, storeImpl.getPageLimitMessages());
+      Assert.assertEquals(null, storeImpl.getPageLimitBytes());
+      Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
+   }
+
+   @Test
+   public void testAddressSettingsPageLimitInvalidConfiguration2() throws 
Throwable {
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+      ConfigurationImpl configuration = new ConfigurationImpl();
+
+      Properties properties = new Properties();
+
+      String randomString = RandomUtil.randomString();
+
+      properties.put("addressSettings.#.expiryAddress", randomString);
+      //properties.put("addressSettings.#.pageLimitMessages", "300"); // 
removing this on purpose
+      //properties.put("addressSettings.#.pageLimitBytes", "300000"); // 
removing this on purpose
+      properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // 
keeping this on purpose
+
+      configuration.parsePrefixedProperties(properties, null);
+
+      Assert.assertEquals(1, configuration.getAddressSettings().size());
+      Assert.assertEquals(SimpleString.toSimpleString(randomString), 
configuration.getAddressSettings().get("#").getExpiryAddress());
+      Assert.assertEquals(null, 
configuration.getAddressSettings().get("#").getPageLimitMessages());
+      Assert.assertEquals(null, 
configuration.getAddressSettings().get("#").getPageLimitBytes());
+      Assert.assertEquals("DROP", 
configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
+
+      PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), 
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), 
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), 
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), 
configuration.getAddressSettings().get("#"), null, null, true);
+      Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224124"));
+
+      Assert.assertEquals(null, storeImpl.getPageLimitMessages());
+      Assert.assertEquals(null, storeImpl.getPageLimitBytes());
+      Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
+   }
+
+
+   @Test
+   public void testAddressSettingsPageLimitInvalidConfiguration3() throws 
Throwable {
+      ConfigurationImpl configuration = new ConfigurationImpl();
+
+      Properties properties = new Properties();
+
+      String randomString = RandomUtil.randomString();
+
+      properties.put("addressSettings.#.expiryAddress", randomString);
+      properties.put("addressSettings.#.pageLimitMessages", "-1"); // -1 on 
purpose, to make it null on final parsing
+      properties.put("addressSettings.#.pageLimitBytes", "-1"); // -1 on 
purpose, to make it null on final parsing
+      properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // 
keeping this on purpose
+
+      configuration.parsePrefixedProperties(properties, null);
+
+      Assert.assertEquals(1, configuration.getAddressSettings().size());
+      Assert.assertEquals(SimpleString.toSimpleString(randomString), 
configuration.getAddressSettings().get("#").getExpiryAddress());
+      Assert.assertEquals(-1L, 
configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
+      Assert.assertEquals(-1L, 
configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
+      Assert.assertEquals("DROP", 
configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
+
+      PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), 
(ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), 
Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), 
Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), 
configuration.getAddressSettings().get("#"), null, null, true);
+
+      Assert.assertEquals(null, storeImpl.getPageLimitMessages());
+      Assert.assertEquals(null, storeImpl.getPageLimitBytes());
+      Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
+   }
+
    @Test
    public void testDivertViaProperties() throws Exception {
       ConfigurationImpl configuration = new ConfigurationImpl();
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 6cf7fda451..1cf36fca22 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -393,6 +393,25 @@ public class FileConfigurationParserTest extends 
ActiveMQTestBase {
       AddressSettings settings = configuration.getAddressSettings().get("foo");
       Assert.assertEquals(1024, settings.getMaxReadPageBytes());
       Assert.assertEquals(33, settings.getMaxReadPageMessages());
+      Assert.assertNull(settings.getPageLimitBytes());
+      Assert.assertNull(settings.getPageLimitMessages());
+      Assert.assertNull(settings.getPageFullMessagePolicy());
+   }
+
+   @Test
+   public void testParsePageLimitSettings() throws Exception {
+      String configStr = "<configuration><address-settings>" + "\n" + 
"<address-setting match=\"foo\">" + "\n" + 
"<max-read-page-bytes>1k</max-read-page-bytes><page-limit-bytes>2k</page-limit-bytes><page-limit-messages>337</page-limit-messages><page-full-policy>FAIL</page-full-policy><max-read-page-messages>33</max-read-page-messages>.\n"
 + "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
+
+      FileConfigurationParser parser = new FileConfigurationParser();
+      ByteArrayInputStream input = new 
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      Configuration configuration = parser.parseMainConfig(input);
+      AddressSettings settings = configuration.getAddressSettings().get("foo");
+      Assert.assertEquals(1024, settings.getMaxReadPageBytes());
+      Assert.assertEquals(33, settings.getMaxReadPageMessages());
+      Assert.assertEquals(2048L, settings.getPageLimitBytes().longValue());
+      Assert.assertEquals(337L, settings.getPageLimitMessages().longValue());
+      Assert.assertEquals("FAIL", 
settings.getPageFullMessagePolicy().toString());
    }
 
    @Test
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index db80b028ab..3a19d098dc 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -134,6 +134,7 @@ import 
org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation;
 import 
org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@@ -1505,12 +1506,28 @@ public abstract class ActiveMQTestBase extends Assert {
       return createServer(realFiles, configuration, pageSize, maxAddressSize, 
null, null, settings);
    }
 
+
+
+   protected final ActiveMQServer createServer(final boolean realFiles,
+                                               final Configuration 
configuration,
+                                               final int pageSize,
+                                               final long maxAddressSize,
+                                               final Integer 
maxReadPageMessages,
+                                               final Integer maxReadPageBytes,
+                                               final Map<String, 
AddressSettings> settings) {
+      return  createServer(realFiles, configuration, pageSize, maxAddressSize, 
maxReadPageMessages, maxReadPageBytes, null, null, null, settings);
+
+   }
+
    protected final ActiveMQServer createServer(final boolean realFiles,
                                          final Configuration configuration,
                                          final int pageSize,
                                          final long maxAddressSize,
                                          final Integer maxReadPageMessages,
                                          final Integer maxReadPageBytes,
+                                         final Long pageLimitBytes,
+                                         final Long pageLimitMessages,
+                                         final String pageLimitPolicy,
                                          final Map<String, AddressSettings> 
settings) {
       ActiveMQServer server = 
addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
 
@@ -1522,6 +1539,15 @@ public abstract class ActiveMQTestBase extends Assert {
             if (maxReadPageMessages != null) {
                
setting.getValue().setMaxReadPageMessages(maxReadPageMessages.intValue());
             }
+            if (pageLimitBytes != null) {
+               setting.getValue().setPageLimitBytes(pageLimitBytes);
+            }
+            if (pageLimitMessages != null) {
+               setting.getValue().setPageLimitMessages(pageLimitMessages);
+            }
+            if (pageLimitPolicy != null) {
+               
setting.getValue().setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy));
+            }
             server.getAddressSettingsRepository().addMatch(setting.getKey(), 
setting.getValue());
          }
       }
@@ -1533,6 +1559,15 @@ public abstract class ActiveMQTestBase extends Assert {
       if (maxReadPageMessages != null) {
          defaultSetting.setMaxReadPageMessages(maxReadPageMessages.intValue());
       }
+      if (pageLimitBytes != null) {
+         defaultSetting.setPageLimitBytes(pageLimitBytes);
+      }
+      if (pageLimitMessages != null) {
+         defaultSetting.setPageLimitMessages(pageLimitMessages);
+      }
+      if (pageLimitPolicy != null) {
+         
defaultSetting.setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy));
+      }
 
       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
 
diff --git a/docs/user-manual/en/paging.md b/docs/user-manual/en/paging.md
index 8b33e26958..b1fe0dcc38 100644
--- a/docs/user-manual/en/paging.md
+++ b/docs/user-manual/en/paging.md
@@ -74,6 +74,9 @@ Configuration is done at the address settings in `broker.xml`.
       <max-size-messages>1000</max-size-messages>
       <page-size-bytes>10485760</page-size-bytes>
       <address-full-policy>PAGE</address-full-policy>
+      <page-limit-bytes>10G</page-limit-bytes>
+      <page-limit-messages>1000000</page-limit-messages>
+      <page-full-policy>FAIL</page-full-policy>
    </address-setting>
 </address-settings>
 ```
@@ -98,6 +101,9 @@ Property Name|Description|Default
 `address-full-policy`|This must be set to `PAGE` for paging to enable. If the 
value is `PAGE` then further messages will be paged to disk. If the value is 
`DROP` then further messages will be silently dropped. If the value is `FAIL` 
then the messages will be dropped and the client message producers will receive 
an exception. If the value is `BLOCK` then client message producers will block 
when they try and send further messages.|`PAGE`
 `max-read-page-messages` | how many message can be read from paging into the 
Queue whenever more messages are needed. The system wtill stop reading if 
`max-read-page-bytes hits the limit first. | -1
 `max-read-page-bytes` | how much memory the messages read from paging can take 
on the Queue whenever more messages are needed. The system will stop reading if 
`max-read-page-messages` hits the limit first. | 2 * page-size-bytes
+`page-limit-bytes` | After entering page mode, how much data would the system 
allow incoming. Notice this will be internally converted as number of pages. | 
+`page-limit-messages` | After entering page mode, how many messages would the 
system allow incoming on paging. | 
+`page-full-policy` | Valid results are DROP or FAIL. This tells what to do if 
the system is reaching `page-limit-bytes` or `page-limit-messages` after paging 
| 
 
 ### max-size-bytes and max-size-messages simultaneous usage
 
@@ -205,6 +211,14 @@ The system should keep at least one paged file in memory 
caching ahead reading m
 Also every active subscription could keep one paged file in memory. 
 So, if your system has too many queues it is recommended to minimize the 
page-size.
 
+## Page Limits and Page Full Policy
+
+Since version `2.28.0` is possible to configure limits on how much data is 
paged. This is to avoid a single destination using the entire disk in case 
their consumers are gone.
+
+You can configure either `page-limit-bytes` or `page-limit-messages`, along 
with `page-full-policy` on the address settings limiting how much data will be 
recorded in paging.
+
+If you configure `page-full-policy` as DROP, messages will be simplify dropped 
while the clients will not get any exceptions, while if you configured FAIL the 
producers will receive a JMS Exception for the error condition.
+
 ## Example
 
 See the [Paging Example](examples.md#paging) which shows how to use paging 
with 
diff --git 
a/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
 
b/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
index 165fae2739..ad97020b94 100644
--- 
a/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
+++ 
b/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java
@@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -32,92 +33,157 @@ import javax.naming.InitialContext;
 public class PagingExample {
 
    public static void main(final String[] args) throws Exception {
-      Connection connection = null;
+      // simple routing showing how paging should work
+      simplePaging();
+      // simple routing showing what happens when paging enters into page-full
+      pageFullLimit();
+   }
+
+
+   public static void pageFullLimit() throws Exception {
+      InitialContext initialContext = null;
+      try {
+         // Create an initial context to perform the JNDI lookup.
+         initialContext = new InitialContext();
+
+         // Perform a lookup on the Connection Factory
+         ConnectionFactory cf = (ConnectionFactory) 
initialContext.lookup("ConnectionFactory");
+
+         // Create a JMS Connection
+         try (Connection connection = cf.createConnection()) {
+
+            // Create a JMS Session
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+
+            // lookup the queue
+            Queue queue = session.createQueue("pagingQueueLimited");
+
+            // Create a JMS Message Producer for pageQueueAddress
+            MessageProducer pageMessageProducer = 
session.createProducer(queue);
+
+            // We don't need persistent messages in order to use paging. (This 
step is optional)
+            pageMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            // Create a Binary Bytes Message with 10K arbitrary bytes
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[10 * 1024]);
+
+            try {
+               // Send messages to the queue until the address is full
+               for (int i = 0; i < 2000; i++) {
+                  pageMessageProducer.send(message);
+                  if (i > 0 && i % 100 == 0) {
+                     // batch commit on the sends
+                     session.commit();
+                  }
+               }
+
+               throw new RuntimeException("Example was supposed to get a page 
full exception. Check your example configuration or report a bug");
+            } catch (JMSException e) {
+               System.out.println("The producer has thrown an expected 
exception " + e);
+            }
+            session.commit();
+         }
+      } finally {
+         // And finally, always remember to close your JMS connections after 
use, in a finally block. Closing a JMS
+         // connection will automatically close all of its sessions, 
consumers, producer and browser objects
+
+         if (initialContext != null) {
+            initialContext.close();
+         }
+
+      }
+   }
+
+
+   public static void simplePaging() throws Exception {
 
       InitialContext initialContext = null;
       try {
-         // Step 1. Create an initial context to perform the JNDI lookup.
+         // Create an initial context to perform the JNDI lookup.
          initialContext = new InitialContext();
 
-         // Step 2. Perform a lookup on the Connection Factory
+         // Perform a lookup on the Connection Factory
          ConnectionFactory cf = (ConnectionFactory) 
initialContext.lookup("ConnectionFactory");
 
-         // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is 
configured to hold a very limited number
+         // We look-up the JMS queue object from JNDI. pagingQueue is 
configured to hold a very limited number
          // of bytes in memory
          Queue pageQueue = (Queue) initialContext.lookup("queue/pagingQueue");
 
-         // Step 4. Lookup for a JMS Queue
+         // Lookup for a JMS Queue
          Queue queue = (Queue) initialContext.lookup("queue/exampleQueue");
 
-         // Step 5. Create a JMS Connection
-         connection = cf.createConnection();
+         // Create a JMS Connection
+         try (Connection connection = cf.createConnection()) {
 
-         // Step 6. Create a JMS Session
-         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+            // Create a JMS Session
+            Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
 
-         // Step 7. Create a JMS Message Producer for pageQueueAddress
-         MessageProducer pageMessageProducer = 
session.createProducer(pageQueue);
+            // Create a JMS Message Producer for pageQueueAddress
+            MessageProducer pageMessageProducer = 
session.createProducer(pageQueue);
 
-         // Step 8. We don't need persistent messages in order to use paging. 
(This step is optional)
-         pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            // We don't need persistent messages in order to use paging. (This 
step is optional)
+            pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
-         // Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
-         BytesMessage message = session.createBytesMessage();
-         message.writeBytes(new byte[10 * 1024]);
+            // Create a Binary Bytes Message with 10K arbitrary bytes
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[10 * 1024]);
 
-         // Step 10. Send only 20 messages to the Queue. This will be already 
enough for pagingQueue. Look at
-         // ./paging/config/activemq-queues.xml for the config.
-         for (int i = 0; i < 20; i++) {
-            pageMessageProducer.send(message);
-         }
+            // Send only 20 messages to the Queue. This will be already enough 
for pagingQueue. Look at
+            // ./paging/config/activemq-queues.xml for the config.
+            for (int i = 0; i < 20; i++) {
+               pageMessageProducer.send(message);
+            }
 
-         // Step 11. Create a JMS Message Producer
-         MessageProducer messageProducer = session.createProducer(queue);
+            // Create a JMS Message Producer
+            MessageProducer messageProducer = session.createProducer(queue);
 
-         // Step 12. We don't need persistent messages in order to use paging. 
(This step is optional)
-         messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            // We don't need persistent messages in order to use paging. (This 
step is optional)
+            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
-         // Step 13. Send the message for about 1K, which should be over the 
memory limit imposed by the server
-         for (int i = 0; i < 1000; i++) {
-            messageProducer.send(message);
-         }
+            // Send the message for about 1K, which should be over the memory 
limit imposed by the server
+            for (int i = 0; i < 1000; i++) {
+               messageProducer.send(message);
+            }
 
-         // Step 14. if you pause this example here, you will see several 
files under ./build/data/paging
-         // Thread.sleep(30000); // if you want to just our of curiosity, you 
can sleep here and inspect the created
-         // files just for
+            // if you pause this example here, you will see several files 
under ./build/data/paging
+            // Thread.sleep(30000); // if you want to just our of curiosity, 
you can sleep here and inspect the created
+            // files just for
 
-         // Step 15. Create a JMS Message Consumer
-         MessageConsumer messageConsumer = session.createConsumer(queue);
+            // Create a JMS Message Consumer
+            MessageConsumer messageConsumer = session.createConsumer(queue);
 
-         // Step 16. Start the JMS Connection. This step will activate the 
subscribers to receive messages.
-         connection.start();
+            // Start the JMS Connection. This step will activate the 
subscribers to receive messages.
+            connection.start();
 
-         // Step 17. Receive the messages. It's important to ACK for messages 
as ActiveMQ Artemis will not read messages from
-         // paging
-         // until messages are ACKed
+            // Receive the messages. It's important to ACK for messages as 
ActiveMQ Artemis will not read messages from
+            // paging
+            // until messages are ACKed
 
-         for (int i = 0; i < 1000; i++) {
-            message = (BytesMessage) messageConsumer.receive(3000);
+            for (int i = 0; i < 1000; i++) {
+               message = (BytesMessage) messageConsumer.receive(3000);
 
-            if (i % 100 == 0) {
-               System.out.println("Received " + i + " messages");
-               message.acknowledge();
+               if (i % 100 == 0) {
+                  System.out.println("Received " + i + " messages");
+                  message.acknowledge();
+               }
             }
-         }
 
-         message.acknowledge();
+            message.acknowledge();
 
-         // Step 18. Receive the messages from the Queue names pageQueue. 
Create the proper consumer for that
-         messageConsumer.close();
-         messageConsumer = session.createConsumer(pageQueue);
+            // Receive the messages from the Queue names pageQueue. Create the 
proper consumer for that
+            messageConsumer.close();
+            messageConsumer = session.createConsumer(pageQueue);
 
-         for (int i = 0; i < 20; i++) {
-            message = (BytesMessage) messageConsumer.receive(1000);
+            for (int i = 0; i < 20; i++) {
+               message = (BytesMessage) messageConsumer.receive(1000);
 
-            System.out.println("Received message " + i + " from pageQueue");
+               System.out.println("Received message " + i + " from pageQueue");
 
-            message.acknowledge();
+               message.acknowledge();
+            }
          }
+
       } finally {
          // And finally, always remember to close your JMS connections after 
use, in a finally block. Closing a JMS
          // connection will automatically close all of its sessions, 
consumers, producer and browser objects
@@ -126,9 +192,6 @@ public class PagingExample {
             initialContext.close();
          }
 
-         if (connection != null) {
-            connection.close();
-         }
       }
    }
 }
diff --git 
a/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
 
b/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
index 9144c901f2..a4dfcd3d0e 100644
--- 
a/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
+++ 
b/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml
@@ -52,6 +52,16 @@ under the License.
             <permission roles="guest" type="send"/>
          </security-setting>
 
+         <!--security for example pagingQueueLimited-->
+         <security-setting match="pagingQueueLimited">
+            <permission roles="guest" type="createDurableQueue"/>
+            <permission roles="guest" type="deleteDurableQueue"/>
+            <permission roles="guest" type="createNonDurableQueue"/>
+            <permission roles="guest" type="deleteNonDurableQueue"/>
+            <permission roles="guest" type="consume"/>
+            <permission roles="guest" type="send"/>
+         </security-setting>
+
          <security-setting match="pagingQueue">
             <permission roles="guest" type="createDurableQueue"/>
             <permission roles="guest" type="deleteDurableQueue"/>
@@ -68,6 +78,17 @@ under the License.
             <page-size-bytes>20000</page-size-bytes>
          </address-setting>
 
+         <address-setting match="pagingQueueLimited">
+            <!-- what to do after max-size-messages is reached. We start 
paging at 100 messages -->
+            <address-full-policy>PAGE</address-full-policy>
+            <!-- how soon we should enter into paging -->
+            <max-size-messages>100</max-size-messages>
+            <!-- how many messages we should accept into paging before 
failing. We start failing after we recorded 1000 messages on paging. -->
+            <page-limit-messages>1000</page-limit-messages>
+            <!-- what to do after page-limit is used -->
+            <page-full-policy>FAIL</page-full-policy>
+         </address-setting>
+
          <address-setting match="exampleQueue">
             <max-size-bytes>10Mb</max-size-bytes>
             <page-size-bytes>1Mb</page-size-bytes>
@@ -84,6 +105,11 @@ under the License.
                <queue name="pagingQueue"/>
             </anycast>
          </address>
+         <address name="pagingQueueLimited">
+            <anycast>
+               <queue name="pagingQueueLimited"/>
+            </anycast>
+         </address>
          <address name="exampleQueue">
             <anycast>
                <queue name="exampleQueue"/>
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java
new file mode 100644
index 0000000000..a3e1bf04d1
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PagingLimitTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   @Test
+   public void testPageLimitMessageCoreFail() throws Exception {
+      testPageLimitMessage("CORE", false);
+   }
+
+   @Test
+   public void testPageLimitAMQPFail() throws Exception {
+      testPageLimitMessage("AMQP", false);
+   }
+
+   @Test
+   public void testPageLimitMessagesOpenWireFail() throws Exception {
+      testPageLimitMessage("OPENWIRE", false);
+   }
+
+   @Test
+   public void testPageLimitMessageCoreDrop() throws Exception {
+      testPageLimitMessage("CORE", false);
+   }
+
+   @Test
+   public void testPageLimitAMQPDrop() throws Exception {
+      testPageLimitMessage("AMQP", false);
+   }
+
+   @Test
+   public void testPageLimitMessagesOpenWireDrop() throws Exception {
+      testPageLimitMessage("OPENWIRE", false);
+   }
+
+   public void testPageLimitMessage(String protocol, boolean drop) throws 
Exception {
+
+      String queueNameTX = getName() + "_TX";
+      String queueNameNonTX = getName() + "_NONTX";
+
+      Configuration config = createDefaultConfig(true);
+      
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, null, 
300L, drop ? "DROP" : "FAIL", null);
+      server.start();
+
+      server.addAddressInfo(new 
AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST));
+      server.addAddressInfo(new 
AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST));
+
+      Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null);
+      Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null);
+
+      testPageLimitMessageFailInternal(queueNameTX, protocol, true, drop);
+      testPageLimitMessageFailInternal(queueNameNonTX, protocol, false, drop);
+
+   }
+
+   private void testPageLimitMessageFailInternal(String queueName,
+                                                 String protocol,
+                                                 boolean transacted,
+                                                 boolean drop) throws 
Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+      org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(queueName);
+      Assert.assertNotNull(serverQueue);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(transacted, transacted ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+         connection.start();
+
+         for (int i = 0; i < 100; i++) {
+            TextMessage message = session.createTextMessage("initial " + i);
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+         if (transacted) {
+            session.commit();
+            Assert.assertTrue(serverQueue.getPagingStore().isPaging());
+         }
+
+         for (int i = 0; i < 300; i++) {
+            if (i == 200) {
+               // the initial sent has to be consumed on transaction as we 
need a sync on the consumer for AMQP
+               try (MessageConsumer consumer = session.createConsumer(queue)) {
+                  for (int initI = 0; initI < 100; initI++) {
+                     TextMessage recMessage = (TextMessage) 
consumer.receive(1000);
+                     Assert.assertEquals("initial " + initI, 
recMessage.getText());
+                  }
+               }
+               if (transacted) {
+                  session.commit();
+               }
+               Wait.assertEquals(200L, serverQueue::getMessageCount);
+            }
+
+            try {
+               TextMessage message = session.createTextMessage("hello world " 
+ i);
+               message.setIntProperty("i", i);
+               producer.send(message);
+               if (i % 100 == 0) {
+                  logger.info("sent " + i);
+               }
+               if (transacted) {
+                  if (i % 100 == 0 && i > 0) {
+                     session.commit();
+                  }
+               }
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+               Assert.fail("Exception happened at " + i);
+            }
+         }
+         if (transacted) {
+            session.commit();
+         }
+
+         try {
+            producer.send(session.createTextMessage("should not complete"));
+            if (transacted) {
+               session.commit();
+            }
+            if (!drop) {
+               Assert.fail("an Exception was expected");
+            }
+            Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224120"));
+         } catch (JMSException e) {
+            logger.debug("Expected exception, ok!", e);
+         }
+
+
+         Assert.assertTrue(serverQueue.getPagingStore().isPaging());
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         for (int i = 0; i < 150; i++) { // we will consume half of the 
messages
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("hello world " + i, message.getText());
+            Assert.assertEquals(i, message.getIntProperty("i"));
+            if (transacted) {
+               if (i % 100 == 0 && i > 0) {
+                  session.commit();
+               }
+            }
+         }
+         if (transacted) {
+            session.commit();
+         }
+         Future<Boolean> cleanupDone = 
serverQueue.getPagingStore().getCursorProvider().scheduleCleanup();
+
+         Assert.assertTrue(cleanupDone.get(30, TimeUnit.SECONDS));
+
+
+
+         for (int i = 300; i < 450; i++) {
+            try {
+               TextMessage message = session.createTextMessage("hello world " 
+ i);
+               message.setIntProperty("i", i);
+               producer.send(message);
+               if (i % 100 == 0) {
+                  logger.info("sent " + i);
+               }
+               if (transacted) {
+                  if (i % 10 == 0 && i > 0) {
+                     session.commit();
+                  }
+               }
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+               Assert.fail("Exception happened at " + i);
+            }
+         }
+         if (transacted) {
+            session.commit();
+         }
+         AssertionLoggerHandler.clear();
+         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
+
+
+         try {
+            producer.send(session.createTextMessage("should not complete"));
+            if (transacted) {
+               session.commit();
+            }
+            if (!drop) {
+               Assert.fail("an Exception was expected");
+            } else {
+               
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
+            }
+         } catch (JMSException e) {
+            logger.debug("Expected exception, ok!", e);
+         }
+
+         for (int i = 150; i < 450; i++) { // we will consume half of the 
messages
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("hello world " + i, message.getText());
+            Assert.assertEquals(i, message.getIntProperty("i"));
+            if (transacted) {
+               if (i % 100 == 0 && i > 0) {
+                  session.commit();
+               }
+            }
+         }
+
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+
+   }
+
+
+   @Test
+   public void testPageLimitBytesAMQP() throws Exception {
+      testPageLimitBytes("AMQP");
+   }
+
+   @Test
+   public void testPageLimitBytesCore() throws Exception {
+      testPageLimitBytes("CORE");
+   }
+
+   @Test
+   public void testPageLimitBytesOpenWire() throws Exception {
+      testPageLimitBytes("OPENWIRE");
+   }
+
+   public void testPageLimitBytes(String protocol) throws Exception {
+
+      String queueNameTX = getName() + "_TX";
+      String queueNameNonTX = getName() + "_NONTX";
+
+      Configuration config = createDefaultConfig(true);
+      
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, 
(long)(PAGE_MAX * 10), null, "FAIL", null);
+      server.start();
+
+      server.addAddressInfo(new 
AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST));
+      server.addAddressInfo(new 
AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST));
+
+      Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null);
+      Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null);
+
+      testPageLimitBytesFailInternal(queueNameTX, protocol, true);
+      testPageLimitBytesFailInternal(queueNameNonTX, protocol, false);
+
+   }
+
+
+
+   private void testPageLimitBytesFailInternal(String queueName,
+                                                 String protocol,
+                                                 boolean transacted) throws 
Exception {
+      org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(queueName);
+      Assert.assertNotNull(serverQueue);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(transacted, transacted ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+         connection.start();
+
+         int successfullSends = 0;
+         boolean failed = false;
+
+         for (int i = 0; i < 1000; i++) {
+            try {
+               TextMessage message = session.createTextMessage("hello world " 
+ i);
+               message.setIntProperty("i", i);
+               producer.send(message);
+               if (transacted) {
+                  session.commit();
+               }
+            } catch (Exception e) {
+               logger.debug(e.getMessage(), e);
+               failed = true;
+               break;
+            }
+            successfullSends++;
+         }
+
+         Wait.assertEquals(successfullSends, serverQueue::getMessageCount);
+         Assert.assertTrue(failed);
+
+         int reads = successfullSends / 2;
+
+         connection.start();
+         try (MessageConsumer consumer = session.createConsumer(queue)) {
+            for (int i = 0; i < reads; i++) { // we will consume half of the 
messages
+               TextMessage message = (TextMessage) consumer.receive(5000);
+               Assert.assertNotNull(message);
+               Assert.assertEquals("hello world " + i, message.getText());
+               Assert.assertEquals(i, message.getIntProperty("i"));
+               if (transacted) {
+                  if (i % 100 == 0 && i > 0) {
+                     session.commit();
+                  }
+               }
+            }
+            if (transacted) {
+               session.commit();
+            }
+         }
+
+         failed = false;
+
+         int originalSuccess = successfullSends;
+
+         Future<Boolean> result = 
serverQueue.getPagingStore().getCursorProvider().scheduleCleanup();
+         Assert.assertTrue(result.get(10, TimeUnit.SECONDS));
+
+         for (int i = successfullSends; i < 1000; i++) {
+            try {
+               TextMessage message = session.createTextMessage("hello world " 
+ i);
+               message.setIntProperty("i", i);
+               producer.send(message);
+               if (transacted) {
+                  session.commit();
+               }
+            } catch (Exception e) {
+               logger.debug(e.getMessage(), e);
+               failed = true;
+               break;
+            }
+            successfullSends++;
+         }
+
+         Assert.assertTrue(failed);
+         Assert.assertTrue(successfullSends > originalSuccess);
+
+         try (MessageConsumer consumer = session.createConsumer(queue)) {
+            for (int i = reads; i < successfullSends; i++) {
+               TextMessage message = (TextMessage) consumer.receive(5000);
+               Assert.assertNotNull(message);
+               Assert.assertEquals("hello world " + i, message.getText());
+               Assert.assertEquals(i, message.getIntProperty("i"));
+               if (transacted) {
+                  if (i % 100 == 0 && i > 0) {
+                     session.commit();
+                  }
+               }
+            }
+            if (transacted) {
+               session.commit();
+            }
+            Assert.assertNull(consumer.receiveNoWait());
+         }
+
+
+      }
+
+   }
+
+
+
+}
\ No newline at end of file
diff --git 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 5297e6dcf0..4317ceaea1 100644
--- 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -30,6 +30,7 @@ import 
org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@@ -245,6 +247,36 @@ public class PersistMultiThreadTest extends 
ActiveMQTestBase {
 
    class FakePagingStore implements PagingStore {
 
+      @Override
+      public PageFullMessagePolicy getPageFullMessagePolicy() {
+         return null;
+      }
+
+      @Override
+      public Long getPageLimitMessages() {
+         return null;
+      }
+
+      @Override
+      public Long getPageLimitBytes() {
+         return null;
+      }
+
+      @Override
+      public void pageFull(PageSubscription subscription) {
+
+      }
+
+      @Override
+      public boolean isPageFull() {
+         return false;
+      }
+
+      @Override
+      public void checkPageLimit(long numberOfMessages) {
+
+      }
+
       @Override
       public void counterSnapshot() {
       }


Reply via email to