Author: gtully
Date: Fri Aug 29 01:21:09 2008
New Revision: 690144

URL: http://svn.apache.org/viewvc?rev=690144&view=rev
Log:
fix AMQ-1917

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=690144&r1=690143&r2=690144&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Fri Aug 29 01:21:09 2008
@@ -210,11 +210,13 @@
     LinkedList<RecoveryDispatch> recoveries = new 
LinkedList<RecoveryDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) 
throws Exception {
+        // synchronize with dispatch method so that no new messages are sent
+        // while setting up a subscription. avoid out of order messages,
+        // duplicates, etc.
         dispatchLock.lock();
         try {
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
-//            MessageEvaluationContext msgContext = new 
NonCachedMessageEvaluationContext();
 
             // needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
@@ -229,32 +231,28 @@
                     dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                 }
             }
-            // synchronize with dispatch method so that no new messages are 
sent
-            // while
-            // setting up a subscription. avoid out of order messages,
-            // duplicates
-            // etc.
+            
+            // any newly paged in messages that are not dispatched are added 
to pagedInPending in iterate()
             doPageIn(false);
-
+            
             synchronized (pagedInMessages) {
                 RecoveryDispatch rd = new RecoveryDispatch();
                 rd.messages =  new 
ArrayList<QueueMessageReference>(pagedInMessages.values());
                 rd.subscription = sub;
                 recoveries.addLast(rd);
             }
-            
             if( sub instanceof QueueBrowserSubscription ) {
                 ((QueueBrowserSubscription)sub).incrementQueueRef();
             }
             if (!this.optimizedDispatch) {
-                    wakeup();
+                wakeup();
             }
         }finally {
             dispatchLock.unlock();
         }
         if (this.optimizedDispatch) {
-        // Outside of dispatchLock() to maintain the lock hierarchy of
-        // iteratingMutex -> dispatchLock. - see 
https://issues.apache.org/activemq/browse/AMQ-1878
+            // Outside of dispatchLock() to maintain the lock hierarchy of
+            // iteratingMutex -> dispatchLock. - see 
https://issues.apache.org/activemq/browse/AMQ-1878
             wakeup();
         }
     }
