Author: robbie
Date: Tue Jul 21 09:12:28 2009
New Revision: 796203
URL: http://svn.apache.org/viewvc?rev=796203&view=rev
Log:
QPID-1968: Expose deleteMessages() queue operation through the JMX MBean
interface, add test for deleteMessages()
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=796203&r1=796202&r2=796203&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Tue Jul 21 09:12:28 2009
@@ -478,13 +478,29 @@
{
if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
- throw new OperationsException("\"From MessageId\" should be
greater then 0 and less then \"To MessageId\"");
+ throw new OperationsException("\"From MessageId\" should be
greater than 0 and less than \"To MessageId\"");
}
_queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId,
toQueueName, _storeContext);
}
/**
+ * Validates the message id range and then delegates to _queue.removeMessagesFromQueue.
+ * @see ManagedQueue#deleteMessages
+ * @param fromMessageId first message id of the range; must be at least 1 and not greater than toMessageId
+ * @param toMessageId last message id of the range
+ * @throws JMException an OperationsException when the range check fails. NOTE(review): its text says "less than" although equal ids pass the check.
+ */
+ public void deleteMessages(long fromMessageId, long toMessageId) throws
JMException
+ {
+ if ((fromMessageId > toMessageId) || (fromMessageId < 1))
+ {
+ throw new OperationsException("\"From MessageId\" should be
greater than 0 and less than \"To MessageId\"");
+ }
+
+ _queue.removeMessagesFromQueue(fromMessageId, toMessageId,
_storeContext);
+ }
+
+ /**
* returns Notifications sent by this MBean.
*/
@Override
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=796203&r1=796202&r2=796203&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Tue Jul 21 09:12:28 2009
@@ -108,6 +108,52 @@
//Ensure that the data has been removed from the Store
verifyBrokerState();
}
+ // Exercises deleteMessages() through the queue MBean: a single-id range at each end, then the remaining range.
+ public void testDeleteMessages() throws Exception
+ {
+ int messageCount = 10;
+ sendMessages(messageCount, true);
+ assertEquals("", messageCount,
_queueMBean.getMessageCount().intValue());
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ long queueDepth = (messageCount * MESSAGE_SIZE);
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+ // delete the first message (id 1): the current count must drop by one, the received total must not change
+ _queueMBean.deleteMessages(1L,1L);
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ try
+ {
+ _queueMBean.viewMessageContent(1L);
+ fail("Message should no longer be on the queue");
+ }
+ catch(Exception e)
+ {
+ // expected: viewing the deleted message must fail, so there is nothing to handle here
+ }
+
+ // delete the last message (id 10), leaving the 2nd to 9th
+ _queueMBean.deleteMessages(10L,10L);
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 2));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ try
+ {
+ _queueMBean.viewMessageContent(10L);
+ fail("Message should no longer be on the queue");
+ }
+ catch(Exception e)
+ {
+ // expected: viewing the deleted message must fail, so there is nothing to handle here
+ }
+
+ // delete the remaining messages (ids 2 to 9), leaving none
+ _queueMBean.deleteMessages(2L,9L);
+ assertTrue(_queueMBean.getMessageCount() == (0));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
// todo: collect to a general testing class -duplicated from
Systest/MessageReturntest
private void verifyBrokerState()
Modified:
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java?rev=796203&r1=796202&r2=796203&view=diff
==============================================================================
---
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
(original)
+++
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
Tue Jul 21 09:12:28 2009
@@ -284,4 +284,18 @@
@MBeanOperationParameter(name="to MessageId",
description="to MessageId")long toMessageId,
@MBeanOperationParameter(name= ManagedQueue.TYPE,
description="to Queue Name")String toQueue)
throws IOException, JMException;
+
+ /**
+ * Deletes the messages in the given range of AMQ message Ids from the queue.
+ * @param fromMessageId first in the range of message ids; the broker MBean in this change rejects values below 1 or above toMessageId
+ * @param toMessageId last in the range of message ids
+ * @throws IOException presumably on a management connection failure - TODO confirm
+ * @throws JMException for example the OperationsException the broker MBean in this change raises for an invalid range
+ */
+ @MBeanOperation(name="deleteMessages",
+ description="Delete a range of messages from a specified
queue",
+ impact= MBeanOperationInfo.ACTION)
+ void deleteMessages(@MBeanOperationParameter(name="from MessageId",
description="from MessageId")long fromMessageId,
+ @MBeanOperationParameter(name="to MessageId",
description="to MessageId")long toMessageId)
+ throws IOException, JMException;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]