gemmellr commented on code in PR #5128:
URL: https://github.com/apache/activemq-artemis/pull/5128#discussion_r1715547459


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -268,7 +269,12 @@ public void applySetting(final AddressSettings addressSettings) {
          pageLimitBytes = null;
       }
 
-      pageLimitMessages = addressSettings.getPageLimitMessages();
+      if (!Objects.equals(this.pageLimitMessages, addressSettings.getPageLimitMessages())) {
+         pageLimitMessages = addressSettings.getPageLimitMessages();
+         if (this.cursorProvider != null) {
+            this.cursorProvider.checkClearPageLimit();
+         }
+      }

Review Comment:
   This should only be actioned after the null replacement that is done
   immediately below. I'd take a copy of the existing value here, then move
   the whole 'do if changed' block below the null replacement, much as you did
   for the later bits.
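
   A minimal sketch of the ordering suggested above, not the actual PagingStoreImpl code. It assumes the null replacement maps a negative limit to null (that code is not shown in this diff), and the class name and the clearChecks counter are hypothetical stand-ins, the latter for the call to cursorProvider.checkClearPageLimit():

```java
import java.util.Objects;

// Hypothetical stand-in for the relevant part of PagingStoreImpl; not the real class.
class PageLimitMessagesSketch {

   private Long pageLimitMessages;

   // Counts how often the stand-in for cursorProvider.checkClearPageLimit() would run.
   int clearChecks;

   void applyPageLimitMessages(Long configured) {
      // Take a copy of the existing value before touching the field.
      final Long previous = this.pageLimitMessages;

      pageLimitMessages = configured;

      // Assumed null replacement: a negative limit means "no limit".
      if (pageLimitMessages != null && pageLimitMessages < 0) {
         pageLimitMessages = null;
      }

      // Compare against the normalized value, so null -> -1 is not seen as a change.
      if (!Objects.equals(previous, pageLimitMessages)) {
         clearChecks++;
      }
   }

   Long getPageLimitMessages() {
      return pageLimitMessages;
   }
}
```

   With this ordering, applying -1 to a store whose limit is already null does not trigger the check, whereas comparing before the replacement would treat it as a change.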



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +295,290 @@ public void testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
       }
    }
 
+   @Test
+   public void testPageLimitBytesValidation() throws Exception {
+      final String addressName = getTestMethodName();
+
+      try (ClientSessionFactory sf = createSessionFactory(locator)) {
+         ClientSession session = sf.createSession(false, false);
+
+         SimpleString queueAddr = SimpleString.of(addressName);
+         session.createQueue(QueueConfiguration.of(queueAddr));
+
+         int size = 1048576;
+         AddressSettings addressSettings = new AddressSettings();
+         addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+         addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+         addressSettings.setPageSizeBytes(size);
+         addressSettings.setPageLimitBytes(Long.valueOf(size));
+         addressSettings.setMaxSizeBytes(size);
+
+         server.getAddressSettingsRepository().addMatch(addressName, addressSettings);
+
+         int totalMessages = 15;
+         int messageSize = 90000;
+         sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+         Queue queue = server.locateQueue(queueAddr);
+
+         // Give time Queue.deliverAsync to deliver messages
+         assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+         PagingStore queuePagingStore = queue.getPagingStore();
+         assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+         assertFalse(queuePagingStore.isPageFull());
+
+         // set pageLimitBytes to be smaller than pageSizeBytes
+         addressSettings.setPageLimitBytes(Long.valueOf(size - 1));
+         server.getAddressSettingsRepository().addMatch(addressName, addressSettings);
+
+         // check the settings applied
+         assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+         // check pageFull is true
+         assertTrue(queuePagingStore.isPageFull());
+
+         // send a messages should be immediately blocked (in our case FAIL)
+         try {
+            sendMessageBatch(1, messageSize, session, queueAddr);
+            fail("should be immediate blocked on paging");
+         } catch (ActiveMQAddressFullException ex) {
+            //ok
+         }
+
+         assertTrue(queuePagingStore.isPageFull());
+
+         // set pageLimitBytes to bigger value to unblock paging again
+         addressSettings.setPageLimitBytes(Long.valueOf(size * 2));
+         server.getAddressSettingsRepository().addMatch(addressName, addressSettings);
+
+         // now page is enabled again
+         assertFalse(queuePagingStore.isPageFull());
+
+         sendMessageBatch(1, messageSize, session, queueAddr);
+         assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+      }
+   }
+
+   @Test
+   public void testPageLimitBytesAndPageLimitMessagesValidation() throws Exception {
+
+      final String queueName = getTestMethodName();
+
+      try (ClientSessionFactory sf = createSessionFactory(locator)) {
+         ClientSession session = sf.createSession(true, true);
+
+         SimpleString queueAddr = SimpleString.of(queueName);
+         session.createQueue(QueueConfiguration.of(queueAddr));
+
+         final int size = 1024 * 50;
+         final Long maxMessages = 10L;
+         AddressSettings addressSettings = new AddressSettings();
+         addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+         addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+         addressSettings.setPageSizeBytes(size);
+         addressSettings.setPageLimitBytes(Long.valueOf(size * 10));
+         addressSettings.setMaxSizeBytes(size);
+         addressSettings.setPageLimitMessages(maxMessages);
+
+         server.getAddressSettingsRepository().addMatch(queueName, addressSettings);
+
+         int totalMessages = 0;
+         int messageSize = 1024;
+         ClientProducer producer = session.createProducer(queueAddr);
+         boolean stop = false;
+         while (!stop) {
+            try {
+               ClientMessage message = createMessage(session, messageSize, totalMessages, null);
+               producer.send(message);
+               totalMessages++;
+            } catch (ActiveMQAddressFullException ex) {
+               stop = true;
+            }
+         }
+
+         Queue queue = server.locateQueue(queueAddr);
+         assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+         PagingStore queuePagingStore = queue.getPagingStore();
+         assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+         // now pages still under limit
+         long existingPages = queuePagingStore.getNumberOfPages();
+         assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() / queuePagingStore.getPageSizeBytes());
+
+         // but the messages reach the limit
+         PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl) queuePagingStore.getCursorProvider();
+         assertTrue(cursorProvider.getNumberOfMessagesOnSubscriptions() == maxMessages);

Review Comment:
   Expand the use of PageCursorProviderTestAccessor so you can do this, rather
   than making the method public.
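
   A minimal sketch of the test-accessor pattern referred to above. Both classes are hypothetical stand-ins (not the real PageCursorProviderImpl or PageCursorProviderTestAccessor), and it assumes the accessor lives in the same package as the class it exposes, typically under the test sources:

```java
// Hypothetical stand-in for PageCursorProviderImpl: the method stays package-private.
class CursorProviderSketch {

   private long messagesOnSubscriptions;

   void add(long count) {
      messagesOnSubscriptions += count;
   }

   // Package-private on purpose: not part of the public API.
   long getNumberOfMessagesOnSubscriptions() {
      return messagesOnSubscriptions;
   }
}

// Hypothetical stand-in for the test accessor. Because it sits in the same package,
// it can reach the package-private method and re-expose it to tests elsewhere.
final class CursorProviderSketchTestAccessor {

   private CursorProviderSketchTestAccessor() {
      // static helper only
   }

   public static long getNumberOfMessagesOnSubscriptions(CursorProviderSketch provider) {
      return provider.getNumberOfMessagesOnSubscriptions();
   }
}
```

   A test in another package would then call the accessor's static method instead of needing the production method to be public.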



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -343,6 +353,10 @@ private void checkNumberOfPages() {
       if (!isBelowPageLimitBytes()) {
          this.pageFull = true;
         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      } else {
+         if (cursorProvider != null) {
+            cursorProvider.checkClearPageLimit();
+         }

Review Comment:
   It feels wrong for the wider checkNumberOfPages() method to also be doing
   this, when it's aimed more at the message count stuff, especially as you
   already call it directly if the message limit changes.
   
   EDIT: see suggestion from earlier comment.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -290,7 +296,11 @@ public void applySetting(final AddressSettings addressSettings) {
       }
 
       if (pageLimitBytes != null && pageSize > 0) {
+         Long originalEstimatedMaxPages = this.estimatedMaxPages;
          estimatedMaxPages = pageLimitBytes / pageSize;
+         if (!Objects.equals(estimatedMaxPages, originalEstimatedMaxPages)) {
+            checkNumberOfPages();
+         }

Review Comment:
   I would move this below the log statement; otherwise it may already have
   gone away and acted on the change before logging what the change was.

   I also don't think it should ever do this in the constructor case, as the
   object hasn't finished being created yet and this pre-empts start() doing it
   later. The earlier change for the message count kind of gets away with it
   due to the null check on cursorProvider (which is actually final, so it
   could only be null during the constructor), but it's not clear this one
   does. It might be time to pass a boolean indicating whether this shared
   method is being called from the constructor or later, so it simply can't do
   these things at construction (allowing removal of the null check).

   I think this bit, and the pageLimitMessages-related check earlier, should
   perhaps just set booleans to indicate whether they changed, which you then
   action at the end after both have been updated, by calling
   checkNumberOfPages() and cursorProvider.checkClearPageLimit() respectively
   (again, only if not acting for the constructor).
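
   A minimal sketch of how that could look, under stated assumptions rather than as the actual PagingStoreImpl change. The class, the fromConstructor parameter name and the two counters are hypothetical; the counters stand in for checkNumberOfPages() and cursorProvider.checkClearPageLimit(). Mapping a negative message limit to null, and leaving estimatedMaxPages null when no byte limit applies, are also assumptions:

```java
import java.util.Objects;

// Hypothetical stand-in for the settings handling in PagingStoreImpl; not the real class.
class ApplySettingSketch {

   private Long pageLimitMessages;
   private Long estimatedMaxPages;

   int pageChecks;   // stand-in for calls to checkNumberOfPages()
   int clearChecks;  // stand-in for calls to cursorProvider.checkClearPageLimit()

   ApplySettingSketch(Long limitMessages, Long limitBytes, long pageSize) {
      // Construction only records the values; start() would do the checks later.
      applySetting(limitMessages, limitBytes, pageSize, true);
   }

   void applySetting(Long limitMessages, Long limitBytes, long pageSize) {
      applySetting(limitMessages, limitBytes, pageSize, false);
   }

   private void applySetting(Long limitMessages, Long limitBytes, long pageSize, boolean fromConstructor) {
      final Long previousLimitMessages = pageLimitMessages;
      final Long previousMaxPages = estimatedMaxPages;

      // Update both values first (including the assumed null replacement).
      pageLimitMessages = (limitMessages != null && limitMessages < 0) ? null : limitMessages;
      estimatedMaxPages = (limitBytes != null && pageSize > 0) ? Long.valueOf(limitBytes / pageSize) : null;

      // Only remember what changed; nothing is actioned yet.
      final boolean limitMessagesChanged = !Objects.equals(previousLimitMessages, pageLimitMessages);
      final boolean maxPagesChanged = !Objects.equals(previousMaxPages, estimatedMaxPages);

      // (The real method would log the new values here, before acting on them.)

      if (!fromConstructor) {
         if (maxPagesChanged) {
            pageChecks++;
         }
         if (limitMessagesChanged) {
            clearChecks++;
         }
      }
   }
}
```

   Deferring both actions to the end means each check sees the fully updated settings, and the constructor path never runs them, so no null check on the cursor provider is needed.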



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

