[ 
https://issues.apache.org/jira/browse/ARTEMIS-3178?focusedWorklogId=842351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-842351
 ]

ASF GitHub Bot logged work on ARTEMIS-3178:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jan/23 17:50
            Start Date: 30/Jan/23 17:50
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4349:
URL: https://github.com/apache/activemq-artemis/pull/4349#discussion_r1090865601


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
 
    AddressFullMessagePolicy getAddressFullMessagePolicy();
 
+   PageFullMessagePolicy getPageFullMessagePolicy();
+
+   Long getPageLimitMessages();

Review Comment:
   Presumably this is being null checked somewhere given the Long usage. Since 
the config setting is defined as having default -1, might it be nicer to just 
use that for the checks? Since a non-null value would also have to be checked 
for being >0 anyway, in case someone excplicitly sets it to the -1 default?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -241,6 +249,23 @@ public void resumeCleanup() {
       scheduleCleanup();
    }
 
+   private long getNumberOfMessagesOnSubscriptions() {
+      AtomicLong largerCounter = new AtomicLong();
+      activeCursors.forEach((id, sub) -> {
+         long value = sub.getCounter().getValue();
+         if (value > largerCounter.get()) {
+            largerCounter.set(value);
+         }
+      });
+
+      return largerCounter.get();
+   }
+
+   void checkClearPageLimit() {
+

Review Comment:
   Superfluous newline



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings 
addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", 
storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), 
subscription.getQueue().getAddress(), pageLimitMessages, 
subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being 
cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {
+      if (estimatedMaxPages != null) {
+         return (numberOfPages <= estimatedMaxPages.longValue());
+      }  else {
+         return true;
+      }
+   }
+
+   private void checkNumberOfPages() {
+      if (!isBellowPageLimitBytes()) {
+         this.pageFull = true;
+         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, 
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      }
+   }
+
+   @Override
+   public void checkPageLimit(long numberOfMessages) {
+
+

Review Comment:
   Superfluous newlines



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings 
addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", 
storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), 
subscription.getQueue().getAddress(), pageLimitMessages, 
subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being 
cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {

Review Comment:
   typo, Below



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -1029,6 +1131,25 @@ public boolean page(Message message,
          lock.readLock().unlock();
       }
 
+      if (pageFull && pageFullMessagePolicy != null) {

Review Comment:
   Related to earlier comment, if it validates you have a policy, can this then 
just simply check "if (pageFull)" ?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings 
addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();

Review Comment:
   Should it validate you have actually set a policy if setting one or both of 
the others to enforcing values? That way you cant accidentally think you have a 
limit when you dont.



##########
artemis-server/src/main/resources/schema/artemis-configuration.xsd:
##########
@@ -4062,6 +4079,20 @@
                </xsd:simpleType>
             </xsd:element>
 
+            <xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     After entering page mode, a second limit will be set by 
page-limit-bytes, page-limit-messages. page-full-policy will configure what to 
do when that limit is reach.

Review Comment:
   page-limit-bytes and/or page-limit-messages. The page-full-policy...
   
   ...when that limit is reach**ed.**



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
 
    AddressFullMessagePolicy getAddressFullMessagePolicy();
 
+   PageFullMessagePolicy getPageFullMessagePolicy();
+
+   Long getPageLimitMessages();
+
+   Long getPageLimitBytes();

Review Comment:
   ditto



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings 
addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", 
storeName, estimatedMaxPages);
+      }

Review Comment:
   What if someone explicitly set pageLimitBytes to -1, which is noted as the 
default and allowed by the validators. This would still look to calculate a 
[incorrect] value for the max?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings 
addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", 
storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), 
subscription.getQueue().getAddress(), pageLimitMessages, 
subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being 
cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {
+      if (estimatedMaxPages != null) {
+         return (numberOfPages <= estimatedMaxPages.longValue());
+      }  else {
+         return true;
+      }
+   }
+
+   private void checkNumberOfPages() {
+      if (!isBellowPageLimitBytes()) {
+         this.pageFull = true;
+         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, 
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      }
+   }
+
+   @Override
+   public void checkPageLimit(long numberOfMessages) {
+
+
+      boolean pageMessageMessagesClear = true;
+      Long pageLimitMessages = getPageLimitMessages();
+
+      if (pageLimitMessages != null) {
+         if (logger.isDebugEnabled()) { // gate to avoid boxing of msgCount
+            logger.debug("Address {} has {} messages on the larger queue", 
storeName, numberOfMessages);
+         }
+
+         pageMessageMessagesClear = (numberOfMessages < 
pageLimitMessages.longValue());
+      }
+
+      boolean pageMessageBytesClear = isBellowPageLimitBytes();
+
+

Review Comment:
   Superfluous newline



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings 
addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", 
storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), 
subscription.getQueue().getAddress(), pageLimitMessages, 
subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being 
cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {
+      if (estimatedMaxPages != null) {
+         return (numberOfPages <= estimatedMaxPages.longValue());
+      }  else {
+         return true;
+      }
+   }
+
+   private void checkNumberOfPages() {
+      if (!isBellowPageLimitBytes()) {
+         this.pageFull = true;
+         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, 
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      }
+   }
+
+   @Override
+   public void checkPageLimit(long numberOfMessages) {
+
+
+      boolean pageMessageMessagesClear = true;
+      Long pageLimitMessages = getPageLimitMessages();
+
+      if (pageLimitMessages != null) {
+         if (logger.isDebugEnabled()) { // gate to avoid boxing of msgCount

Review Comment:
   msgCount should be numberOfMessages.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 842351)
    Time Spent: 1h 10m  (was: 1h)

> Provide a way to limit the size of an address after paged
> ---------------------------------------------------------
>
>                 Key: ARTEMIS-3178
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3178
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>          Components: Broker, Configuration
>    Affects Versions: 2.17.0
>            Reporter: Gary Tully
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.28.0
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> I am adding three attributes to Address-settings:
>  * page-limit-bytes: Number of bytes. We will convert this metric into max 
> number of pages internally by dividing max-bytes / page-size. It will allow a 
> max based on an estimate.
>  * page-limit-messages: Number of messages
>  * page-full-message-policy: fail : drop
> We will now allow paging, until these max values and then fail or drop 
> messages.
> Once these values are retracted, the address will remain full until a period 
> where cleanup is kicked in by paging. So these values may have a certain 
> delay on being applied, but they should always be cleared once cleanup 
> happened.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to