[
https://issues.apache.org/jira/browse/ARTEMIS-3178?focusedWorklogId=842351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-842351
]
ASF GitHub Bot logged work on ARTEMIS-3178:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jan/23 17:50
Start Date: 30/Jan/23 17:50
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4349:
URL: https://github.com/apache/activemq-artemis/pull/4349#discussion_r1090865601
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
AddressFullMessagePolicy getAddressFullMessagePolicy();
+ PageFullMessagePolicy getPageFullMessagePolicy();
+
+ Long getPageLimitMessages();
Review Comment:
Presumably this is being null checked somewhere given the Long usage. Since
the config setting is defined as having default -1, might it be nicer to just
use that for the checks? Since a non-null value would also have to be checked
for being >0 anyway, in case someone excplicitly sets it to the -1 default?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -241,6 +249,23 @@ public void resumeCleanup() {
scheduleCleanup();
}
+ private long getNumberOfMessagesOnSubscriptions() {
+ AtomicLong largerCounter = new AtomicLong();
+ activeCursors.forEach((id, sub) -> {
+ long value = sub.getCounter().getValue();
+ if (value > largerCounter.get()) {
+ largerCounter.set(value);
+ }
+ });
+
+ return largerCounter.get();
+ }
+
+ void checkClearPageLimit() {
+
Review Comment:
Superfluous newline
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings
addressSettings) {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+ pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+ pageLimitBytes = addressSettings.getPageLimitBytes();
+
+ pageLimitMessages = addressSettings.getPageLimitMessages();
+
+ if (pageLimitBytes != null && pageSize > 0) {
+ estimatedMaxPages = pageLimitBytes / pageSize;
+ logger.debug("Address {} should not allow more than {} pages",
storeName, estimatedMaxPages);
+ }
}
@Override
public String toString() {
return "PagingStoreImpl(" + this.address + ")";
}
+ @Override
+ public PageFullMessagePolicy getPageFullMessagePolicy() {
+ return pageFullMessagePolicy;
+ }
+
+ @Override
+ public Long getPageLimitMessages() {
+ return pageLimitMessages;
+ }
+
+ @Override
+ public Long getPageLimitBytes() {
+ return pageLimitBytes;
+ }
+
+ @Override
+ public void pageFull(PageSubscription subscription) {
+ this.pageFull = true;
+ try {
+
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(),
subscription.getQueue().getAddress(), pageLimitMessages,
subscription.getCounter().getValue());
+ } catch (Throwable e) {
+ // I don't think subscription would ever have a null queue. I'm being
cautious here for tests
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isPageFull() {
+ return pageFull;
+ }
+
+ private boolean isBellowPageLimitBytes() {
+ if (estimatedMaxPages != null) {
+ return (numberOfPages <= estimatedMaxPages.longValue());
+ } else {
+ return true;
+ }
+ }
+
+ private void checkNumberOfPages() {
+ if (!isBellowPageLimitBytes()) {
+ this.pageFull = true;
+ ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName,
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+ }
+ }
+
+ @Override
+ public void checkPageLimit(long numberOfMessages) {
+
+
Review Comment:
Superfluous newlines
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings
addressSettings) {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+ pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+ pageLimitBytes = addressSettings.getPageLimitBytes();
+
+ pageLimitMessages = addressSettings.getPageLimitMessages();
+
+ if (pageLimitBytes != null && pageSize > 0) {
+ estimatedMaxPages = pageLimitBytes / pageSize;
+ logger.debug("Address {} should not allow more than {} pages",
storeName, estimatedMaxPages);
+ }
}
@Override
public String toString() {
return "PagingStoreImpl(" + this.address + ")";
}
+ @Override
+ public PageFullMessagePolicy getPageFullMessagePolicy() {
+ return pageFullMessagePolicy;
+ }
+
+ @Override
+ public Long getPageLimitMessages() {
+ return pageLimitMessages;
+ }
+
+ @Override
+ public Long getPageLimitBytes() {
+ return pageLimitBytes;
+ }
+
+ @Override
+ public void pageFull(PageSubscription subscription) {
+ this.pageFull = true;
+ try {
+
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(),
subscription.getQueue().getAddress(), pageLimitMessages,
subscription.getCounter().getValue());
+ } catch (Throwable e) {
+ // I don't think subscription would ever have a null queue. I'm being
cautious here for tests
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isPageFull() {
+ return pageFull;
+ }
+
+ private boolean isBellowPageLimitBytes() {
Review Comment:
typo, Below
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -1029,6 +1131,25 @@ public boolean page(Message message,
lock.readLock().unlock();
}
+ if (pageFull && pageFullMessagePolicy != null) {
Review Comment:
Related to earlier comment, if it validates you have a policy, can this then
just simply check "if (pageFull)" ?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings
addressSettings) {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+ pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+ pageLimitBytes = addressSettings.getPageLimitBytes();
+
+ pageLimitMessages = addressSettings.getPageLimitMessages();
Review Comment:
Should it validate you have actually set a policy if setting one or both of
the others to enforcing values? That way you cant accidentally think you have a
limit when you dont.
##########
artemis-server/src/main/resources/schema/artemis-configuration.xsd:
##########
@@ -4062,6 +4079,20 @@
</xsd:simpleType>
</xsd:element>
+ <xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ After entering page mode, a second limit will be set by
page-limit-bytes, page-limit-messages. page-full-policy will configure what to
do when that limit is reach.
Review Comment:
page-limit-bytes and/or page-limit-messages. The page-full-policy...
...when that limit is reach**ed.**
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
AddressFullMessagePolicy getAddressFullMessagePolicy();
+ PageFullMessagePolicy getPageFullMessagePolicy();
+
+ Long getPageLimitMessages();
+
+ Long getPageLimitBytes();
Review Comment:
ditto
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings
addressSettings) {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+ pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+ pageLimitBytes = addressSettings.getPageLimitBytes();
+
+ pageLimitMessages = addressSettings.getPageLimitMessages();
+
+ if (pageLimitBytes != null && pageSize > 0) {
+ estimatedMaxPages = pageLimitBytes / pageSize;
+ logger.debug("Address {} should not allow more than {} pages",
storeName, estimatedMaxPages);
+ }
Review Comment:
What if someone explicitly set pageLimitBytes to -1, which is noted as the
default and allowed by the validators. This would still look to calculate a
[incorrect] value for the max?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings
addressSettings) {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+ pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+ pageLimitBytes = addressSettings.getPageLimitBytes();
+
+ pageLimitMessages = addressSettings.getPageLimitMessages();
+
+ if (pageLimitBytes != null && pageSize > 0) {
+ estimatedMaxPages = pageLimitBytes / pageSize;
+ logger.debug("Address {} should not allow more than {} pages",
storeName, estimatedMaxPages);
+ }
}
@Override
public String toString() {
return "PagingStoreImpl(" + this.address + ")";
}
+ @Override
+ public PageFullMessagePolicy getPageFullMessagePolicy() {
+ return pageFullMessagePolicy;
+ }
+
+ @Override
+ public Long getPageLimitMessages() {
+ return pageLimitMessages;
+ }
+
+ @Override
+ public Long getPageLimitBytes() {
+ return pageLimitBytes;
+ }
+
+ @Override
+ public void pageFull(PageSubscription subscription) {
+ this.pageFull = true;
+ try {
+
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(),
subscription.getQueue().getAddress(), pageLimitMessages,
subscription.getCounter().getValue());
+ } catch (Throwable e) {
+ // I don't think subscription would ever have a null queue. I'm being
cautious here for tests
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isPageFull() {
+ return pageFull;
+ }
+
+ private boolean isBellowPageLimitBytes() {
+ if (estimatedMaxPages != null) {
+ return (numberOfPages <= estimatedMaxPages.longValue());
+ } else {
+ return true;
+ }
+ }
+
+ private void checkNumberOfPages() {
+ if (!isBellowPageLimitBytes()) {
+ this.pageFull = true;
+ ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName,
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+ }
+ }
+
+ @Override
+ public void checkPageLimit(long numberOfMessages) {
+
+
+ boolean pageMessageMessagesClear = true;
+ Long pageLimitMessages = getPageLimitMessages();
+
+ if (pageLimitMessages != null) {
+ if (logger.isDebugEnabled()) { // gate to avoid boxing of msgCount
+ logger.debug("Address {} has {} messages on the larger queue",
storeName, numberOfMessages);
+ }
+
+ pageMessageMessagesClear = (numberOfMessages <
pageLimitMessages.longValue());
+ }
+
+ boolean pageMessageBytesClear = isBellowPageLimitBytes();
+
+
Review Comment:
Superfluous newline
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings
addressSettings) {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+ pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+ pageLimitBytes = addressSettings.getPageLimitBytes();
+
+ pageLimitMessages = addressSettings.getPageLimitMessages();
+
+ if (pageLimitBytes != null && pageSize > 0) {
+ estimatedMaxPages = pageLimitBytes / pageSize;
+ logger.debug("Address {} should not allow more than {} pages",
storeName, estimatedMaxPages);
+ }
}
@Override
public String toString() {
return "PagingStoreImpl(" + this.address + ")";
}
+ @Override
+ public PageFullMessagePolicy getPageFullMessagePolicy() {
+ return pageFullMessagePolicy;
+ }
+
+ @Override
+ public Long getPageLimitMessages() {
+ return pageLimitMessages;
+ }
+
+ @Override
+ public Long getPageLimitBytes() {
+ return pageLimitBytes;
+ }
+
+ @Override
+ public void pageFull(PageSubscription subscription) {
+ this.pageFull = true;
+ try {
+
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(),
subscription.getQueue().getAddress(), pageLimitMessages,
subscription.getCounter().getValue());
+ } catch (Throwable e) {
+ // I don't think subscription would ever have a null queue. I'm being
cautious here for tests
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isPageFull() {
+ return pageFull;
+ }
+
+ private boolean isBellowPageLimitBytes() {
+ if (estimatedMaxPages != null) {
+ return (numberOfPages <= estimatedMaxPages.longValue());
+ } else {
+ return true;
+ }
+ }
+
+ private void checkNumberOfPages() {
+ if (!isBellowPageLimitBytes()) {
+ this.pageFull = true;
+ ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName,
numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+ }
+ }
+
+ @Override
+ public void checkPageLimit(long numberOfMessages) {
+
+
+ boolean pageMessageMessagesClear = true;
+ Long pageLimitMessages = getPageLimitMessages();
+
+ if (pageLimitMessages != null) {
+ if (logger.isDebugEnabled()) { // gate to avoid boxing of msgCount
Review Comment:
msgCount should be numberOfMessages.
Issue Time Tracking
-------------------
Worklog Id: (was: 842351)
Time Spent: 1h 10m (was: 1h)
> Provide a way to limit the size of an address after paged
> ---------------------------------------------------------
>
> Key: ARTEMIS-3178
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3178
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: Broker, Configuration
> Affects Versions: 2.17.0
> Reporter: Gary Tully
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.28.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> I am adding three attributes to Address-settings:
> * page-limit-bytes: Number of bytes. We will convert this metric into max
> number of pages internally by dividing max-bytes / page-size. It will allow a
> max based on an estimate.
> * page-limit-messages: Number of messages
> * page-full-message-policy: fail : drop
> We will now allow paging, until these max values and then fail or drop
> messages.
> Once these values are retracted, the address will remain full until a period
> where cleanup is kicked in by paging. So these values may have a certain
> delay on being applied, but they should always be cleared once cleanup
> happened.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)