Author: robbie
Date: Tue Jul 21 09:05:21 2009
New Revision: 796196

URL: http://svn.apache.org/viewvc?rev=796196&view=rev
Log:
QPID-1961: expand viewMessages() queue operation to support long parameters, 
deprecate previous int version.

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Tue Jul 21 09:05:21 2009
@@ -112,6 +112,17 @@
     List<Long> getMessagesOnTheQueue(int num, int offest);
 
     QueueEntry getMessageOnTheQueue(long messageId);
+    
+    /**
+     * Returns a list of QueueEntries from a given range of queue positions, eg 
messages 5 to 10 on the queue.
+     * 
+     * The 'queue position' index starts from 1. A 'from' value of 0 is 
treated as 1. 
+     * Using 0 in the 'to' field will return an empty list regardless of the 
'from' value.
+     * @param fromPosition
+     * @param toPosition
+     * @return
+     */
+    public List<QueueEntry> getMessagesRangeOnTheQueue(final long 
fromPosition, final long toPosition);
 
 
     void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, 
String queueName,

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Tue Jul 21 09:05:21 2009
@@ -380,25 +380,45 @@
 
     /**
      * Returns the header contents of the messages stored in this queue in 
tabular form.
+     * Deprecated as of Qpid JMX API 1.3
      */
+    @Deprecated
     public TabularData viewMessages(int beginIndex, int endIndex) throws 
JMException
     {
-        if ((beginIndex > endIndex) || (beginIndex < 1))
+        return viewMessages((long)beginIndex,(long)endIndex);
+    }
+    
+    
+    /**
+     * Returns the header contents of the messages stored in this queue in 
tabular form.
+     * @param startPosition The queue position of the first message to be 
viewed
+     * @param endPosition The queue position of the last message to be viewed
+     */
+    public TabularData viewMessages(long startPosition, long endPosition) 
throws JMException
+    {
+        if ((startPosition > endPosition) || (startPosition < 1))
         {
-            throw new OperationsException("From Index = " + beginIndex + ", To 
Index = " + endIndex
+            throw new OperationsException("From Index = " + startPosition + ", 
To Index = " + endPosition
                 + "\n\"From Index\" should be greater than 0 and less than 
\"To Index\"");
         }
+        
+        if ((endPosition - startPosition) > Integer.MAX_VALUE)
+        {
+            throw new OperationsException("Specified MessageID interval is too 
large. Intervals must be less than 2^31 in size");
+        }
 
-        List<QueueEntry> list = _queue.getMessagesOnTheQueue();
+        List<QueueEntry> list = 
_queue.getMessagesRangeOnTheQueue(startPosition,endPosition);
         TabularDataSupport _messageList = new 
TabularDataSupport(_messagelistDataType);
 
         try
         {
             // Create the tabular list of message header contents
-            for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); 
i++)
+            int size = list.size();
+
+            for (int i = 0; i < size ; i++)
             {
-                long position = i;
-                AMQMessage msg = list.get(i - 1).getMessage();
+                long position = startPosition + i;
+                AMQMessage msg = list.get(i).getMessage();
                 ContentHeaderBody headerBody = msg.getContentHeaderBody();
                 // Create header attributes list
                 String[] headerAttributes = 
getMessageHeaderProperties(headerBody);

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Tue Jul 21 09:05:21 2009
@@ -813,6 +813,43 @@
         return entryList;
 
     }
+    
+    /**
+     * Returns a list of QueueEntries from a given range of queue positions, eg 
messages 5 to 10 on the queue.
+     * 
+     * The 'queue position' index starts from 1. A 'from' value of 0 is 
treated as 1. 
+     * Using 0 in the 'to' field will return an empty list regardless of the 
'from' value.
+     * @param fromPosition
+     * @param toPosition
+     * @return
+     */
+    public List<QueueEntry> getMessagesRangeOnTheQueue(final long 
fromPosition, final long toPosition)
+    {
+        List<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        
+        QueueEntryIterator it = _entries.iterator();
+
+        long index = 1;
+        for ( ; index < fromPosition && !it.atTail(); index++)
+        {
+            it.advance();
+        }
+        
+        if(index < fromPosition)
+        {
+            //The queue does not contain enough entries to reach our range.
+            //return the empty list.
+            return queueEntries;
+        }
+        
+        for ( ; index <= toPosition && !it.atTail(); index++)
+        {
+            it.advance();
+            queueEntries.add(it.getNode());
+        }
+        
+        return queueEntries;
+    }
 
     public void moveMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Tue Jul 21 09:05:21 2009
@@ -187,7 +187,7 @@
     {
         try
         {
-            _queueMBean.viewMessages(0, 3);
+            _queueMBean.viewMessages(0L, 3L);
             fail();
         }
         catch (JMException ex)
@@ -197,7 +197,7 @@
 
         try
         {
-            _queueMBean.viewMessages(2, 1);
+            _queueMBean.viewMessages(2L, 1L);
             fail();
         }
         catch (JMException ex)
@@ -207,13 +207,25 @@
 
         try
         {
-            _queueMBean.viewMessages(-1, 1);
+            _queueMBean.viewMessages(-1L, 1L);
             fail();
         }
         catch (JMException ex)
         {
 
         }
+        
+        try
+        {
+            long end = Integer.MAX_VALUE;
+            end+=2;
+            _queueMBean.viewMessages(1L, end);
+            fail("Expected Exception due to oversized(> 2^31) message range");
+        }
+        catch (JMException ex)
+        {
+
+        }
 
         IncomingMessage msg = message(false, false);
         long id = msg.getMessageId();

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 Tue Jul 21 09:05:21 2009
@@ -209,6 +209,11 @@
     {
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
+    
+    public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long 
toPosition)
+    {
+        return null;
+    }
 
     public void moveMessagesToAnotherQueue(long fromMessageId, long 
toMessageId, String queueName, StoreContext storeContext)
     {

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Tue Jul 21 09:05:21 2009
@@ -335,6 +335,69 @@
             assertEquals("Message ID was wrong", messageId, msgids.get(i));
         }
     }
+    
+    public void testGetMessagesRangeOnTheQueue() throws Exception
+    {
+        for (int i = 1 ; i <= 10; i++)
+        {
+            // Create message
+            Long messageId = new Long(i);
+            AMQMessage message = createMessage(messageId);
+            // Put message on queue
+            _queue.enqueue(null, message);
+        }
+        
+        // Get non-existent 0th QueueEntry & check returned list was empty
+        // (the position parameters in this method are indexed from 1)
+        List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
+        assertTrue(entries.size() == 0);
+        
+        // Check that when 'from' is 0 it is ignored and the range continues 
from 1
+        entries = _queue.getMessagesRangeOnTheQueue(0, 2);
+        assertTrue(entries.size() == 2);
+        long msgID = entries.get(0).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 1L);
+        msgID = entries.get(1).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 2L);
+
+        // Check that when 'from' is greater than 'to' the returned list is 
empty
+        entries = _queue.getMessagesRangeOnTheQueue(5, 4);
+        assertTrue(entries.size() == 0);
+        
+        // Get first QueueEntry & check id 
+        entries = _queue.getMessagesRangeOnTheQueue(1, 1);
+        assertTrue(entries.size() == 1);
+        msgID = entries.get(0).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 1L);
+        
+        // Get 5th,6th,7th entries and check id's
+        entries = _queue.getMessagesRangeOnTheQueue(5, 7);
+        assertTrue(entries.size() == 3);
+        msgID = entries.get(0).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 5L);
+        msgID = entries.get(1).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 6L);
+        msgID = entries.get(2).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 7L);
+        
+        // Get 10th QueueEntry & check id
+        entries = _queue.getMessagesRangeOnTheQueue(10, 10);
+        assertTrue(entries.size() == 1);
+        msgID = entries.get(0).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 10L);
+        
+        // Get non-existent 11th QueueEntry & check returned set was empty
+        entries = _queue.getMessagesRangeOnTheQueue(11, 11);
+        assertTrue(entries.size() == 0);
+        
+        // Get 9th,10th, and non-existent 11th entries & check result is of 
size 2 with correct IDs
+        entries = _queue.getMessagesRangeOnTheQueue(9, 11);
+        assertTrue(entries.size() == 2);
+        msgID = entries.get(0).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 9L);
+        msgID = entries.get(1).getMessage().getMessageId();
+        assertEquals("Message ID was wrong", msgID, 10L);
+    }
   
     public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() 
