Author: robbie
Date: Tue Apr  3 21:21:57 2012
New Revision: 1309155

URL: http://svn.apache.org/viewvc?rev=1309155&view=rev
Log:
QPID-3927: add a systest which would highlight the underlying issue by failing 
to receive all messages present on the priority queue

Modified:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1309155&r1=1309154&r2=1309155&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
 Tue Apr  3 21:21:57 2012
@@ -21,8 +21,10 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
@@ -30,12 +32,15 @@ import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.naming.NamingException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class PriorityQueueTest extends QpidBrokerTestCase
 {
@@ -197,4 +202,99 @@ public class PriorityQueueTest extends Q
 
         return send;
     }
+
+    /**
+     * Test that after sending an initial  message with priority 0, it is able 
to be repeatedly reflected back to the queue using
+     * default priority and then consumed again, with separate transacted 
sessions with prefetch 1 for producer and consumer.
+     *
+     * Highlighted defect with PriorityQueues resolved in QPID-3927.
+     */
+    public void 
testMessageReflectionWithPriorityIncreaseOnTransactedSessionsWithPrefetch1() 
throws Exception
+    {
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, 
"1");
+        Connection conn = getConnection();
+        conn.start();
+        assertEquals("Prefetch not reset", 1, ((AMQConnection) 
conn).getMaxPrefetch());
+
+        final Session producerSess = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+        final Session consumerSess = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+
+        //declare a priority queue with 10 priorities
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-priorities",10);
+        ((AMQSession<?,?>) producerSess).createQueue(new 
AMQShortString(getTestQueueName()), false, true, false, arguments);
+
+        Queue queue = producerSess.createQueue(getTestQueueName());
+
+        //create the consumer, producer, add message listener
+        CountDownLatch latch = new CountDownLatch(5);
+        MessageConsumer cons = producerSess.createConsumer(queue);
+        MessageProducer producer = producerSess.createProducer(queue);
+
+        ReflectingMessageListener listener = new 
ReflectingMessageListener(producerSess,producer,consumerSess,latch);
+        cons.setMessageListener(listener);
+
+        //Send low priority 0 message to kick start the asynchronous 
reflection process
+        producer.setPriority(0);
+        producer.send(nextMessage(1, true, producerSess, producer));
+        producerSess.commit();
+
+        //wait for the reflection process to complete
+        assertTrue("Test process failed to complete in allowed time", 
latch.await(10, TimeUnit.SECONDS));
+        assertNull("Unexpected throwable encountered", listener.getThrown());
+    }
+
+    private static class ReflectingMessageListener implements MessageListener
+    {
+        private Session _prodSess;
+        private Session _consSess;
+        private CountDownLatch _latch;
+        private MessageProducer _prod;
+        private long _origCount;
+        private Throwable _lastThrown;
+
+        public ReflectingMessageListener(final Session prodSess, final 
MessageProducer prod,
+                final Session consSess, final CountDownLatch latch)
+        {
+            _latch = latch;
+            _origCount = _latch.getCount();
+            _prodSess = prodSess;
+            _consSess = consSess;
+            _prod = prod;
+        }
+
+        @Override
+        public void onMessage(final Message message)
+        {
+            try
+            {
+                _latch.countDown();
+                long msgNum = _origCount - _latch.getCount();
+                System.out.println("Received message " + msgNum + " with ID: " 
+ message.getIntProperty("msg"));
+
+                if(_latch.getCount() > 0)
+                {
+                    //reflect the message, updating its ID and using default 
priority
+                    message.clearProperties();
+                    message.setIntProperty("msg", (int) msgNum + 1);
+                    _prod.setPriority(Message.DEFAULT_PRIORITY);
+                    _prod.send(message);
+                    _prodSess.commit();
+                }
+
+                //commit the consumer session to consume the message
+                _consSess.commit();
+            }
+            catch(Throwable t)
+            {
+                t.printStackTrace();
+                _lastThrown = t;
+            }
+        }
+
+        public Throwable getThrown()
+        {
+            return _lastThrown;
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to