Author: robbie
Date: Tue Apr 3 21:21:57 2012
New Revision: 1309155
URL: http://svn.apache.org/viewvc?rev=1309155&view=rev
Log:
QPID-3927: add a systest which would highlight the underlying issue by failing
to receive all messages present on the priority queue
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1309155&r1=1309154&r2=1309155&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
Tue Apr 3 21:21:57 2012
@@ -21,8 +21,10 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -30,12 +32,15 @@ import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class PriorityQueueTest extends QpidBrokerTestCase
{
@@ -197,4 +202,99 @@ public class PriorityQueueTest extends Q
return send;
}
+
+    /**
+     * Test that after sending an initial message with priority 0, it is able to be repeatedly
+     * reflected back to the queue using default priority and then consumed again, with separate
+     * transacted sessions with prefetch 1 for producer and consumer.
+     *
+     * Highlighted defect with PriorityQueues resolved in QPID-3927.
+     *
+     * @throws Exception if any of the connection, session, queue or messaging operations fail
+     */
+    public void testMessageReflectionWithPriorityIncreaseOnTransactedSessionsWithPrefetch1() throws Exception
+    {
+        //limit the client to a prefetch of 1 and verify the connection picked the setting up
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1");
+        Connection conn = getConnection();
+        conn.start();
+        assertEquals("Prefetch not reset", 1, ((AMQConnection) conn).getMaxPrefetch());
+
+        final Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+        final Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+        //declare a priority queue with 10 priorities
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-priorities",10);
+        ((AMQSession<?,?>) producerSess).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments);
+
+        Queue queue = producerSess.createQueue(getTestQueueName());
+
+        //create the consumer, producer, add message listener.
+        //The latch counts the deliveries the listener must observe before the test may finish.
+        CountDownLatch latch = new CountDownLatch(5);
+        //The consumer has to live on the consumer session: the listener commits that session to
+        //consume each message, and this thread keeps using the producer session for the initial
+        //send, so the listener's session must not be shared with it.
+        MessageConsumer cons = consumerSess.createConsumer(queue);
+        MessageProducer producer = producerSess.createProducer(queue);
+
+        ReflectingMessageListener listener = new ReflectingMessageListener(producerSess,producer,consumerSess,latch);
+        cons.setMessageListener(listener);
+
+        //Send low priority 0 message to kick start the asynchronous reflection process
+        producer.setPriority(0);
+        producer.send(nextMessage(1, true, producerSess, producer));
+        producerSess.commit();
+
+        //wait for the reflection process to complete
+        assertTrue("Test process failed to complete in allowed time", latch.await(10, TimeUnit.SECONDS));
+        assertNull("Unexpected throwable encountered", listener.getThrown());
+    }
+
+    /**
+     * Listener which, for every message it is given, sends the same message back through the
+     * supplied producer at default priority (committing the producer session) until the supplied
+     * latch is about to reach zero, and commits the consumer session after each delivery.
+     *
+     * The latch is counted down only once a delivery has been fully handled, so a thread released
+     * by the latch is guaranteed to see any failure recorded by {@link #getThrown()}.
+     */
+    private static class ReflectingMessageListener implements MessageListener
+    {
+        private final Session _prodSess;
+        private final Session _consSess;
+        private final CountDownLatch _latch;
+        private final MessageProducer _prod;
+        //latch count at construction time, i.e. the total number of deliveries expected
+        private final long _origCount;
+        //written by the delivery thread, read by the test thread
+        private volatile Throwable _lastThrown;
+
+        /**
+         * @param prodSess session committed after each reflected send
+         * @param prod producer used to send the message back
+         * @param consSess session committed after each delivery has been handled
+         * @param latch counted down once per handled delivery; its initial count is the number
+         *              of deliveries expected
+         */
+        public ReflectingMessageListener(final Session prodSess, final MessageProducer prod,
+                                         final Session consSess, final CountDownLatch latch)
+        {
+            _latch = latch;
+            _origCount = _latch.getCount();
+            _prodSess = prodSess;
+            _consSess = consSess;
+            _prod = prod;
+        }
+
+        /**
+         * Handles one delivery: reflects the message if further deliveries are still expected,
+         * commits the consumer session, records any failure, and finally counts the latch down.
+         */
+        @Override
+        public void onMessage(final Message message)
+        {
+            try
+            {
+                //deliveries still outstanding once this one is done (never negative, matching
+                //the latch's own behaviour of stopping at zero)
+                final long remaining = Math.max(0, _latch.getCount() - 1);
+                final long msgNum = _origCount - remaining;
+                System.out.println("Received message " + msgNum + " with ID: " + message.getIntProperty("msg"));
+
+                if(remaining > 0)
+                {
+                    //reflect the message, updating its ID and using default priority
+                    message.clearProperties();
+                    message.setIntProperty("msg", (int) msgNum + 1);
+                    _prod.setPriority(Message.DEFAULT_PRIORITY);
+                    _prod.send(message);
+                    _prodSess.commit();
+                }
+
+                //commit the consumer session to consume the message
+                _consSess.commit();
+            }
+            catch(Throwable t)
+            {
+                t.printStackTrace();
+                _lastThrown = t;
+            }
+            finally
+            {
+                //Count down last: a waiter released by the final count-down must not run ahead
+                //of the commit above or miss a throwable recorded by it.
+                _latch.countDown();
+            }
+        }
+
+        /**
+         * @return the most recent throwable caught while handling a delivery, or null if none
+         */
+        public Throwable getThrown()
+        {
+            return _lastThrown;
+        }
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]