Author: kwall
Date: Mon Jun 25 16:23:34 2012
New Revision: 1353637

URL: http://svn.apache.org/viewvc?rev=1353637&view=rev
Log:
NO-JIRA: Implement replacement for AMQQueueAlertTest.

Introduce unit tests for queue alerting around each component 
(NotificationCheck, SimpleAMQQueue, and QueueMBean).
Add system tests to test queue alerting end-to-end for JMX managament 
interface..

Added:
    
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
Modified:
    
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
    
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
    
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
    
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
 Mon Jun 25 16:23:34 2012
@@ -38,7 +38,7 @@ import javax.management.NotificationList
 public abstract class AMQManagedObject extends DefaultManagedObject
                                        implements NotificationBroadcaster
 {
-    private NotificationBroadcasterSupport _broadcaster = new 
NotificationBroadcasterSupport();
+    private final NotificationBroadcasterSupport _broadcaster = new 
NotificationBroadcasterSupport();
 
     private AtomicLong _notificationSequenceNumber = new AtomicLong();
 

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
 Mon Jun 25 16:23:34 2012
@@ -21,16 +21,23 @@ package org.apache.qpid.server.jmx.mbean
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.argThat;
 
 import java.util.Arrays;
 import java.util.Collections;
 
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
 import javax.management.OperationsException;
 
 import org.apache.qpid.server.jmx.ManagedObjectRegistry;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.NotificationCheck;
+import org.mockito.ArgumentMatcher;
 
 import junit.framework.TestCase;
 
@@ -50,6 +57,7 @@ public class QueueMBeanTest extends Test
     protected void setUp() throws Exception
     {
         _mockQueue = mock(Queue.class);
+        when(_mockQueue.getName()).thenReturn(QUEUE_NAME);
         _mockVirtualHostMBean = mock(VirtualHostMBean.class);
 
         _mockManagedObjectRegistry = mock(ManagedObjectRegistry.class);
@@ -60,8 +68,6 @@ public class QueueMBeanTest extends Test
 
     public void testQueueName()
     {
-        when(_mockQueue.getName()).thenReturn(QUEUE_NAME);
-
         assertEquals(QUEUE_NAME, _queueMBean.getName());
     }
 
@@ -146,4 +152,51 @@ public class QueueMBeanTest extends Test
         _queueMBean.setAlternateExchange("");
         verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null);
     }
+
+    public void testNotificationListenerCalled() throws Exception
+    {
+        NotificationListener listener = mock(NotificationListener.class);
+        _queueMBean.addNotificationListener(listener, null, null);
+
+        NotificationCheck notification = mock(NotificationCheck.class);
+        String notificationMsg = "Test notification message";
+
+        _queueMBean.notifyClients(notification, _mockQueue, notificationMsg);
+        
verify(listener).handleNotification(isNotificationWithMessage(notificationMsg),
+                                            isNull());
+    }
+
+    public void testAddRemoveNotificationListener() throws Exception
+    {
+        NotificationListener listener1 = mock(NotificationListener.class);
+        _queueMBean.addNotificationListener(listener1, null, null);
+        _queueMBean.removeNotificationListener(listener1);
+    }
+
+    public void testRemoveUnknownNotificationListener() throws Exception
+    {
+        NotificationListener listener1 = mock(NotificationListener.class);
+        try
+        {
+            _queueMBean.removeNotificationListener(listener1);
+            fail("Exception not thrown");
+        }
+        catch (ListenerNotFoundException e)
+        {
+            // PASS
+        }
+    }
+
+    private Notification isNotificationWithMessage(final String 
expectedMessage)
+    {
+        return argThat( new ArgumentMatcher<Notification>()
+        {
+            @Override
+            public boolean matches(Object argument)
+            {
+                Notification actual = (Notification) argument;
+                return actual.getMessage().endsWith(expectedMessage);
+            }
+        });
+    }
 }