throws AMQException
     {

Modified: 
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java?rev=796196&r1=796195&r2=796196&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
 Tue Jul 21 09:05:21 2009
@@ -207,18 +207,42 @@
     /**
      * Returns a subset of all the messages stored in the queue. The messages
      * are returned based on the given index numbers.
+     * 
+     * Deprecated as of Qpid JMX API 1.3
      * @param fromIndex
      * @param toIndex
      * @return
      * @throws IOException
      * @throws JMException
      */
+    @Deprecated
     @MBeanOperation(name="viewMessages",
                     description="Message headers for messages in this queue 
within given index range. eg. from index 1 - 100")
     TabularData viewMessages(@MBeanOperationParameter(name="from index", 
description="from index")int fromIndex,
                              @MBeanOperationParameter(name="to index", 
description="to index")int toIndex)
             throws IOException, JMException;
+    
+    /**
+     * Returns a subset (up to 2^31 messages at a time) of all the messages 
stored on the queue. 
+     * The messages are returned based on the given queue position range.
+     * @param startPosition
+     * @param endPosition
+     * @return
+     * @throws IOException
+     * @throws JMException
+     */
+    @MBeanOperation(name="viewMessages",
+                    description="Message headers for messages in this queue 
within given queue positions range. eg. from index 1 - 100")
+    TabularData viewMessages(@MBeanOperationParameter(name="start position", 
description="start position")long startPosition,
+                             @MBeanOperationParameter(name="end position", 
description="end position")long endPosition)
+            throws IOException, JMException;
 
+    /**
+     * Returns the content for the given AMQ Message ID.
+     * 
+     * @throws IOException
+     * @throws JMException
+     */
     @MBeanOperation(name="viewMessageContent", description="The message 
content for given Message Id")
     CompositeData viewMessageContent(@MBeanOperationParameter(name="Message 
Id", description="Message Id")long messageId)
         throws IOException, JMException;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to