Author: kwall
Date: Sun Nov 25 22:38:22 2012
New Revision: 1413433
URL: http://svn.apache.org/viewvc?rev=1413433&view=rev
Log:
QPID-4470: Allow 'maximum queue depth' queue alerting threshold to be set from
JMX/AMQP
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1413433&r1=1413432&r2=1413433&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
Sun Nov 25 22:38:22 2012
@@ -50,15 +50,16 @@ final class QueueAdapter extends Abstrac
static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String,
String>();
static
{
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP,
"x-qpid-minimum-alert-repeat-gap");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE,
"x-qpid-maximum-message-age");
-
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE,
"x-qpid-maximum-message-size");
-
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
"x-qpid-maximum-message-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP,
AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE,
AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE);
+
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE,
AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE);
+
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT);
+
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS,
"x-qpid-maximum-delivery-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS,
AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT);
-
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES,
"x-qpid-capacity");
-
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,
"x-qpid-flow-resume-capacity");
+
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES,
AMQQueueFactory.X_QPID_CAPACITY);
+
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,
AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY,
AMQQueueFactory.QPID_QUEUE_SORT_KEY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY,
AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1413433&r1=1413432&r2=1413433&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
Sun Nov 25 22:38:22 2012
@@ -41,6 +41,14 @@ import org.apache.qpid.server.virtualhos
public class AMQQueueFactory
{
+ public static final String X_QPID_FLOW_RESUME_CAPACITY =
"x-qpid-flow-resume-capacity";
+ public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP =
"x-qpid-minimum-alert-repeat-gap";
+ public static final String X_QPID_MAXIMUM_MESSAGE_COUNT =
"x-qpid-maximum-message-count";
+ public static final String X_QPID_MAXIMUM_MESSAGE_SIZE =
"x-qpid-maximum-message-size";
+ public static final String X_QPID_MAXIMUM_MESSAGE_AGE =
"x-qpid-maximum-message-age";
+ public static final String X_QPID_MAXIMUM_QUEUE_DEPTH =
"x-qpid-maximum-queue-depth";
+
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
@@ -119,42 +127,49 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty("x-qpid-maximum-message-age")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-size")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-count")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumQueueDepth(value);
+ }
+ },
+ new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty("x-qpid-capacity")
+ new QueueLongProperty(X_QPID_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty("x-qpid-flow-resume-capacity")
+ new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java?rev=1413433&r1=1413432&r2=1413433&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
Sun Nov 25 22:38:22 2012
@@ -206,13 +206,41 @@ public class QueueManagementTest extends
managedBroker.createNewQueue(queueName, null, true, arguments);
// Ensure the queue exists
- assertNotNull("Queue object name expected to exist",
_jmxUtils.getQueueObjectName("test", queueName));
+ assertNotNull("Queue object name expected to exist",
_jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName));
assertNotNull("Manager queue expected to be available",
_jmxUtils.getManagedQueue(queueName));
final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
assertEquals("Unexpected maximum delivery count", deliveryCount,
managedQueue.getMaximumDeliveryCount());
}
+ public void testCreateQueueWithAlertingThresholdsSet() throws Exception
+ {
+ final String queueName = getName();
+ final ManagedBroker managedBroker =
_jmxUtils.getManagedBroker(VIRTUAL_HOST);
+
+ final Long maximumMessageCount = 100l;
+ final Long maximumMessageSize = 200l;
+ final Long maximumQueueDepth = 300l;
+ final Long maximumMessageAge = 400l;
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT,
maximumMessageCount);
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE,
maximumMessageSize);
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH,
maximumQueueDepth);
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE,
maximumMessageAge);
+
+ managedBroker.createNewQueue(queueName, null, true, arguments);
+
+ // Ensure the queue exists
+ assertNotNull("Queue object name expected to exist",
_jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName));
+ assertNotNull("Manager queue expected to be available",
_jmxUtils.getManagedQueue(queueName));
+
+ ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ assertEquals("Unexpected maximum message count", maximumMessageCount,
managedQueue.getMaximumMessageCount());
+ assertEquals("Unexpected maximum message size", maximumMessageSize,
managedQueue.getMaximumMessageSize());
+ assertEquals("Unexpected maximum queue depth", maximumQueueDepth,
managedQueue.getMaximumQueueDepth());
+ assertEquals("Unexpected maximum message age", maximumMessageAge,
managedQueue.getMaximumMessageAge());
+ }
+
/**
* Requires 0-10 as relies on ADDR addresses.
* @see AddressBasedDestinationTest for the testing of message routing to
the alternate exchange
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]