Author: robbie
Date: Tue Jul 21 09:05:21 2009
New Revision: 796196
URL: http://svn.apache.org/viewvc?rev=796196&view=rev
Log:
QPID-1961: expand viewMessages() queue operation to support long parameters,
deprecate previous int version.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Tue Jul 21 09:05:21 2009
@@ -112,6 +112,17 @@
List<Long> getMessagesOnTheQueue(int num, int offest);
QueueEntry getMessageOnTheQueue(long messageId);
+
+ /**
+ * Returns a list of QueEntries from a given range of queue positions, eg
messages 5 to 10 on the queue.
+ *
+ * The 'queue position' index starts from 1. Using 0 in 'from' will be
ignored and continue from 1.
+ * Using 0 in the 'to' field will return an empty list regardless of the
'from' value.
+ * @param fromPosition
+ * @param toPosition
+ * @return
+ */
+ public List<QueueEntry> getMessagesRangeOnTheQueue(final long
fromPosition, final long toPosition);
void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId,
String queueName,
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Tue Jul 21 09:05:21 2009
@@ -380,25 +380,45 @@
/**
* Returns the header contents of the messages stored in this queue in
tabular form.
+ * Deprecated as of Qpid JMX API 1.3
*/
+ @Deprecated
public TabularData viewMessages(int beginIndex, int endIndex) throws
JMException
{
- if ((beginIndex > endIndex) || (beginIndex < 1))
+ return viewMessages((long)beginIndex,(long)endIndex);
+ }
+
+
+ /**
+ * Returns the header contents of the messages stored in this queue in
tabular form.
+ * @param startPosition The queue position of the first message to be
viewed
+ * @param endPosition The queue position of the last message to be viewed
+ */
+ public TabularData viewMessages(long startPosition, long endPosition)
throws JMException
+ {
+ if ((startPosition > endPosition) || (startPosition < 1))
{
- throw new OperationsException("From Index = " + beginIndex + ", To
Index = " + endIndex
+ throw new OperationsException("From Index = " + startPosition + ",
To Index = " + endPosition
+ "\n\"From Index\" should be greater than 0 and less than
\"To Index\"");
}
+
+ if ((endPosition - startPosition) > Integer.MAX_VALUE)
+ {
+ throw new OperationsException("Specified MessageID interval is too
large. Intervals must be less than 2^31 in size");
+ }
- List<QueueEntry> list = _queue.getMessagesOnTheQueue();
+ List<QueueEntry> list =
_queue.getMessagesRangeOnTheQueue(startPosition,endPosition);
TabularDataSupport _messageList = new
TabularDataSupport(_messagelistDataType);
try
{
// Create the tabular list of message header contents
- for (int i = beginIndex; (i <= endIndex) && (i <= list.size());
i++)
+ int size = list.size();
+
+ for (int i = 0; i < size ; i++)
{
- long position = i;
- AMQMessage msg = list.get(i - 1).getMessage();
+ long position = startPosition + i;
+ AMQMessage msg = list.get(i).getMessage();
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
String[] headerAttributes =
getMessageHeaderProperties(headerBody);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Tue Jul 21 09:05:21 2009
@@ -813,6 +813,43 @@
return entryList;
}
+
+ /**
+ * Returns a list of QueEntries from a given range of queue positions, eg
messages 5 to 10 on the queue.
+ *
+ * The 'queue position' index starts from 1. Using 0 in 'from' will be
ignored and continue from 1.
+ * Using 0 in the 'to' field will return an empty list regardless of the
'from' value.
+ * @param fromPosition
+ * @param toPosition
+ * @return
+ */
+ public List<QueueEntry> getMessagesRangeOnTheQueue(final long
fromPosition, final long toPosition)
+ {
+ List<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+
+ QueueEntryIterator it = _entries.iterator();
+
+ long index = 1;
+ for ( ; index < fromPosition && !it.atTail(); index++)
+ {
+ it.advance();
+ }
+
+ if(index < fromPosition)
+ {
+ //The queue does not contain enough entries to reach our range.
+ //return the empty list.
+ return queueEntries;
+ }
+
+ for ( ; index <= toPosition && !it.atTail(); index++)
+ {
+ it.advance();
+ queueEntries.add(it.getNode());
+ }
+
+ return queueEntries;
+ }
public void moveMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Tue Jul 21 09:05:21 2009
@@ -187,7 +187,7 @@
{
try
{
- _queueMBean.viewMessages(0, 3);
+ _queueMBean.viewMessages(0L, 3L);
fail();
}
catch (JMException ex)
@@ -197,7 +197,7 @@
try
{
- _queueMBean.viewMessages(2, 1);
+ _queueMBean.viewMessages(2L, 1L);
fail();
}
catch (JMException ex)
@@ -207,13 +207,25 @@
try
{
- _queueMBean.viewMessages(-1, 1);
+ _queueMBean.viewMessages(-1L, 1L);
fail();
}
catch (JMException ex)
{
}
+
+ try
+ {
+ long end = Integer.MAX_VALUE;
+ end+=2;
+ _queueMBean.viewMessages(1L, end);
+ fail("Expected Exception due to oversized(> 2^31) message range");
+ }
+ catch (JMException ex)
+ {
+
+ }
IncomingMessage msg = message(false, false);
long id = msg.getMessageId();
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Tue Jul 21 09:05:21 2009
@@ -209,6 +209,11 @@
{
return null; //To change body of implemented methods use File |
Settings | File Templates.
}
+
+ public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long
toPosition)
+ {
+ return null;
+ }
public void moveMessagesToAnotherQueue(long fromMessageId, long
toMessageId, String queueName, StoreContext storeContext)
{
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Tue Jul 21 09:05:21 2009
@@ -335,6 +335,69 @@
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
+
+ public void testGetMessagesRangeOnTheQueue() throws Exception
+ {
+ for (int i = 1 ; i <= 10; i++)
+ {
+ // Create message
+ Long messageId = new Long(i);
+ AMQMessage message = createMessage(messageId);
+ // Put message on queue
+ _queue.enqueue(null, message);
+ }
+
+ // Get non-existent 0th QueueEntry & check returned list was empty
+ // (the position parameters in this method are indexed from 1)
+ List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
+ assertTrue(entries.size() == 0);
+
+ // Check that when 'from' is 0 it is ignored and the range continues
from 1
+ entries = _queue.getMessagesRangeOnTheQueue(0, 2);
+ assertTrue(entries.size() == 2);
+ long msgID = entries.get(0).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 1L);
+ msgID = entries.get(1).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 2L);
+
+ // Check that when 'from' is greater than 'to' the returned list is
empty
+ entries = _queue.getMessagesRangeOnTheQueue(5, 4);
+ assertTrue(entries.size() == 0);
+
+ // Get first QueueEntry & check id
+ entries = _queue.getMessagesRangeOnTheQueue(1, 1);
+ assertTrue(entries.size() == 1);
+ msgID = entries.get(0).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 1L);
+
+ // Get 5th,6th,7th entries and check id's
+ entries = _queue.getMessagesRangeOnTheQueue(5, 7);
+ assertTrue(entries.size() == 3);
+ msgID = entries.get(0).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 5L);
+ msgID = entries.get(1).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 6L);
+ msgID = entries.get(2).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 7L);
+
+ // Get 10th QueueEntry & check id
+ entries = _queue.getMessagesRangeOnTheQueue(10, 10);
+ assertTrue(entries.size() == 1);
+ msgID = entries.get(0).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 10L);
+
+ // Get non-existent 11th QueueEntry & check returned set was empty
+ entries = _queue.getMessagesRangeOnTheQueue(11, 11);
+ assertTrue(entries.size() == 0);
+
+ // Get 9th,10th, and non-existent 11th entries & check result is of
size 2 with correct IDs
+ entries = _queue.getMessagesRangeOnTheQueue(9, 11);
+ assertTrue(entries.size() == 2);
+ msgID = entries.get(0).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 9L);
+ msgID = entries.get(1).getMessage().getMessageId();
+ assertEquals("Message ID was wrong", msgID, 10L);
+ }
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue()
throws AMQException
{
Modified:
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
---
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
(original)
+++
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
Tue Jul 21 09:05:21 2009
@@ -207,18 +207,42 @@
/**
* Returns a subset of all the messages stored in the queue. The messages
* are returned based on the given index numbers.
+ *
+ * Deprecated as of Qpid JMX API 1.3
* @param fromIndex
* @param toIndex
* @return
* @throws IOException
* @throws JMException
*/
+ @Deprecated
@MBeanOperation(name="viewMessages",
description="Message headers for messages in this queue
within given index range. eg. from index 1 - 100")
TabularData viewMessages(@MBeanOperationParameter(name="from index",
description="from index")int fromIndex,
@MBeanOperationParameter(name="to index",
description="to index")int toIndex)
throws IOException, JMException;
+
+ /**
+ * Returns a subset (up to 2^31 messages at a time) of all the messages
stored on the queue.
+ * The messages are returned based on the given queue position range.
+ * @param startPosition
+ * @param endPosition
+ * @return
+ * @throws IOException
+ * @throws JMException
+ */
+ @MBeanOperation(name="viewMessages",
+ description="Message headers for messages in this queue
within given queue positions range. eg. from index 1 - 100")
+ TabularData viewMessages(@MBeanOperationParameter(name="start position",
description="start position")long startPosition,
+ @MBeanOperationParameter(name="end position",
description="end position")long endPosition)
+ throws IOException, JMException;
+ /**
+ * Returns the content for the given AMQ Message ID.
+ *
+ * @throws IOException
+ * @throws JMException
+ */
@MBeanOperation(name="viewMessageContent", description="The message
content for given Message Id")
CompositeData viewMessageContent(@MBeanOperationParameter(name="Message
Id", description="Message Id")long messageId)
throws IOException, JMException;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]