Author: robbie
Date: Thu Nov 29 11:26:30 2012
New Revision: 1415127
URL: http://svn.apache.org/viewvc?rev=1415127&view=rev
Log:
QPID-4441: add system test verification of the priority queue and queue with
DLQ following the store upgrade
Modified:
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1415127&r1=1415126&r2=1415127&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Thu Nov 29 11:26:30 2012
@@ -22,7 +22,9 @@ package org.apache.qpid.server.store.ber
import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
@@ -32,6 +34,7 @@ import java.io.File;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -43,7 +46,10 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -311,6 +317,104 @@ public class BDBUpgradeTest extends Qpid
}
}
+    /**
+     * Verifies that the priority queue configuration survived the store upgrade:
+     * messages published with out-of-order priorities must come back to a
+     * consumer in priority order (highest first).
+     */
+    public void testPriorityQueue() throws Exception
+    {
+        Connection connection = getConnection();
+        connection.start();
+
+        // publish three messages whose priorities deliberately disagree with their send order
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
+        MessageProducer producer = session.createProducer(queue);
+
+        // message ids are 1..n in send order; priorities[i] is the priority of message i + 1
+        final int[] priorities = {4, 1, 9};
+        for (int i = 0; i < priorities.length; i++)
+        {
+            producer.setPriority(priorities[i]);
+            producer.send(createMessage(i + 1, false, session, producer));
+        }
+        session.close();
+
+        // consume on a fresh session, expected order: msg 3 (priority 9), msg 1 (4), msg 2 (1)
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        final int[] expectedIds = {3, 1, 2};
+        for (int expectedId : expectedIds)
+        {
+            Message received = consumer.receive(1500);
+            assertNotNull("expected message was not received", received);
+            assertEquals(expectedId, received.getIntProperty("msg"));
+        }
+    }
+
+    /**
+     * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
+     * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
+     * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
+     *
+     * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
+     * that turned it on for this specific queue.
+     *
+     * @throws IllegalStateException if the JMX connection cannot be established; the
+     *         underlying exception is attached as the cause
+     */
+    public void testRecoveryOfQueueWithDLQ() throws Exception
+    {
+        // names of the dead letter exchange / queue that accompany QUEUE_WITH_DLQ_NAME
+        final String dleName = QUEUE_WITH_DLQ_NAME + "_DLE";
+        final String dlqName = QUEUE_WITH_DLQ_NAME + "_DLQ";
+
+        JMXTestUtils jmxUtils = null;
+        try
+        {
+            jmxUtils = new JMXTestUtils(this, "guest", "guest");
+            jmxUtils.open();
+        }
+        catch (Exception e)
+        {
+            // Keep the original exception as the cause: a bare fail(...) here used to
+            // discard it, leaving no stack trace to explain why the connection failed.
+            throw new IllegalStateException("Unable to establish JMX connection, test cannot proceed", e);
+        }
+
+        try
+        {
+            //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ
+            ManagedExchange exchange = jmxUtils.getManagedExchange(dleName);
+            assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType());
+            TabularDataSupport bindings = (TabularDataSupport) exchange.bindings();
+            assertEquals(1, bindings.size());
+            for (Object o : bindings.values())
+            {
+                CompositeData binding = (CompositeData) o;
+
+                String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY);
+                String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES);
+
+                //Because it's a fanout exchange, we just return a single '*' key with all bound queues
+                assertEquals("unexpected binding key", "*", bindingKey);
+                assertEquals("unexpected number of queues bound", 1, queueNames.length);
+                assertEquals("unexpected queue name", dlqName, queueNames[0]);
+            }
+
+            //verify the queue exists, has the expected alternate exchange and max delivery count
+            ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME);
+            assertEquals("Queue does not have the expected AlternateExchange", dleName, queue.getAlternateExchange());
+            assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount());
+
+            //verify the DLQ exists, has no alternate exchange and a max delivery count of 0
+            ManagedQueue dlqQueue = jmxUtils.getManagedQueue(dlqName);
+            assertNull("Queue should not have an AlternateExchange", dlqQueue.getAlternateExchange());
+            assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlqQueue.getMaximumDeliveryCount());
+
+            //verify no second-level DLQ was created for the DLQ itself
+            String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", dlqName + "_DLQ");
+            assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString));
+        }
+        finally
+        {
+            jmxUtils.close();
+        }
+    }
+
private void consumeDurableSubscriptionMessages(Connection connection,
boolean selector) throws Exception
{
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -388,4 +492,11 @@ public class BDBUpgradeTest extends Qpid
session.close();
}
+ private Message createMessage(int msgId, boolean first, Session
producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msgId);
+ send.setIntProperty("msg", msgId);
+
+ return send;
+ }
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=1415127&r1=1415126&r2=1415127&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java Thu Nov 29 11:26:30 2012
@@ -286,9 +286,7 @@ public class JMXTestUtils
public ObjectName getQueueObjectName(String virtualHostName, String queue)
{
// Get the name of the test manager
- String query = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
- + ObjectName.quote(virtualHostName) + ",name="
- + ObjectName.quote(queue) + ",*";
+ String query = getQueueObjectNameString(virtualHostName, queue);
Set<ObjectName> objectNames = queryObjects(query);
@@ -301,6 +299,12 @@ public class JMXTestUtils
return objectName;
}
+ public String getQueueObjectNameString(String virtualHostName, String
queue) {
+ return "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
+ + ObjectName.quote(virtualHostName) + ",name="
+ + ObjectName.quote(queue) + ",*";
+ }
+
/**
* Generate the ObjectName for the given Exchange on a VirtualHost.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]