This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new f2e5e02cd5 ARTEMIS-5779 add a DLQ retry method with selector support
f2e5e02cd5 is described below
commit f2e5e02cd53fa4bb7552c40f9bbbea3994f61820
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Nov 21 15:31:52 2025 -0500
ARTEMIS-5779 add a DLQ retry method with selector support
Leverage the existing functionality to add a new API that accepts a filter
string to be used when retrying messages from the DLQ.
---
.../apache/activemq/artemis/logs/AuditLogger.java | 8 ++
.../artemis/api/core/management/QueueControl.java | 9 ++
.../core/management/impl/QueueControlImpl.java | 18 ++++
.../integration/management/QueueControlTest.java | 104 +++++++++++++++++++++
.../management/QueueControlUsingCoreTest.java | 5 +
5 files changed, 144 insertions(+)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 841455db7b..4ff50500c6 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2888,4 +2888,12 @@ public interface AuditLogger {
@LogMessage(id = 601801, value = "User {} is deleting a address on target
resource: {} {}", level = LogMessage.Level.INFO)
void destroyAddress(String user, Object source, Object... args);
+
+ static void retryMessages(Object source, Object... args) {
+ BASE_LOGGER.retryMessages(getCaller(), source, parametersList(args));
+ }
+
+ /**
+ * Audit log entry (id 601802, INFO level) recording that a user is retrying messages on a target resource.
+ * The three arguments presumably fill the message's {@code {}} placeholders in declaration order.
+ *
+ * @param user the user performing the operation; the static {@code retryMessages} helper passes {@code getCaller()}
+ * @param source the target resource the operation is performed on
+ * @param args the operation arguments already rendered as a single string; the static helper passes {@code parametersList(args)}
+ */
+ @LogMessage(id = 601802, value = "User {} is retry sending messages on
target resource: {} {}", level = LogMessage.Level.INFO)
+ void retryMessages(String user, Object source, String args);
+
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 3aa4850058..97d08fc003 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -456,6 +456,15 @@ public interface QueueControl {
@Operation(desc = "Retry all messages on a DLQ to their respective original
queues", impact = MBeanOperationInfo.ACTION)
int retryMessages() throws Exception;
+ /**
+ * Retries all messages that match the given filter on a DLQ to their respective original queues.
+ * This operation is appropriate for dead letter queues only.
+ *
+ * @param filter a message filter selecting which messages to retry (can be empty)
+ * @return the number of retried messages.
+ * @throws Exception if the retry operation fails
+ */
+ @Operation(desc = "Retry all messages matching the given filter on a DLQ to
their respective original queues", impact = MBeanOperationInfo.ACTION)
+ int retryMessages(@Parameter(name = "filter", desc = "A message filter (can
be empty)") String filter) throws Exception;
+
/**
* Moves the message corresponding to the specified message ID to the
specified other queue.
*
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index ad8b45a81d..de7b6876d1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1324,6 +1324,24 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
}
}
+   /**
+    * Retries the messages on this queue that match {@code filter} and returns how many were retried.
+    * The whole operation runs while the server management lock is held.
+    */
+   @Override
+   public int retryMessages(String filter) throws Exception {
+      // critical task: hold the management lock so no other management task runs in parallel
+      try (AutoCloseable managementLock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.retryMessages(queue, filter);
+         }
+
+         checkStarted();
+         clearIO();
+
+         final int retriedCount;
+         try {
+            retriedCount = queue.retryMessages(FilterImpl.createFilter(filter));
+         } finally {
+            // always paired with the clearIO() above, whether or not the retry succeeded
+            blockOnIO();
+         }
+         return retriedCount;
+      }
+   }
+
@Override
public boolean moveMessage(final long messageID, final String
otherQueueName) throws Exception {
return moveMessage(messageID, otherQueueName, false);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 012b169bee..9f115e48a2 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -108,6 +108,7 @@ import static
org.apache.activemq.artemis.core.message.openmbean.CompositeDataCo
import static
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.STRING_PROPERTIES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -2236,6 +2237,109 @@ public class QueueControlTest extends
ManagementTestBase {
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
}
+   /**
+    * Test that a filtered retry moves only the matching messages from the DLQ back to the original queue, and that a {@code null} filter then retries the rest.
+    */
+   @TestTemplate
+   public void testRetryMatchingMessages() throws Exception {
+      final SimpleString dla = SimpleString.of("DLA");
+      final SimpleString qName = SimpleString.of("q1");
+      final SimpleString adName = SimpleString.of("ad1");
+      final SimpleString dlq = SimpleString.of("DLQ1");
+      final String sampleText = "Put me on DLQ";
+
+      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
+      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
+
+      session.createQueue(QueueConfiguration.of(dlq).setAddress(dla).setDurable(durable));
+      session.createQueue(QueueConfiguration.of(qName).setAddress(adName).setDurable(durable));
+
+      // Send three messages to the queue, each tagged with a distinct "color" property to filter on.
+      ClientProducer producer = session.createProducer(adName);
+
+      producer.send(createTextMessage(session, sampleText + ":red").putStringProperty("color", "red"));
+      producer.send(createTextMessage(session, sampleText + ":green").putStringProperty("color", "green"));
+      producer.send(createTextMessage(session, sampleText + ":blue").putStringProperty("color", "blue"));
+
+      session.start();
+
+      final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(qName);
+      Queue q = binding.getQueue();
+      final LocalQueueBinding binding2 = (LocalQueueBinding) server.getPostOffice().getBinding(dlq);
+      Queue q2 = binding2.getQueue();
+
+      // Verify that original queue has a memory size greater than 0 and DLQ is 0
+      assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0);
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
+
+      // Read and roll back each message; with max delivery attempts set to 1 above, each rollback is expected to send it to the DLQ
+      ClientConsumer clientConsumer = session.createConsumer(qName);
+
+      ClientMessage clientMessage = clientConsumer.receive(500);
+      assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      assertEquals(sampleText + ":red", clientMessage.getBodyBuffer().readString());
+      session.rollback();
+      clientMessage = clientConsumer.receive(500);
+      assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      assertEquals(sampleText + ":green", clientMessage.getBodyBuffer().readString());
+      session.rollback();
+      clientMessage = clientConsumer.receive(500);
+      assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      assertEquals(sampleText + ":blue", clientMessage.getBodyBuffer().readString());
+      session.rollback();
+
+      assertNull(clientConsumer.receiveImmediate());
+
+      // Verify that original queue has a memory size of 0 and DLQ is greater than 0 after rollback
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
+      assertTrue(QueueImplTestAccessor.getQueueMemorySize(q2) > 0);
+
+      QueueControl dlqQueueControl = createManagementControl(dla, dlq);
+      assertMessageMetrics(dlqQueueControl, 3, durable);
+
+      // Retry matching messages - i.e. only the green one should go from DLQ to original Queue.
+      assertEquals(1, dlqQueueControl.retryMessages("color = 'green'"));
+
+      // Assert DLQ still holds the two messages the filter did not match...
+      assertMessageMetrics(dlqQueueControl, 2, durable);
+
+      // Verify that original queue has a memory size greater than 0 and DLQ still has data since the filter only matched one
+      assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0);
+      assertNotEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
+
+      // .. and that the message is now on the original queue once more (acknowledge the retried message itself, not the stale one).
+      {
+         ClientMessage retriedMessage = clientConsumer.receive(500);
+         assertNotNull(retriedMessage);
+         retriedMessage.acknowledge();
+         assertEquals(sampleText + ":green", retriedMessage.getBodyBuffer().readString());
+      }
+
+      // Retry moving messages without a filter - i.e. all remaining should go from DLQ to original Queue.
+      assertEquals(2, dlqQueueControl.retryMessages(null));
+
+      // .. and that the messages are now on the original queue once more.
+      {
+         ClientMessage retriedMessage = clientConsumer.receive(500);
+         assertNotNull(retriedMessage);
+         retriedMessage.acknowledge();
+         assertEquals(sampleText + ":red", retriedMessage.getBodyBuffer().readString());
+         retriedMessage = clientConsumer.receive(500);
+         assertNotNull(retriedMessage);
+         retriedMessage.acknowledge();
+         assertEquals(sampleText + ":blue", retriedMessage.getBodyBuffer().readString());
+      }
+
+      clientConsumer.close();
+
+      // Verify that original queue and DLQ have a memory size of 0
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
+   }
+
/**
* Test send to DLA while paging includes paged messages
*/
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 5cea1efd54..d467bcf3b6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -525,6 +525,11 @@ public class QueueControlUsingCoreTest extends
QueueControlTest {
return (Integer) proxy.invokeOperation(Integer.class,
"retryMessages");
}
+         @Override
+         public int retryMessages(String filter) throws Exception {
+            // invoke the filtered "retryMessages" management operation through the proxy
+            final Integer retried = (Integer) proxy.invokeOperation(Integer.class, "retryMessages", filter);
+            return retried;
+         }
+
@Override
public int removeMessages(final String filter) throws Exception {
return (Integer) proxy.invokeOperation(Integer.class,
"removeMessages", filter);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact