gemmellr commented on code in PR #5128:
URL: https://github.com/apache/activemq-artemis/pull/5128#discussion_r1713944655
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +292,149 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of("FOO");
+ session.createQueue(QueueConfiguration.of(queueAddr));
Review Comment:
Rather than using "FOO" literals here and at various points below, assigning a
variable would make it easier to follow everywhere the name is used. Picking a
more specific name could also help when following things later (e.g. in
debugging logs etc).
For example, perhaps:
final String addressName = getTestMethodName();
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java:
##########
@@ -196,7 +197,11 @@ public void onChange() {
private void reapplySettings() {
for (PagingStore store : stores.values()) {
AddressSettings settings =
this.addressSettingsRepository.getMatch(store.getAddress().toString());
- store.applySetting(settings);
+ try {
+ store.applySetting(settings);
+ } catch (ActiveMQException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
One aspect of this change is that if validation fails and applySetting throws
for one address, this loop over all the addresses means that it won't even try
the addresses following the one that fails. This feels potentially problematic
as it could leave them with unexpected settings, and yet it's entirely possible
_their_ settings would have applied ok (i.e. theirs may have changed and yet
not be invalid).
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +292,149 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of("FOO");
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ // set page size bytes to be larger than pageLimitBytes
+ addressSettings.setPageSizeBytes(size * 2);
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ // check the original pageSizeBytes is not changed
+ assertEquals(size, queuePagingStore.getPageSizeBytes());
+
+ // send a messages should be allowed because the page file still have
space
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ final String queueName = "FOO";
Review Comment:
Similarly here
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +292,149 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of("FOO");
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ // set page size bytes to be larger than pageLimitBytes
+ addressSettings.setPageSizeBytes(size * 2);
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ // check the original pageSizeBytes is not changed
+ assertEquals(size, queuePagingStore.getPageSizeBytes());
+
+ // send a messages should be allowed because the page file still have
space
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ final String queueName = "FOO";
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1024 * 50;
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 10));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 30;
+ int messageSize = 1024 * 10;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages > 4);
+
+ // restart the server with a smaller pageLimitSize < existing pages.
+ server.stop(true);
+ waitForServerToStop(server);
+
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 4));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // server will start regardless current pages > pageLimitBytes
Review Comment:
```suggestion
// server will start regardless of current page count >
(pageLimitBytes / pageSizeBytes)
```
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +292,149 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of("FOO");
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ // set page size bytes to be larger than pageLimitBytes
+ addressSettings.setPageSizeBytes(size * 2);
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ // check the original pageSizeBytes is not changed
+ assertEquals(size, queuePagingStore.getPageSizeBytes());
+
+ // send a messages should be allowed because the page file still have
space
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ final String queueName = "FOO";
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1024 * 50;
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
Review Comment:
Setting the address full policy to PAGE explicitly would be good, as in the
other test.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +292,149 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of("FOO");
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ // set page size bytes to be larger than pageLimitBytes
+ addressSettings.setPageSizeBytes(size * 2);
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ // check the original pageSizeBytes is not changed
+ assertEquals(size, queuePagingStore.getPageSizeBytes());
+
+ // send a messages should be allowed because the page file still have
space
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ final String queueName = "FOO";
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1024 * 50;
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 10));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 30;
+ int messageSize = 1024 * 10;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages > 4);
+
+ // restart the server with a smaller pageLimitSize < existing pages.
+ server.stop(true);
+ waitForServerToStop(server);
+
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 4));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // server will start regardless current pages > pageLimitBytes
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ // verify current situation
+ queue = server.locateQueue(queueAddr);
+
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging()
&& queuePagingStore.isPageFull());
+
+ long currentPages = queuePagingStore.getNumberOfPages();
+ assertEquals(existingPages, currentPages);
+
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ //consume messages until current pages goes down to below maxPage
+ locator = createFactory(isNetty());
+ final int numMessages = 25;
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ session = csf.createSession(false, true);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(queueName);
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ session.commit();
+ }
+
+ currentPages = queuePagingStore.getNumberOfPages();
+ assertTrue(currentPages < maxPages);
Review Comment:
Should it assert that the paging store is _not_ pageFull, rather than
assuming it isn't? The below bits could fail on the first send attempt and
erroneously pass since they never actually check the store.
##########
docs/user-manual/paging.adoc:
##########
@@ -244,7 +244,16 @@ This is to avoid a single destination using the entire
disk in case their consum
You can configure either `page-limit-bytes` or `page-limit-messages`, along
with `page-full-policy` on the address settings limiting how much data will be
recorded in paging.
-If you configure `page-full-policy` as DROP, messages will be simplify dropped
while the clients will not get any exceptions, while if you configured FAIL the
producers will receive a JMS Exception for the error condition.
+If you configure `page-full-policy` as DROP, messages will be simply dropped
while the clients will not get any exceptions, while if you configured FAIL the
producers will receive a JMS Exception for the error condition.
+
+[NOTE]
+
+The `page-limit-bytes` is converted to number of page files internally (i.e.
`page-limit-bytes` / `page-size-bytes`) and used to compare against the current
number of page files.
+Its value should be equal or greater than `page-size-bytes` to allow at least
one page to be created.
+If the applied value of `page-limit-bytes`, when converted to number of pages,
is less than the number of the current page files in store,
+paging will be blocked based on `page-full-policy` until the number current
page files go down below or equal to the
+value of `page-limit-bytes`, and it will become pageFull again once the number
of page files is greater than
+the value determined by `page-limit-bytes` (`page-limit-bytes` /
`page-size-bytes`).
Review Comment:
```suggestion
The `page-limit-bytes` is used to identify a maximum number of page files
internally (i.e. `page-limit-bytes` / `page-size-bytes`) which is then compared
against the current number of page files.
If configured, `page-limit-bytes` must be equal to or greater than
`page-size-bytes` to allow at least one page to be created.
If the limit determined from `page-limit-bytes`, once converted to a number
of pages, is less than the current number of page files in the store then
paging will be blocked based on `page-full-policy` until the number of current
page files drops to less than or equal to the calculated file limit. It will
become blocked again once the number of page files is greater than the value
determined by `page-limit-bytes` (`page-limit-bytes` / `page-size-bytes`).
```
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +292,149 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of("FOO");
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ // set page size bytes to be larger than pageLimitBytes
+ addressSettings.setPageSizeBytes(size * 2);
+ server.getAddressSettingsRepository().addMatch("FOO",
addressSettings);
+
+ // check the original pageSizeBytes is not changed
+ assertEquals(size, queuePagingStore.getPageSizeBytes());
+
+ // send a messages should be allowed because the page file still have
space
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ final String queueName = "FOO";
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1024 * 50;
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 10));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 30;
+ int messageSize = 1024 * 10;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages > 4);
+
+ // restart the server with a smaller pageLimitSize < existing pages.
+ server.stop(true);
+ waitForServerToStop(server);
+
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 4));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // server will start regardless current pages > pageLimitBytes
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ // verify current situation
+ queue = server.locateQueue(queueAddr);
+
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging()
&& queuePagingStore.isPageFull());
+
+ long currentPages = queuePagingStore.getNumberOfPages();
+ assertEquals(existingPages, currentPages);
+
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ //consume messages until current pages goes down to below maxPage
+ locator = createFactory(isNetty());
+ final int numMessages = 25;
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ session = csf.createSession(false, true);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(queueName);
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ session.commit();
+ }
+
+ currentPages = queuePagingStore.getNumberOfPages();
+ assertTrue(currentPages < maxPages);
+
+ //send messages one by one until page full
+ ClientProducer producer = session.createProducer(queueName);
+ boolean isFull = false;
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = createMessage(session,
messageSize, i, null);
+ producer.send(message);
+ session.commit();
+ }
+ } catch (ActiveMQAddressFullException e) {
+ isFull = true;
+ session.close();
+ }
+ assertTrue(isFull);
+ currentPages = queuePagingStore.getNumberOfPages();
+ //now current pages should be one more than maxPages
+ assertTrue(currentPages == maxPages + 1);
Review Comment:
Similarly, checking the store again would make sense
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact