ARTEMIS-252 Added support to retry messages via JMX on JMS Queue interface
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98917259 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98917259 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98917259 Branch: refs/heads/master Commit: 989172596ee41c77fc5e6cd40c9bdbed89a61a90 Parents: 7afe879 Author: Petter Nordlander <[email protected]> Authored: Sun Oct 11 19:34:23 2015 +0200 Committer: Clebert Suconic <[email protected]> Committed: Mon Oct 12 17:03:42 2015 -0400 ---------------------------------------------------------------------- .../api/jms/management/JMSQueueControl.java | 21 ++++++ .../management/impl/JMSQueueControlImpl.java | 19 +++++ .../server/management/JMSQueueControlTest.java | 78 ++++++++++++++++++++ .../management/JMSQueueControlUsingJMSTest.java | 13 ++++ 4 files changed, 131 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java index 43e20ab..246fe7a 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java @@ -228,6 +228,27 @@ public interface JMSQueueControl extends DestinationControl { @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception; /** + * Retries the message corresponding to the given messageID 
to the original queue. + * This is appropriate on dead messages on Dead letter queues only. + * + * @param messageID + * @return {@code true} if the message was retried, {@code false} else + * @throws Exception + */ + @Operation(desc = "Retry the message corresponding to the given messageID to the original queue", impact = MBeanOperationInfo.ACTION) + boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID) throws Exception; + + /** + * Retries all messages on a DLQ to their respective original queues. + * This is appropriate on dead messages on Dead letter queues only. + * + * @return the number of retried messages. + * @throws Exception + */ + @Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact = MBeanOperationInfo.ACTION) + int retryMessages() throws Exception; + + /** * Lists the message counter for this queue. */ @Operation(desc = "List the message counters", impact = MBeanOperationInfo.INFO) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java index ed2d922..ff7a387 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java @@ -275,6 +275,25 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro return coreQueueControl.changeMessagesPriority(filter, newPriority); } + public boolean retryMessage(final String jmsMessageID) throws Exception { + + // Figure out messageID from 
JMSMessageID. + final String filter = createFilterForJMSMessageID(jmsMessageID); + Map<String,Object>[] messages = coreQueueControl.listMessages(filter); + if ( messages.length != 1) { // if no messages. There should not be more than one, JMSMessageID should be unique. + return false; + } + + final Map<String,Object> messageToRedeliver = messages[0]; + Long messageID = (Long)messageToRedeliver.get("messageID"); + return messageID != null && coreQueueControl.retryMessage(messageID); + } + + public int retryMessages() throws Exception { + return coreQueueControl.retryMessages(); + } + + public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception { return moveMessage(messageID, otherQueueName, false); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java index b98a165..b5183ca 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java @@ -792,6 +792,84 @@ public class JMSQueueControlTest extends ManagementTestBase { connection.close(); } + + protected ActiveMQQueue createDLQ(final String deadLetterQueueName) throws Exception { + serverManager.createQueue(false, deadLetterQueueName, null, true, deadLetterQueueName); + return (ActiveMQQueue) ActiveMQJMSClient.createQueue(deadLetterQueueName); + } + + protected ActiveMQQueue 
createTestQueueWithDLQ(final String queueName, final ActiveMQQueue dlq) throws Exception { + serverManager.createQueue(false,queueName,null,true,queueName); + ActiveMQQueue testQueue = (ActiveMQQueue) ActiveMQJMSClient.createQueue(queueName); + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress())); + addressSettings.setMaxDeliveryAttempts(1); + server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), addressSettings); + return testQueue; + } + + /** + * Test retrying all messages put on DLQ - i.e. they should appear on the original queue. + * @throws Exception + */ + @Test + public void testRetryMessages() throws Exception { + ActiveMQQueue dlq = createDLQ(RandomUtil.randomString()); + ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq); + + final int numMessagesToTest = 10; + JMSUtil.sendMessages(testQueue, numMessagesToTest); + + Connection connection = createConnection(); + connection.start(); + Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(testQueue); + for (int i = 0;i < numMessagesToTest;i++) { + Message msg = consumer.receive(500L); + } + session.rollback(); // All <numMessagesToTest> messages should now be on DLQ + + JMSQueueControl testQueueControl = createManagementControl(testQueue); + JMSQueueControl dlqQueueControl = createManagementControl(dlq); + Assert.assertEquals(0, getMessageCount(testQueueControl)); + Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl)); + + dlqQueueControl.retryMessages(); + + Assert.assertEquals(numMessagesToTest, getMessageCount(testQueueControl)); + Assert.assertEquals(0,getMessageCount(dlqQueueControl)); + } + + /** + * Test retrying a specific message on DLQ. + * Expected to be sent back to original queue. 
+ * @throws Exception + */ + @Test + public void testRetryMessage() throws Exception { + ActiveMQQueue dlq = createDLQ(RandomUtil.randomString()); + ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq); + String messageID = JMSUtil.sendMessages(testQueue,1)[0]; + + Connection connection = createConnection(); + connection.start(); + Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(testQueue); + consumer.receive(500L); + session.rollback(); // All <numMessagesToTest> messages should now be on DLQ + + JMSQueueControl testQueueControl = createManagementControl(testQueue); + JMSQueueControl dlqQueueControl = createManagementControl(dlq); + Assert.assertEquals(0, getMessageCount(testQueueControl)); + Assert.assertEquals(1,getMessageCount(dlqQueueControl)); + + dlqQueueControl.retryMessage(messageID); + + Assert.assertEquals(1, getMessageCount(testQueueControl)); + Assert.assertEquals(0,getMessageCount(dlqQueueControl)); + + } + @Test public void testMoveMessage() throws Exception { String otherQueueName = RandomUtil.randomString(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java index 1d79d7d..c98f942 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java +++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.jms.server.management; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; @@ -171,6 +172,18 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest { return (String) proxy.invokeOperation("listMessageCounterHistory"); } + public boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception { + return (Boolean) proxy.invokeOperation("retryMessage",messageID); + } + + public int retryMessages() throws Exception { + return (Integer) proxy.invokeOperation("retryMessages"); + } + + public boolean retryMessage(final String messageID) throws Exception { + return (Boolean) proxy.invokeOperation("retryMessage",messageID); + } + @Override public Map<String, Object>[] listScheduledMessages() throws Exception { return null;
