Author: kwall
Date: Mon Jun 25 16:23:34 2012
New Revision: 1353637
URL: http://svn.apache.org/viewvc?rev=1353637&view=rev
Log:
NO-JIRA: Implement replacement for AMQQueueAlertTest.
Introduce unit tests for queue alerting around each component
(NotificationCheck, SimpleAMQQueue, and QueueMBean).
Add system tests to test queue alerting end-to-end for JMX managament
interface..
Added:
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/AMQManagedObject.java
Mon Jun 25 16:23:34 2012
@@ -38,7 +38,7 @@ import javax.management.NotificationList
public abstract class AMQManagedObject extends DefaultManagedObject
implements NotificationBroadcaster
{
- private NotificationBroadcasterSupport _broadcaster = new
NotificationBroadcasterSupport();
+ private final NotificationBroadcasterSupport _broadcaster = new
NotificationBroadcasterSupport();
private AtomicLong _notificationSequenceNumber = new AtomicLong();
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
Mon Jun 25 16:23:34 2012
@@ -21,16 +21,23 @@ package org.apache.qpid.server.jmx.mbean
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.argThat;
import java.util.Arrays;
import java.util.Collections;
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
import javax.management.OperationsException;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.NotificationCheck;
+import org.mockito.ArgumentMatcher;
import junit.framework.TestCase;
@@ -50,6 +57,7 @@ public class QueueMBeanTest extends Test
protected void setUp() throws Exception
{
_mockQueue = mock(Queue.class);
+ when(_mockQueue.getName()).thenReturn(QUEUE_NAME);
_mockVirtualHostMBean = mock(VirtualHostMBean.class);
_mockManagedObjectRegistry = mock(ManagedObjectRegistry.class);
@@ -60,8 +68,6 @@ public class QueueMBeanTest extends Test
public void testQueueName()
{
- when(_mockQueue.getName()).thenReturn(QUEUE_NAME);
-
assertEquals(QUEUE_NAME, _queueMBean.getName());
}
@@ -146,4 +152,51 @@ public class QueueMBeanTest extends Test
_queueMBean.setAlternateExchange("");
verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null);
}
+
+ public void testNotificationListenerCalled() throws Exception
+ {
+ NotificationListener listener = mock(NotificationListener.class);
+ _queueMBean.addNotificationListener(listener, null, null);
+
+ NotificationCheck notification = mock(NotificationCheck.class);
+ String notificationMsg = "Test notification message";
+
+ _queueMBean.notifyClients(notification, _mockQueue, notificationMsg);
+
verify(listener).handleNotification(isNotificationWithMessage(notificationMsg),
+ isNull());
+ }
+
+ public void testAddRemoveNotificationListener() throws Exception
+ {
+ NotificationListener listener1 = mock(NotificationListener.class);
+ _queueMBean.addNotificationListener(listener1, null, null);
+ _queueMBean.removeNotificationListener(listener1);
+ }
+
+ public void testRemoveUnknownNotificationListener() throws Exception
+ {
+ NotificationListener listener1 = mock(NotificationListener.class);
+ try
+ {
+ _queueMBean.removeNotificationListener(listener1);
+ fail("Exception not thrown");
+ }
+ catch (ListenerNotFoundException e)
+ {
+ // PASS
+ }
+ }
+
+ private Notification isNotificationWithMessage(final String
expectedMessage)
+ {
+ return argThat( new ArgumentMatcher<Notification>()
+ {
+ @Override
+ public boolean matches(Object argument)
+ {
+ Notification actual = (Notification) argument;
+ return actual.getMessage().endsWith(expectedMessage);
+ }
+ });
+ }
}
Added:
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java?rev=1353637&view=auto
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
(added)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
Mon Jun 25 16:23:34 2012
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import static org.apache.qpid.server.queue.NotificationCheck.MESSAGE_AGE_ALERT;
+import static
org.apache.qpid.server.queue.NotificationCheck.MESSAGE_COUNT_ALERT;
+import static
org.apache.qpid.server.queue.NotificationCheck.MESSAGE_SIZE_ALERT;
+import static org.apache.qpid.server.queue.NotificationCheck.QUEUE_DEPTH_ALERT;
+
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue.NotificationListener;
+
+public class NotificationCheckTest extends TestCase
+{
+
+ private ServerMessage<?> _message = mock(ServerMessage.class);
+ private AMQQueue _queue = mock(AMQQueue.class);
+ private NotificationListener _listener = mock(NotificationListener.class);
+
+ public void testMessageCountAlertFires() throws Exception
+ {
+ when(_queue.getMaximumMessageCount()).thenReturn(1000l);
+ when(_queue.getMessageCount()).thenReturn(999, 1000, 1001);
+
+ MESSAGE_COUNT_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verifyZeroInteractions(_listener);
+
+ MESSAGE_COUNT_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verify(_listener).notifyClients(eq(MESSAGE_COUNT_ALERT), eq(_queue),
eq("1000: Maximum count on queue threshold (1000) breached."));
+
+ MESSAGE_COUNT_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verify(_listener).notifyClients(eq(MESSAGE_COUNT_ALERT), eq(_queue),
eq("1001: Maximum count on queue threshold (1000) breached."));
+ }
+
+ public void testMessageSizeAlertFires() throws Exception
+ {
+ when(_queue.getMaximumMessageSize()).thenReturn(1024l);
+ when(_message.getSize()).thenReturn(1023l, 1024l, 1025l);
+
+ MESSAGE_SIZE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verifyZeroInteractions(_listener);
+
+ MESSAGE_SIZE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verify(_listener).notifyClients(eq(MESSAGE_SIZE_ALERT), eq(_queue),
contains("1024b : Maximum message size threshold (1024) breached."));
+
+ MESSAGE_SIZE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verify(_listener).notifyClients(eq(MESSAGE_SIZE_ALERT), eq(_queue),
contains("1025b : Maximum message size threshold (1024) breached."));
+ }
+
+ public void testMessageAgeAlertFires() throws Exception
+ {
+ long now = System.currentTimeMillis();
+ when(_queue.getMaximumMessageAge()).thenReturn(1000l);
+ when(_queue.getOldestMessageArrivalTime()).thenReturn(now, now -
15000);
+
+ MESSAGE_AGE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verifyZeroInteractions(_listener);
+
+ MESSAGE_AGE_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ // Uses contains as first part of message is nondeterministic
+ verify(_listener).notifyClients(eq(MESSAGE_AGE_ALERT), eq(_queue),
contains("s : Maximum age on queue threshold (1s) breached."));
+ }
+
+ public void testQueueDepthAlertFires() throws Exception
+ {
+ when(_queue.getMaximumQueueDepth()).thenReturn(1024l);
+ when(_queue.getQueueDepth()).thenReturn(1023l, 1024l, 2048l);
+
+ QUEUE_DEPTH_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verifyZeroInteractions(_listener);
+
+ QUEUE_DEPTH_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verify(_listener).notifyClients(eq(QUEUE_DEPTH_ALERT), eq(_queue),
eq("1Kb : Maximum queue depth threshold (1Kb) breached."));
+
+ QUEUE_DEPTH_ALERT.notifyIfNecessary(_message, _queue, _listener);
+ verify(_listener).notifyClients(eq(QUEUE_DEPTH_ALERT), eq(_queue),
eq("2Kb : Maximum queue depth threshold (1Kb) breached."));
+ }
+}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Mon Jun 25 16:23:34 2012
@@ -21,6 +21,13 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -79,7 +86,6 @@ public class SimpleAMQQueueTest extends
public void setExchange(AMQShortString exchange)
{
- //To change body of implemented methods use File | Settings | File
Templates.
}
public boolean isImmediate()
@@ -1096,7 +1102,7 @@ public class SimpleAMQQueueTest extends
/**
* Tests that entry in dequeued state are not enqueued and not delivered
to subscription
*/
- public void testEqueueDequeuedEntry()
+ public void testEnqueueDequeuedEntry()
{
// create a queue where each even entry is considered a dequeued
SimpleAMQQueue queue = new
SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), false,
@@ -1231,6 +1237,39 @@ public class SimpleAMQQueueTest extends
assertEquals("Unexpected active consumer count", 1,
queue.getActiveConsumerCount());
}
+ public void testNotificationFiredOnEnqueue() throws Exception
+ {
+ AMQQueue.NotificationListener listener =
mock(AMQQueue.NotificationListener.class);
+
+ _queue.setNotificationListener(listener);
+ _queue.setMaximumMessageCount(2);
+
+ _queue.enqueue(createMessage(new Long(24)));
+ verifyZeroInteractions(listener);
+
+ _queue.enqueue(createMessage(new Long(25)));
+
+ verify(listener,
atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT),
eq(_queue), contains("Maximum count on queue threshold"));
+ }
+
+ public void testNotificationFiredAsync() throws Exception
+ {
+ AMQQueue.NotificationListener listener =
mock(AMQQueue.NotificationListener.class);
+
+ _queue.enqueue(createMessage(new Long(24)));
+ _queue.enqueue(createMessage(new Long(25)));
+ _queue.enqueue(createMessage(new Long(26)));
+
+ _queue.setNotificationListener(listener);
+ _queue.setMaximumMessageCount(2);
+
+ verifyZeroInteractions(listener);
+
+ _queue.checkMessageStatus();
+
+ verify(listener,
atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT),
eq(_queue), contains("Maximum count on queue threshold"));
+ }
+
/**
* A helper method to create a queue with given name
*
Modified:
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
Mon Jun 25 16:23:34 2012
@@ -27,6 +27,8 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.NotificationCheckTest;
+import org.apache.qpid.server.queue.SimpleAMQQueueTest;
import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -39,6 +41,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.management.Notification;
+import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@@ -54,6 +58,7 @@ import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Tests the JMX API for the Managed Queue.
@@ -61,6 +66,7 @@ import java.util.concurrent.atomic.Atomi
*/
public class ManagedQueueMBeanTest extends QpidBrokerTestCase
{
+
private static final Logger LOGGER =
Logger.getLogger(ManagedQueueMBeanTest.class);
private static final String VIRTUAL_HOST = "test";
@@ -286,6 +292,44 @@ public class ManagedQueueMBeanTest exten
}
/**
+ * Tests the ability to receive queue alerts as JMX notifications.
+ *
+ * @see NotificationCheckTest
+ * @see SimpleAMQQueueTest#testNotificationFiredAsync()
+ * @see SimpleAMQQueueTest#testNotificationFiredOnEnqueue()
+ */
+ public void testQueueNotification() throws Exception
+ {
+ final String queueName = getName();
+ final long maximumMessageCount = 3;
+
+ Queue queue = _session.createQueue(queueName);
+ createQueueOnBroker(queue);
+
+ ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ managedQueue.setMaximumMessageCount(maximumMessageCount);
+
+ RecordingNotificationListener listener = new
RecordingNotificationListener(1);
+
+
_jmxUtils.addNotificationListener(_jmxUtils.getQueueObjectName(VIRTUAL_HOST,
queueName), listener, null, null);
+
+ // Send two messages - this should *not* trigger the notification
+ sendMessage(_session, queue, 2);
+
+ assertEquals("Premature notification received", 0,
listener.getNumberOfNotificationsReceived());
+
+ // A further message should trigger the message count alert
+ sendMessage(_session, queue, 1);
+
+ listener.awaitExpectedNotifications(5, TimeUnit.SECONDS);
+
+ assertEquals("Unexpected number of JMX notifications received", 1,
listener.getNumberOfNotificationsReceived());
+
+ Notification notification = listener.getLastNotification();
+ assertEquals("Unexpected notification message", "MESSAGE_COUNT_ALERT
3: Maximum count on queue threshold (3) breached.", notification.getMessage());
+ }
+
+ /**
* Tests {@link ManagedQueue#viewMessages(long, long)} interface.
*/
public void testViewSingleMessage() throws Exception
@@ -525,4 +569,41 @@ public class ManagedQueueMBeanTest exten
((AMQSession<?,?>)session).sync();
}
+ private final class RecordingNotificationListener implements
NotificationListener
+ {
+ private final CountDownLatch _notificationReceivedLatch;
+ private final AtomicInteger _numberOfNotifications;
+ private final AtomicReference<Notification> _lastNotification;
+
+ private RecordingNotificationListener(int
expectedNumberOfNotifications)
+ {
+ _notificationReceivedLatch = new
CountDownLatch(expectedNumberOfNotifications);
+ _numberOfNotifications = new AtomicInteger(0);
+ _lastNotification = new AtomicReference<Notification>();
+ }
+
+ @Override
+ public void handleNotification(Notification notification, Object
handback)
+ {
+ _lastNotification.set(notification);
+ _numberOfNotifications.incrementAndGet();
+ _notificationReceivedLatch.countDown();
+ }
+
+ public int getNumberOfNotificationsReceived()
+ {
+ return _numberOfNotifications.get();
+ }
+
+ public Notification getLastNotification()
+ {
+ return _lastNotification.get();
+ }
+
+ public void awaitExpectedNotifications(long timeout, TimeUnit
timeunit) throws InterruptedException
+ {
+ _notificationReceivedLatch.await(timeout, timeunit);
+ }
+ }
+
}
Modified:
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=1353637&r1=1353636&r2=1353637&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
Mon Jun 25 16:23:34 2012
@@ -33,11 +33,15 @@ import org.apache.qpid.management.common
import org.apache.qpid.management.common.mbeans.ServerInformation;
import org.apache.qpid.management.common.mbeans.UserManagement;
+import javax.management.InstanceNotFoundException;
import javax.management.JMException;
+import javax.management.ListenerNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import java.io.IOException;
@@ -101,6 +105,18 @@ public class JMXTestUtils
}
}
+ public void addNotificationListener(ObjectName name, NotificationListener
listener, NotificationFilter filter, Object handback)
+ throws InstanceNotFoundException, IOException
+ {
+ _mbsc.addNotificationListener(name, listener, filter, handback);
+ }
+
+ public void removeNotificationListener(ObjectName name,
NotificationListener listener)
+ throws InstanceNotFoundException, IOException,
ListenerNotFoundException
+ {
+ _mbsc.removeNotificationListener(name, listener);
+ }
+
/**
* Create a non-durable exchange with the requested name
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]