Added: 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java?rev=1353637&view=auto
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
 (added)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
 Mon Jun 25 16:23:34 2012
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import static org.apache.qpid.server.queue.NotificationCheck.MESSAGE_AGE_ALERT;
+import static 
org.apache.qpid.server.queue.NotificationCheck.MESSAGE_COUNT_ALERT;
+import static 
org.apache.qpid.server.queue.NotificationCheck.MESSAGE_SIZE_ALERT;
+import static org.apache.qpid.server.queue.NotificationCheck.QUEUE_DEPTH_ALERT;
+
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue.NotificationListener;
+
+public class NotificationCheckTest extends TestCase
+{
+
+    private ServerMessage<?> _message = mock(ServerMessage.class);
+    private AMQQueue _queue = mock(AMQQueue.class);
+    private NotificationListener _listener = mock(NotificationListener.class);
+
+    public void testMessageCountAlertFires() throws Exception
+    {
+        when(_queue.getMaximumMessageCount()).thenReturn(1000l);
+        when(_queue.getMessageCount()).thenReturn(999, 1000, 1001);
+
+        MESSAGE_COUNT_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verifyZeroInteractions(_listener);
+
+        MESSAGE_COUNT_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verify(_listener).notifyClients(eq(MESSAGE_COUNT_ALERT), eq(_queue), 
eq("1000: Maximum count on queue threshold (1000) breached."));
+
+        MESSAGE_COUNT_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verify(_listener).notifyClients(eq(MESSAGE_COUNT_ALERT), eq(_queue), 
eq("1001: Maximum count on queue threshold (1000) breached."));
+    }
+
+    public void testMessageSizeAlertFires() throws Exception
+    {
+        when(_queue.getMaximumMessageSize()).thenReturn(1024l);
+        when(_message.getSize()).thenReturn(1023l, 1024l, 1025l);
+
+        MESSAGE_SIZE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verifyZeroInteractions(_listener);
+
+        MESSAGE_SIZE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verify(_listener).notifyClients(eq(MESSAGE_SIZE_ALERT), eq(_queue), 
contains("1024b : Maximum message size threshold (1024) breached."));
+
+        MESSAGE_SIZE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verify(_listener).notifyClients(eq(MESSAGE_SIZE_ALERT), eq(_queue), 
contains("1025b : Maximum message size threshold (1024) breached."));
+    }
+
+    public void testMessageAgeAlertFires() throws Exception
+    {
+        long now = System.currentTimeMillis();
+        when(_queue.getMaximumMessageAge()).thenReturn(1000l);
+        when(_queue.getOldestMessageArrivalTime()).thenReturn(now, now - 
15000);
+
+        MESSAGE_AGE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verifyZeroInteractions(_listener);
+
+        MESSAGE_AGE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        // Uses contains as first part of message is nondeterministic
+        verify(_listener).notifyClients(eq(MESSAGE_AGE_ALERT), eq(_queue), 
contains("s : Maximum age on queue threshold (1s) breached."));
+    }
+
+    public void testQueueDepthAlertFires() throws Exception
+    {
+        when(_queue.getMaximumQueueDepth()).thenReturn(1024l);
+        when(_queue.getQueueDepth()).thenReturn(1023l, 1024l, 2048l);
+
+        QUEUE_DEPTH_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verifyZeroInteractions(_listener);
+
+        QUEUE_DEPTH_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verify(_listener).notifyClients(eq(QUEUE_DEPTH_ALERT), eq(_queue), 
eq("1Kb : Maximum queue depth threshold (1Kb) breached."));
+
+        QUEUE_DEPTH_ALERT.notifyIfNecessary(_message, _queue, _listener);
+        verify(_listener).notifyClients(eq(QUEUE_DEPTH_ALERT), eq(_queue), 
eq("2Kb : Maximum queue depth threshold (1Kb) breached."));
+    }
+}

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Mon Jun 25 16:23:34 2012
@@ -21,6 +21,13 @@
 
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+
 import org.apache.commons.configuration.PropertiesConfiguration;
 
 import org.apache.qpid.AMQException;
@@ -79,7 +86,6 @@ public class SimpleAMQQueueTest extends 
 
         public void setExchange(AMQShortString exchange)
         {
-            //To change body of implemented methods use File | Settings | File 
Templates.
         }
 
         public boolean isImmediate()
@@ -1096,7 +1102,7 @@ public class SimpleAMQQueueTest extends 
     /**
      * Tests that entry in dequeued state are not enqueued and not delivered 
to subscription
      */
-    public void testEqueueDequeuedEntry()
+    public void testEnqueueDequeuedEntry()
     {
         // create a queue where each even entry is considered a dequeued
         SimpleAMQQueue queue = new 
SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), false,
@@ -1231,6 +1237,39 @@ public class SimpleAMQQueueTest extends 
         assertEquals("Unexpected active consumer count", 1, 
queue.getActiveConsumerCount());
     }
 