@@ -262,11 +260,10 @@
     public void removeSubscription(ConnectionContext context, Subscription sub)
             throws Exception {
         destinationStatistics.getConsumers().decrement();
+        // synchronize with dispatch method so that no new messages are sent
+        // while removing a subscription.
         dispatchLock.lock();
         try {
-            // synchronize with dispatch method so that no new messages are 
sent
-            // while
-            // removing up a subscription.
             synchronized (consumers) {
                 removeFromConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
@@ -324,7 +321,6 @@
     }
 
     public void send(final ProducerBrokerExchange producerExchange, final 
Message message) throws Exception {
-//        System.out.println(getName()+" send "+message.getMessageId());
         final ConnectionContext context = 
producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
@@ -934,9 +930,17 @@
                        for (QueueMessageReference node : rd.messages) {
                            if (!node.isDropped() && !node.isAcked() && 
(!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
                                msgContext.setMessageReference(node);
-                                   if (rd.subscription.matches(node, 
msgContext)) {
-                                       rd.subscription.add(node);
+                               if (rd.subscription.matches(node, msgContext)) {
+                                   rd.subscription.add(node);
+                               } else {
+                                   // make sure it gets queued for dispatch 
again
+                                   dispatchLock.lock();
+                                   try {
+                                       pagedInPendingDispatch.add(node);
+                                   } finally {
+                                       dispatchLock.unlock();
                                    }
+                               }
                            }
                        }
                        
@@ -949,24 +953,24 @@
                    }
                }
        
-               boolean result = false;
+               boolean pageInMoreMessages = false;
                synchronized (messages) {
-                   result = !messages.isEmpty();
+                   pageInMoreMessages = !messages.isEmpty();
                }               
                
                // Kinda ugly.. but I think dispatchLock is the only mutex 
protecting the 
                // pagedInPendingDispatch variable.             
                dispatchLock.lock();
                try {
-                   result |= !pagedInPendingDispatch.isEmpty();
+                   pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
                } finally {
                    dispatchLock.unlock();
                }
                
                // Perhaps we should page always into the 
pagedInPendingDispatch list is 
-                // !messages.isEmpty(), and then if 
!pagedInPendingDispatch.isEmpty()
-                // then we do a dispatch.
-               if (result) {
+               // !messages.isEmpty(), and then if 
!pagedInPendingDispatch.isEmpty()
+               // then we do a dispatch.
+               if (pageInMoreMessages) {
                    try {
                       pageInMessages(false);
                       
@@ -1116,8 +1120,8 @@
             int toPageIn = 
(getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - 
pagedInMessages.size();
             toPageIn = Math.min(toPageIn,getMaxPageSize());
             if (isLazyDispatch()&& !force) {
-             // Only page in the minimum number of messages which can be 
dispatched immediately.
-             toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
toPageIn);
+                // Only page in the minimum number of messages which can be 
dispatched immediately.
+                toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
toPageIn);
             }
             if ((force || !consumers.isEmpty()) && toPageIn > 0) {
                 messages.setMaxBatchSize(toPageIn);
@@ -1158,21 +1162,17 @@
         dispatchLock.lock();
         try {
             if(!pagedInPendingDispatch.isEmpty()) {
- //              System.out.println(getName()+": dispatching from pending: 
"+pagedInPendingDispatch.size());
                 // Try to first dispatch anything that had not been dispatched 
before.
                 pagedInPendingDispatch = 
doActualDispatch(pagedInPendingDispatch);
-//                System.out.println(getName()+": new pending list1: 
"+pagedInPendingDispatch.size());
             }
             // and now see if we can dispatch the new stuff.. and append to 
the pending 
             // list anything that does not actually get dispatched.
             if (list != null && !list.isEmpty()) {
-//                System.out.println(getName()+": dispatching from paged in: 
"+list.size());
                 if (pagedInPendingDispatch.isEmpty()) {
                     pagedInPendingDispatch.addAll(doActualDispatch(list));
                 } else {
                     pagedInPendingDispatch.addAll(list);
                 }
-//                System.out.println(getName()+": new pending list2: 
"+pagedInPendingDispatch.size());
             }
         } finally {
             dispatchLock.unlock();
@@ -1200,7 +1200,6 @@
                         if (!s.isFull()) {
                             // Dispatch it.
                             s.add(node);
-                            //System.err.println(getName()+" Dispatched to 
"+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
                             target = s;
                             break;
                         } else {

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java?rev=690144&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
 Fri Aug 29 01:21:09 2008
@@ -0,0 +1,207 @@
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+
+
+public class AMQ1917Test extends TestCase {
+
+        private static final int NUM_MESSAGES = 4000;
+        private static final int NUM_THREADS = 10;
+        public static final String REQUEST_QUEUE = "mock.in.queue";
+        public static final String REPLY_QUEUE = "mock.out.queue";
+
+        Destination requestDestination = ActiveMQDestination.createDestination(
+                REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+        Destination replyDestination = ActiveMQDestination.createDestination(
+                REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+
+        CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
+        CountDownLatch errorLatch = new CountDownLatch(1);
+        ThreadPoolExecutor tpe;
+        final String BROKER_URL = "tcp://localhost:61616";
+        BrokerService broker = null;
+        private boolean working = true;
+        
+        // trivial session/producer pool
+        final Session[] sessions = new Session[NUM_THREADS];
+        final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
+
+        public void setUp() throws Exception {
+            broker = new BrokerService();
+            broker.setPersistent(false);
+            broker.addConnector(BROKER_URL);
+            broker.start();
+            
+            BlockingQueue<Runnable> queue = new 
ArrayBlockingQueue<Runnable>(10000);
+            tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
+                    TimeUnit.MILLISECONDS, queue);
+            ThreadFactory limitedthreadFactory = new 
LimitedThreadFactory(tpe.getThreadFactory());  
+            tpe.setThreadFactory(limitedthreadFactory);
+        }
+
+        public void tearDown() throws Exception {
+            broker.stop();
+            tpe.shutdown();
+        }
+        
+        public void testLoadedSendRecieveWithCorrelationId() throws Exception 
{            
+           
+            ActiveMQConnectionFactory connectionFactory = new 
org.apache.activemq.ActiveMQConnectionFactory();
+            connectionFactory.setBrokerURL(BROKER_URL);
+            Connection connection = connectionFactory.createConnection();      
    
+            setupReceiver(connection);
+
+            connection = connectionFactory.createConnection();
+            connection.start();
+            
+            // trivial session/producer pool   
+            for (int i=0; i<NUM_THREADS; i++) {
+                sessions[i] = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                producers[i] = sessions[i].createProducer(requestDestination);
+            }
+            
+            for (int i = 0; i < NUM_MESSAGES; i++) {
+                MessageSenderReceiver msr = new 
MessageSenderReceiver(requestDestination,
+                        replyDestination, "Test Message : " + i);
+                tpe.execute(msr);
+            }
+            
+            while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
+                if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
+                    fail("there was an error, check the console for thread or 
thread allocation failure");
+                    break;
+                }
+            }
+            working = false;
+        }
+
+        private void setupReceiver(final Connection connection) throws 
Exception {
+
+            final Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = session
+                    .createConsumer(requestDestination);
+            final MessageProducer sender = 
session.createProducer(replyDestination);
+            connection.start();
+
+            new Thread() {
+                public void run() {
+                    while (working) {
+                        // wait for messages in an infinite loop
+                        // time out is set to show the client is awaiting
+                        try {
+                            TextMessage msg = (TextMessage) 
consumer.receive(20000);
+                            if (msg == null) {
+                                errorLatch.countDown();
+                                fail("Response timed out." 
+                                        + " latchCount=" + 
roundTripLatch.getCount());
+                            } else {
+                                String result = msg.getText();
+                                //System.out.println("Request:" + (i++)
+                                //        + ", msg=" + result + ", ID" + 
msg.getJMSMessageID());
+                                TextMessage response = 
session.createTextMessage();
+                                
response.setJMSCorrelationID(msg.getJMSMessageID());
+                                response.setText(result);
+                                sender.send(response);
+                            }
+                        } catch (JMSException e) {
+                            errorLatch.countDown();
+                            fail("Unexpected exception:" + e);
+                        }
+                    }
+                }
+            }.start();
+        }
+
+        class MessageSenderReceiver implements Runnable {
+
+            Destination reqDest;
+            Destination replyDest;
+            String origMsg;
+
+            public MessageSenderReceiver(Destination reqDest,
+                    Destination replyDest, String msg) throws Exception {
+                this.replyDest = replyDest;
+                this.reqDest = reqDest;
+                this.origMsg = msg;
+            }
+
+            private int getIndexFromCurrentThread() {
+                String name = Thread.currentThread().getName();
+                String num = name.substring(name.lastIndexOf('-') +1);
+                int idx = Integer.parseInt(num) -1;
+                assertTrue("idx is in range: idx=" + idx,  idx < NUM_THREADS);
+                return idx;
+            }
+
+            public void run() {
+                try {
+                    // get thread session and producer from pool
+                    int threadIndex = getIndexFromCurrentThread();
+                    Session session = sessions[threadIndex];
+                    MessageProducer producer = producers[threadIndex];
+
+                    final Message sendJmsMsg = 
session.createTextMessage(origMsg);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    producer.send(sendJmsMsg);
+
+                    String jmsId = sendJmsMsg.getJMSMessageID();
+                    String selector = "JMSCorrelationID='" + jmsId + "'";
+
+                    MessageConsumer consumer = 
session.createConsumer(replyDest,
+                            selector);
+                    Message receiveJmsMsg = consumer.receive(2000);
+                    consumer.close();
+                    if (receiveJmsMsg == null) {
+                        errorLatch.countDown();
+                        fail("Unable to receive response for:" + origMsg
+                                + ", with selector=" + selector);
+                    } else {
+                        //System.out.println("received response message :"
+                        //        + ((TextMessage) receiveJmsMsg).getText()
+                        //        + " with selector : " + selector);
+                        roundTripLatch.countDown();
+                    }
+                } catch (JMSException e) {
+                    fail("unexpected exception:" + e);
+                }
+            }
+        }
+        
+        public class LimitedThreadFactory implements ThreadFactory {
+            int threadCount;
+            private ThreadFactory factory;
+            public LimitedThreadFactory(ThreadFactory threadFactory) {
+                this.factory = threadFactory;
+            }
+
+            public Thread newThread(Runnable arg0) {
+                if (++threadCount > NUM_THREADS) {
+                    errorLatch.countDown();
+                    fail("too many threads requested");
+                }       
+                return factory.newThread(arg0);
+            }
+        }
+    }
+

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to