Author: robbie
Date: Mon Jul 30 13:36:00 2012
New Revision: 1367084

URL: http://svn.apache.org/viewvc?rev=1367084&view=rev
Log:
QPID-4170: prevent JMX threads from spinning in the Queue MBean if content
retrieval fails, and log an error when it does. Add unit tests to expose the
issue and verify the fix.

Modified:
    qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
    qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java

Modified: qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1367084&r1=1367083&r2=1367084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java Mon Jul 30 13:36:00 2012
@@ -43,6 +43,7 @@ import javax.management.openmbean.Tabula
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.log4j.Logger;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.server.jmx.AMQManagedObject;
 import org.apache.qpid.server.jmx.ManagedObject;
@@ -59,6 +60,8 @@ import org.apache.qpid.server.queue.Queu
 
 public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
 {
+    private static final Logger LOGGER = Logger.getLogger(QueueMBean.class);
+
     private static final String[] VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY =
             VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]);
 
@@ -370,12 +373,14 @@ public class QueueMBean extends AMQManag
         byte[] msgContent = new byte[bodySize];
 
         ByteBuffer buf = ByteBuffer.wrap(msgContent);
-        int position = 0;
+        int stored = serverMsg.getContent(buf, 0);
 
-        while(position < bodySize)
+        if(bodySize != stored)
         {
-            position += serverMsg.getContent(buf, position);
-
+            LOGGER.error(String.format("An unexpected amount of content was retrieved " +
+                    "(expected %d, got %d bytes) when viewing content for message with ID %d " +
+                    "on queue '%s' in virtual host '%s'",
+                    bodySize, stored, messageId, _queue.getName(), _vhostMBean.getName()));
         }
 
         AMQMessageHeader header = serverMsg.getMessageHeader();
@@ -591,7 +596,7 @@ public class QueueMBean extends AMQManag
     }
 
 
-    private static class GetMessageVisitor implements QueueEntryVisitor
+    protected static class GetMessageVisitor implements QueueEntryVisitor
     {
 
         private final long _messageNumber;

Modified: qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java?rev=1367084&r1=1367083&r2=1367084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java Mon Jul 30 13:36:00 2012
@@ -18,12 +18,14 @@
  */
 package org.apache.qpid.server.jmx.mbeans;
 
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Matchers.argThat;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -31,19 +33,26 @@ import javax.management.ListenerNotFound
 import javax.management.Notification;
 import javax.management.NotificationListener;
 import javax.management.OperationsException;
+import javax.management.openmbean.CompositeDataSupport;
 
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.server.jmx.ManagedObjectRegistry;
+import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Statistics;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.NotificationCheck;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
-import junit.framework.TestCase;
-
-public class QueueMBeanTest extends TestCase
+public class QueueMBeanTest extends QpidTestCase
 {
     private static final String QUEUE_NAME = "QUEUE_NAME";
     private static final String QUEUE_DESCRIPTION = "QUEUE_DESCRIPTION";
@@ -59,6 +68,7 @@ public class QueueMBeanTest extends Test
     @Override
     protected void setUp() throws Exception
     {
+        super.setUp();
         _mockQueue = mock(Queue.class);
         _mockQueueStatistics = mock(Statistics.class);
         when(_mockQueue.getName()).thenReturn(QUEUE_NAME);
@@ -365,4 +375,65 @@ public class QueueMBeanTest extends Test
         verify(_mockQueue).setAttribute(underlyingAttributeName, originalAttributeValue, newAttributeValue);
     }
 
+    public void testViewMessageContent() throws Exception
+    {
+        viewMessageContentTestImpl(16L, 1000, 1000);
+    }
+
+    public void testViewMessageContentWithMissingPayload() throws Exception
+    {
+        viewMessageContentTestImpl(16L, 1000, 0);
+    }
+
+    private void viewMessageContentTestImpl(final long messageNumber,
+                                       final int messageSize,
+                                       final int messageContentSize) throws Exception
+    {
+        final byte[] content = new byte[messageContentSize];
+
+        //mock message and queue entry to return a given message size, and have a given content
+        final ServerMessage<?> serverMessage = mock(ServerMessage.class);
+        when(serverMessage.getMessageNumber()).thenReturn(messageNumber);
+        when(serverMessage.getSize()).thenReturn((long)messageSize);
+        doAnswer(new Answer<Object>()
+        {
+            public Object answer(InvocationOnMock invocation)
+            {
+                Object[] args = invocation.getArguments();
+
+                //verify the arg types / expected values
+                assertEquals(2, args.length);
+                assertTrue(args[0] instanceof ByteBuffer);
+                assertTrue(args[1] instanceof Integer);
+
+                ByteBuffer dest = (ByteBuffer) args[0];
+                int offset = (Integer) args[1];
+                assertEquals(0, offset);
+
+                dest.put(content);
+                return messageContentSize;
+            }
+        }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class), Matchers.anyInt());
+
+        final QueueEntry entry = mock(QueueEntry.class);
+        when(entry.getMessage()).thenReturn(serverMessage);
+
+        //mock the queue.visit() method to ensure we match the mock message
+        doAnswer(new Answer<Object>()
+        {
+            public Object answer(InvocationOnMock invocation)
+            {
+                Object[] args = invocation.getArguments();
+                GetMessageVisitor visitor = (GetMessageVisitor) args[0];
+                visitor.visit(entry);
+                return null;
+            }
+        }).when(_mockQueue).visit(Matchers.any(GetMessageVisitor.class));
+
+        //now retrieve the content and verify its size
+        CompositeDataSupport comp = (CompositeDataSupport) _queueMBean.viewMessageContent(messageNumber);
+        assertNotNull(comp);
+        byte[] data = (byte[]) comp.get(ManagedQueue.CONTENT);
+        assertEquals(messageSize, data.length);
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to