+    public void testNotificationFiredOnEnqueue() throws Exception
+    {
+        AMQQueue.NotificationListener listener = 
mock(AMQQueue.NotificationListener.class);
+
+        _queue.setNotificationListener(listener);
+        _queue.setMaximumMessageCount(2);
+
+        _queue.enqueue(createMessage(new Long(24)));
+        verifyZeroInteractions(listener);
+
+        _queue.enqueue(createMessage(new Long(25)));
+
+        verify(listener, 
atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), 
eq(_queue), contains("Maximum count on queue threshold"));
+    }
+
+    public void testNotificationFiredAsync() throws Exception
+    {
+        AMQQueue.NotificationListener listener = 
mock(AMQQueue.NotificationListener.class);
+
+        _queue.enqueue(createMessage(new Long(24)));
+        _queue.enqueue(createMessage(new Long(25)));
+        _queue.enqueue(createMessage(new Long(26)));
+
+        _queue.setNotificationListener(listener);
+        _queue.setMaximumMessageCount(2);
+
+        verifyZeroInteractions(listener);
+
+        _queue.checkMessageStatus();
+
+        verify(listener, 
atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), 
eq(_queue), contains("Maximum count on queue threshold"));
+    }
+
     /**
      * A helper method to create a queue with given name
      *

Modified: 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
 Mon Jun 25 16:23:34 2012
@@ -27,6 +27,8 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.management.common.mbeans.ManagedBroker;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.NotificationCheckTest;
+import org.apache.qpid.server.queue.SimpleAMQQueueTest;
 import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
 import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -39,6 +41,8 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.management.Notification;
+import javax.management.NotificationListener;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
 
@@ -54,6 +58,7 @@ import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Tests the JMX API for the Managed Queue.
@@ -61,6 +66,7 @@ import java.util.concurrent.atomic.Atomi
  */
 public class ManagedQueueMBeanTest extends QpidBrokerTestCase
 {
+
     private static final Logger LOGGER = 
Logger.getLogger(ManagedQueueMBeanTest.class);
 
     private static final String VIRTUAL_HOST = "test";
@@ -286,6 +292,44 @@ public class ManagedQueueMBeanTest exten
     }
 
     /**
+     * Tests the ability to receive queue alerts as JMX notifications.
+     *
+     * @see NotificationCheckTest
+     * @see SimpleAMQQueueTest#testNotificationFiredAsync()
+     * @see SimpleAMQQueueTest#testNotificationFiredOnEnqueue()
+     */
+    public void testQueueNotification() throws Exception
+    {
+        final String queueName = getName();
+        final long maximumMessageCount = 3;
+
+        Queue queue = _session.createQueue(queueName);
+        createQueueOnBroker(queue);
+
+        ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+        managedQueue.setMaximumMessageCount(maximumMessageCount);
+
+        RecordingNotificationListener listener = new 
RecordingNotificationListener(1);
+
+        
_jmxUtils.addNotificationListener(_jmxUtils.getQueueObjectName(VIRTUAL_HOST, 
queueName), listener, null, null);
+
+        // Send two messages - this should *not* trigger the notification
+        sendMessage(_session, queue, 2);
+
+        assertEquals("Premature notification received", 0, 
listener.getNumberOfNotificationsReceived());
+
+        // A further message should trigger the message count alert
+        sendMessage(_session, queue, 1);
+
+        listener.awaitExpectedNotifications(5, TimeUnit.SECONDS);
+
+        assertEquals("Unexpected number of JMX notifications received", 1, 
listener.getNumberOfNotificationsReceived());
+
+        Notification notification = listener.getLastNotification();
+        assertEquals("Unexpected notification message", "MESSAGE_COUNT_ALERT 
3: Maximum count on queue threshold (3) breached.", notification.getMessage());
+    }
+
+    /**
      * Tests {@link ManagedQueue#viewMessages(long, long)} interface.
      */
     public void testViewSingleMessage() throws Exception
@@ -525,4 +569,41 @@ public class ManagedQueueMBeanTest exten
         ((AMQSession<?,?>)session).sync();
     }
 
+    private final class RecordingNotificationListener implements 
NotificationListener
+    {
+        private final CountDownLatch _notificationReceivedLatch;
+        private final AtomicInteger _numberOfNotifications;
+        private final AtomicReference<Notification> _lastNotification;
+
+        private RecordingNotificationListener(int 
expectedNumberOfNotifications)
+        {
+            _notificationReceivedLatch = new 
CountDownLatch(expectedNumberOfNotifications);
+            _numberOfNotifications = new AtomicInteger(0);
+            _lastNotification = new AtomicReference<Notification>();
+        }
+
+        @Override
+        public void handleNotification(Notification notification, Object 
handback)
+        {
+            _lastNotification.set(notification);
+            _numberOfNotifications.incrementAndGet();
+            _notificationReceivedLatch.countDown();
+        }
+
+        public int getNumberOfNotificationsReceived()
+        {
+            return _numberOfNotifications.get();
+        }
+
+        public Notification getLastNotification()
+        {
+            return _lastNotification.get();
+        }
+
+        public void awaitExpectedNotifications(long timeout, TimeUnit 
timeunit) throws InterruptedException
+        {
+            _notificationReceivedLatch.await(timeout, timeunit);
+        }
+    }
+
 }

Modified: 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
 Mon Jun 25 16:23:34 2012
@@ -33,11 +33,15 @@ import org.apache.qpid.management.common
 import org.apache.qpid.management.common.mbeans.ServerInformation;
 import org.apache.qpid.management.common.mbeans.UserManagement;
 
+import javax.management.InstanceNotFoundException;
 import javax.management.JMException;
+import javax.management.ListenerNotFoundException;
 import javax.management.MBeanException;
 import javax.management.MBeanServerConnection;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.MalformedObjectNameException;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import java.io.IOException;
@@ -101,6 +105,18 @@ public class JMXTestUtils
         }
     }
 
+    public void addNotificationListener(ObjectName name, NotificationListener 
listener, NotificationFilter filter, Object handback)
+            throws InstanceNotFoundException, IOException
+    {
+        _mbsc.addNotificationListener(name, listener, filter, handback);
+    }
+
+    public void removeNotificationListener(ObjectName name, 
NotificationListener listener)
+            throws InstanceNotFoundException, IOException, 
ListenerNotFoundException
+    {
+        _mbsc.removeNotificationListener(name, listener);
+    }
+
     /**
      * Create a non-durable exchange with the requested name
      *



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to