ARTEMIS-252 Added support to retry messages via JMX on JMS Queue interface

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98917259
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98917259
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98917259

Branch: refs/heads/master
Commit: 989172596ee41c77fc5e6cd40c9bdbed89a61a90
Parents: 7afe879
Author: Petter Nordlander <[email protected]>
Authored: Sun Oct 11 19:34:23 2015 +0200
Committer: Clebert Suconic <[email protected]>
Committed: Mon Oct 12 17:03:42 2015 -0400

----------------------------------------------------------------------
 .../api/jms/management/JMSQueueControl.java     | 21 ++++++
 .../management/impl/JMSQueueControlImpl.java    | 19 +++++
 .../server/management/JMSQueueControlTest.java  | 78 ++++++++++++++++++++
 .../management/JMSQueueControlUsingJMSTest.java | 13 ++++
 4 files changed, 131 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
index 43e20ab..246fe7a 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
@@ -228,6 +228,27 @@ public interface JMSQueueControl extends 
DestinationControl {
                     @Parameter(name = "rejectDuplicates", desc = "Reject 
messages identified as duplicate by the duplicate message") boolean 
rejectDuplicates) throws Exception;
 
    /**
+    * Retries the message corresponding to the given messageID by sending it back 
to its original queue.
+    * This is only appropriate for dead messages on dead letter queues.
+    *
+    * @param messageID the JMS message ID of the message to retry
+    * @return {@code true} if the message was retried, {@code false} otherwise
+    * @throws Exception
+    */
+   @Operation(desc = "Retry the message corresponding to the given messageID 
to the original queue", impact = MBeanOperationInfo.ACTION)
+   boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") 
String messageID) throws Exception;
+
+   /**
+    * Retries all messages on a DLQ by sending them back to their respective original queues.
+    * This is only appropriate for dead messages on dead letter queues.
+    *
+    * @return the number of retried messages.
+    * @throws Exception
+    */
+   @Operation(desc = "Retry all messages on a DLQ to their respective original 
queues", impact = MBeanOperationInfo.ACTION)
+   int retryMessages() throws Exception;
+
+   /**
     * Lists the message counter for this queue.
     */
    @Operation(desc = "List the message counters", impact = 
MBeanOperationInfo.INFO)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
 
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
index ed2d922..ff7a387 100644
--- 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
+++ 
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
@@ -275,6 +275,25 @@ public class JMSQueueControlImpl extends StandardMBean 
implements JMSQueueContro
       return coreQueueControl.changeMessagesPriority(filter, newPriority);
    }
 
+   public boolean retryMessage(final String jmsMessageID) throws Exception {
+
+      // Figure out messageID from JMSMessageID.
+      final String filter = createFilterForJMSMessageID(jmsMessageID);
+      Map<String,Object>[] messages = coreQueueControl.listMessages(filter);
+      if ( messages.length != 1) { // Give up unless exactly one message matches; 
a JMSMessageID should be unique, so more than one match is unexpected.
+         return false;
+      }
+
+      final Map<String,Object> messageToRedeliver = messages[0];
+      Long messageID = (Long)messageToRedeliver.get("messageID");
+      return messageID != null && coreQueueControl.retryMessage(messageID);
+   }
+
+   public int retryMessages() throws Exception {
+      return coreQueueControl.retryMessages();
+   }
+
+
    public boolean moveMessage(final String messageID, final String 
otherQueueName) throws Exception {
       return moveMessage(messageID, otherQueueName, false);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
index b98a165..b5183ca 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
@@ -792,6 +792,84 @@ public class JMSQueueControlTest extends 
ManagementTestBase {
       connection.close();
    }
 
+
+   protected ActiveMQQueue createDLQ(final String deadLetterQueueName) throws 
Exception {
+      serverManager.createQueue(false, deadLetterQueueName, null, true, 
deadLetterQueueName);
+      return (ActiveMQQueue) 
ActiveMQJMSClient.createQueue(deadLetterQueueName);
+   }
+
+   protected ActiveMQQueue createTestQueueWithDLQ(final String queueName, 
final ActiveMQQueue dlq) throws Exception {
+      serverManager.createQueue(false,queueName,null,true,queueName);
+      ActiveMQQueue testQueue = (ActiveMQQueue) 
ActiveMQJMSClient.createQueue(queueName);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress()));
+      addressSettings.setMaxDeliveryAttempts(1);
+      server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), 
addressSettings);
+      return testQueue;
+   }
+
+   /**
+    * Test retrying all messages put on DLQ - i.e. they should appear on the 
original queue.
+    * @throws Exception
+    */
+   @Test
+   public void testRetryMessages() throws Exception {
+      ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
+      ActiveMQQueue testQueue = 
createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
+
+      final int numMessagesToTest = 10;
+      JMSUtil.sendMessages(testQueue, numMessagesToTest);
+
+      Connection connection = createConnection();
+      connection.start();
+      Session session = 
connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(testQueue);
+      for (int i = 0;i < numMessagesToTest;i++) {
+         Message msg = consumer.receive(500L);
+      }
+      session.rollback(); // All <numMessagesToTest> messages should now be on 
DLQ
+
+      JMSQueueControl testQueueControl = createManagementControl(testQueue);
+      JMSQueueControl dlqQueueControl = createManagementControl(dlq);
+      Assert.assertEquals(0, getMessageCount(testQueueControl));
+      Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl));
+
+      dlqQueueControl.retryMessages();
+
+      Assert.assertEquals(numMessagesToTest, 
getMessageCount(testQueueControl));
+      Assert.assertEquals(0,getMessageCount(dlqQueueControl));
+   }
+
+   /**
+    * Test retrying a specific message on DLQ.
+    * Expected to be sent back to original queue.
+    * @throws Exception
+    */
+   @Test
+   public void testRetryMessage() throws Exception {
+      ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
+      ActiveMQQueue testQueue = 
createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
+      String messageID = JMSUtil.sendMessages(testQueue,1)[0];
+
+      Connection connection = createConnection();
+      connection.start();
+      Session session = 
connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(testQueue);
+      consumer.receive(500L);
+      session.rollback(); // The single message should now be on the DLQ
+
+      JMSQueueControl testQueueControl = createManagementControl(testQueue);
+      JMSQueueControl dlqQueueControl = createManagementControl(dlq);
+      Assert.assertEquals(0, getMessageCount(testQueueControl));
+      Assert.assertEquals(1,getMessageCount(dlqQueueControl));
+
+      dlqQueueControl.retryMessage(messageID);
+
+      Assert.assertEquals(1, getMessageCount(testQueueControl));
+      Assert.assertEquals(0,getMessageCount(dlqQueueControl));
+
+   }
+
    @Test
    public void testMoveMessage() throws Exception {
       String otherQueueName = RandomUtil.randomString();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
index 1d79d7d..c98f942 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.jms.server.management;
 
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.management.Parameter;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.JMSFactoryType;
@@ -171,6 +172,18 @@ public class JMSQueueControlUsingJMSTest extends 
JMSQueueControlTest {
             return (String) proxy.invokeOperation("listMessageCounterHistory");
          }
 
+         public boolean retryMessage(@Parameter(name = "messageID", desc = "A 
message ID") long messageID) throws Exception {
+            return (Boolean) proxy.invokeOperation("retryMessage",messageID);
+         }
+
+         public int retryMessages() throws Exception {
+            return (Integer) proxy.invokeOperation("retryMessages");
+         }
+
+         public boolean retryMessage(final String messageID) throws Exception {
+            return (Boolean) proxy.invokeOperation("retryMessage",messageID);
+         }
+
          @Override
          public Map<String, Object>[] listScheduledMessages() throws Exception 
{
             return null;

Reply via email to