Author: robbie
Date: Wed Mar 30 16:01:23 2011
New Revision: 1087000

URL: http://svn.apache.org/viewvc?rev=1087000&view=rev
Log:
QPID-3167: add a unit test of SimpleAMQQueue#processQueue to check delivery 
when subscriptions with unique selectors are in use

Added:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Wed Mar 30 16:01:23 2011
@@ -346,7 +346,7 @@ public class AMQChannel implements Sessi
             finally
             {
                 long bodySize = _currentMessage.getSize();
-                long timestamp = ((BasicContentHeaderProperties) 
_currentMessage.getContentHeader().properties).getTimestamp();
+                long timestamp = ((BasicContentHeaderProperties) 
_currentMessage.getContentHeader().getProperties()).getTimestamp();
                 _session.registerMessageReceived(bodySize, timestamp);
                 _currentMessage = null;
             }
@@ -1079,8 +1079,8 @@ public class AMQChannel implements Sessi
     private boolean checkMessageUserId(ContentHeaderBody header)
     {
         AMQShortString userID =
-                header.properties instanceof BasicContentHeaderProperties
-                    ? ((BasicContentHeaderProperties) 
header.properties).getUserId()
+                header.getProperties() instanceof BasicContentHeaderProperties
+                    ? ((BasicContentHeaderProperties) 
header.getProperties()).getUserId()
                     : null;
 
         return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID 
== null? "" : userID.toString()));

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
 Wed Mar 30 16:01:23 2011
@@ -37,7 +37,7 @@ public class ContentHeaderBodyAdapter im
 
     private BasicContentHeaderProperties getProperties()
     {
-        return (BasicContentHeaderProperties) _contentHeaderBody.properties;
+        return (BasicContentHeaderProperties) 
_contentHeaderBody.getProperties();
     }
 
     public String getCorrelationId()

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
 Wed Mar 30 16:01:23 2011
@@ -161,7 +161,7 @@ public class MessageMetaData implements 
 
     public boolean isPersistent()
     {
-        BasicContentHeaderProperties properties = 
(BasicContentHeaderProperties) (_contentHeaderBody.properties);
+        BasicContentHeaderProperties properties = 
(BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
         return properties.getDeliveryMode() ==  
BasicContentHeaderProperties.PERSISTENT;
     }
 
@@ -229,7 +229,7 @@ public class MessageMetaData implements 
     {
         private BasicContentHeaderProperties getProperties()
         {
-            return (BasicContentHeaderProperties) 
getContentHeaderBody().properties;
+            return (BasicContentHeaderProperties) 
getContentHeaderBody().getProperties();
         }
 
         public String getCorrelationId()

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Wed Mar 30 16:01:23 2011
@@ -507,7 +507,7 @@ public class AMQQueueMBean extends AMQMa
     private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
     {
         List<String> list = new ArrayList<String>();
-        BasicContentHeaderProperties headerProperties = 
(BasicContentHeaderProperties) headerBody.properties;
+        BasicContentHeaderProperties headerProperties = 
(BasicContentHeaderProperties) headerBody.getProperties();
         list.add("reply-to = " + headerProperties.getReplyToAsString());
         list.add("propertyFlags = " + headerProperties.getPropertyFlags());
         list.add("ApplicationID = " + headerProperties.getAppIdAsString());

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 Wed Mar 30 16:01:23 2011
@@ -96,9 +96,9 @@ public class IncomingMessage implements 
     public void setExpiration()
     {
             long expiration =
-                    ((BasicContentHeaderProperties) 
_contentHeaderBody.properties).getExpiration();
+                    ((BasicContentHeaderProperties) 
_contentHeaderBody.getProperties()).getExpiration();
             long timestamp =
-                    ((BasicContentHeaderProperties) 
_contentHeaderBody.properties).getTimestamp();
+                    ((BasicContentHeaderProperties) 
_contentHeaderBody.getProperties()).getTimestamp();
 
             if (SYNCHED_CLOCKS)
             {
@@ -193,8 +193,8 @@ public class IncomingMessage implements 
 
     public boolean isPersistent()
     {
-        return getContentHeader().properties instanceof 
BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) 
getContentHeader().properties).getDeliveryMode() ==
+        return getContentHeader().getProperties() instanceof 
BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) 
getContentHeader().getProperties()).getDeliveryMode() ==
                                                              
BasicContentHeaderProperties.PERSISTENT;
     }
 

Added: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1087000&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
 (added)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
 Wed Mar 30 16:01:23 2011
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.queue.QueueRunner;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+
+
+public class QueueRunner implements ReadWriteRunnable
+{
+    private static final Logger _logger = Logger.getLogger(QueueRunner.class);
+
+    private String _name;
+    private SimpleAMQQueue _queue;
+
+    public QueueRunner(SimpleAMQQueue queue, long count)
+    {
+        _queue = queue;
+        _name = "QueueRunner-" + count + "-" + queue.getLogActor();
+    }
+
+    public void run()
+    {
+        String originalName = Thread.currentThread().getName();
+        try
+        {
+            Thread.currentThread().setName(_name);
+            CurrentActor.set(_queue.getLogActor());
+
+            _queue.processQueue(this);
+        }
+        catch (AMQException e)
+        {
+            _logger.error(e);
+        }
+        finally
+        {
+            CurrentActor.remove();
+            Thread.currentThread().setName(originalName);
+        }
+    }
+
+    public boolean isRead()
+    {
+        return false;
+    }
+
+    public boolean isWrite()
+    {
+        return true;
+    }
+
+    public String toString()
+    {
+        return _name;
+    }
+}
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Wed Mar 30 16:01:23 2011
@@ -1585,7 +1585,7 @@ public class SimpleAMQQueue implements A
 
     public void deliverAsync()
     {
-        Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+        QueueRunner runner = new QueueRunner(this, 
_stateChangeCount.incrementAndGet());
 
         if (_asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1604,52 +1604,6 @@ public class SimpleAMQQueue implements A
         _asyncDelivery.execute(flusher);
     }
 
-
-    private class Runner implements ReadWriteRunnable
-    {
-        String _name;
-        public Runner(long count)
-        {
-            _name = "QueueRunner-" + count + "-" + _logActor;
-        }
-
-        public void run()
-        {
-            String originalName = Thread.currentThread().getName();
-            try
-            {
-                Thread.currentThread().setName(_name);
-                CurrentActor.set(_logActor);
-
-                processQueue(this);
-            }
-            catch (AMQException e)
-            {
-                _logger.error(e);
-            }
-            finally
-            {
-                CurrentActor.remove();
-                Thread.currentThread().setName(originalName);
-            }
-        }
-
-        public boolean isRead()
-        {
-            return false;
-        }
-
-        public boolean isWrite()
-        {
-            return true;
-        }
-
-        public String toString()
-        {
-            return _name;
-        }
-    }
-
     public void flushSubscription(Subscription sub) throws AMQException
     {
         // Access control
@@ -1834,7 +1788,7 @@ public class SimpleAMQQueue implements A
      * @param runner the Runner to schedule
      * @throws AMQException
      */
-    private void processQueue(Runnable runner) throws AMQException
+    public void processQueue(QueueRunner runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
@@ -2289,4 +2243,9 @@ public class SimpleAMQQueue implements A
             }
         }
     }
+
+    public LogActor getLogActor()
+    {
+        return _logActor;
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 Wed Mar 30 16:01:23 2011
@@ -441,7 +441,7 @@ public class Subscription_0_10 implement
             Struct[] headers = new Struct[] { deliveryProps, messageProps };
 
             BasicContentHeaderProperties properties =
-                    (BasicContentHeaderProperties) 
message_0_8.getContentHeaderBody().properties;
+                    (BasicContentHeaderProperties) 
message_0_8.getContentHeaderBody().getProperties();
             final AMQShortString exchange = 
message_0_8.getMessagePublishInfo().getExchange();
             if(exchange != null)
             {

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
 Wed Mar 30 16:01:23 2011
@@ -364,7 +364,7 @@ public class Show extends AbstractComman
             {
                 if(msg instanceof AMQMessage)
                 {
-                    headers = ((BasicContentHeaderProperties) 
((AMQMessage)msg).getContentHeaderBody().properties);
+                    headers = ((BasicContentHeaderProperties) 
((AMQMessage)msg).getContentHeaderBody().getProperties());
                 }
             }
             catch (AMQException e)

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 Wed Mar 30 16:01:23 2011
@@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTest
     static ContentHeaderBody getContentHeader(FieldTable headers)
     {
         ContentHeaderBody header = new ContentHeaderBody();
-        header.properties = getProperties(headers);
+        header.setProperties(getProperties(headers));
         return header;
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
 Wed Mar 30 16:01:23 2011
@@ -396,7 +396,7 @@ public class TopicExchangeTest extends I
         IncomingMessage message = new IncomingMessage(info);
         final ContentHeaderBody chb = new ContentHeaderBody();
         BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
-        chb.properties = props;
+        chb.setProperties(props);
         message.setContentHeaderBody(chb);
 
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
 Wed Mar 30 16:01:23 2011
@@ -96,7 +96,7 @@ public class AMQPriorityQueueTest extend
         AMQMessage msg = super.createMessage(id);
         BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
         props.setPriority(i);
-        msg.getContentHeaderBody().properties = props;
+        msg.getContentHeaderBody().setProperties(props);
         return msg;
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Wed Mar 30 16:01:23 2011
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends I
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
-        contentHeaderBody.properties = props;
+        contentHeaderBody.setProperties(props);
         contentHeaderBody.bodySize = size;   // in bytes
         IncomingMessage message = new IncomingMessage(publish);
         message.setContentHeaderBody(contentHeaderBody);

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Wed Mar 30 16:01:23 2011
@@ -402,8 +402,8 @@ public class AMQQueueMBeanTest extends I
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
-        contentHeaderBody.properties = new BasicContentHeaderProperties();
-        ((BasicContentHeaderProperties) 
contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+        contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+        ((BasicContentHeaderProperties) 
contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
         IncomingMessage msg = new IncomingMessage(publish);
         msg.setContentHeaderBody(contentHeaderBody);
         return msg;

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
 Wed Mar 30 16:01:23 2011
@@ -126,7 +126,7 @@ public class AckTest extends InternalBro
             //IncomingMessage msg2 = null;
             BasicContentHeaderProperties b = new 
BasicContentHeaderProperties();
             ContentHeaderBody cb = new ContentHeaderBody();
-            cb.properties = b;
+            cb.setProperties(b);
 
             if (persistent)
             {

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Wed Mar 30 16:01:23 2011
@@ -660,8 +660,8 @@ public class SimpleAMQQueueTest extends 
         // Create IncomingMessage and nondurable queue
         final IncomingMessage msg = new IncomingMessage(info);
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
-        contentHeaderBody.properties = new BasicContentHeaderProperties();
-        ((BasicContentHeaderProperties) 
contentHeaderBody.properties).setDeliveryMode((byte) 2);
+        contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+        ((BasicContentHeaderProperties) 
contentHeaderBody.getProperties()).setDeliveryMode((byte) 2);
         msg.setContentHeaderBody(contentHeaderBody);
 
         final ArrayList<BaseQueue> qs = new ArrayList<BaseQueue>();
@@ -707,6 +707,111 @@ public class SimpleAMQQueueTest extends 
     }
 
 
+    /**
+     * processQueue() is used when asynchronously delivering messages to
+     * subscriptions which could not be delivered immediately during the
+     * enqueue() operation.
+     *
+     * A defect within the method would mean that delivery of these messages 
may
+     * not occur should the Runner stop before all messages have been 
processed.
+     * Such a defect was discovered when Selectors were used such that one and
+     * only one subscription can/will accept any given messages, but multiple
+     * subscriptions are present, and one of the earlier subscriptions receives
+     * more messages than the others.
+     *
+     * This test is to validate that the processQueue() method is able to
+     * correctly deliver all of the messages present for asynchronous delivery
+     * to subscriptions in such a scenario.
+     */
+    public void testProcessQueueWithUniqueSelectors() throws Exception
+    {
+        TestSimpleQueueEntryListFactory factory = new 
TestSimpleQueueEntryListFactory();
+        SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, 
"testOwner",false,
+                                                      false, _virtualHost, 
factory, null)
+        {
+            @Override
+            public void deliverAsync(Subscription sub)
+            {
+                // do nothing, i.e. prevent deliveries by the SubFlushRunner
+                // when registering the new subscriptions
+            }
+        };
+
+        // retrieve the QueueEntryList the queue creates and insert the test
+        // messages, thus avoiding straight-through delivery attempts during
+        //enqueue() process.
+        QueueEntryList list = factory.getQueueEntryList();
+        assertNotNull("QueueEntryList should have been created", list);
+
+        QueueEntry msg1 = list.add(createMessage(1L));
+        QueueEntry msg2 = list.add(createMessage(2L));
+        QueueEntry msg3 = list.add(createMessage(3L));
+        QueueEntry msg4 = list.add(createMessage(4L));
+        QueueEntry msg5 = list.add(createMessage(5L));
+
+        // Create lists of the entries each subscription should be interested
+        // in. Bias over 50% of the messages to the first subscription so that
+        // the later subscriptions reject them and report being done before
+        // the first subscription as the processQueue method proceeds.
+        List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+        List<QueueEntry> msgListSub2 = createEntriesList(msg4);
+        List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+
+        MockSubscription sub1 = new MockSubscription(msgListSub1);
+        MockSubscription sub2 = new MockSubscription(msgListSub2);
+        MockSubscription sub3 = new MockSubscription(msgListSub3);
+
+        // register the subscriptions
+        testQueue.registerSubscription(sub1, false);
+        testQueue.registerSubscription(sub2, false);
+        testQueue.registerSubscription(sub3, false);
+
+        //check that no messages have been delivered to the
+        //subscriptions during registration
+        assertEquals("No messages should have been delivered yet", 0, 
sub1.getMessages().size());
+        assertEquals("No messages should have been delivered yet", 0, 
sub2.getMessages().size());
+        assertEquals("No messages should have been delivered yet", 0, 
sub3.getMessages().size());
+
+        // call processQueue to deliver the messages
+        testQueue.processQueue(new QueueRunner(testQueue, 1)
+        {
+            @Override
+            public void run()
+            {
+                // we don't actually want/need this runner to do any work
+                // because we are already doing it!
+            }
+        });
+
+        // check expected messages delivered to correct consumers
+        verifyRecievedMessages(msgListSub1, sub1.getMessages());
+        verifyRecievedMessages(msgListSub2, sub2.getMessages());
+        verifyRecievedMessages(msgListSub3, sub3.getMessages());
+    }
+
+    private List<QueueEntry> createEntriesList(QueueEntry... entries)
+    {
+        ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+        for (QueueEntry entry : entries)
+        {
+            entriesList.add(entry);
+        }
+        return entriesList;
+    }
+
+    private void verifyRecievedMessages(List<QueueEntry> expected,
+            List<QueueEntry> delivered)
+    {
+        assertEquals("Consumer did not receive the expected number of 
messages",
+                    expected.size(), delivered.size());
+
+        for (QueueEntry msg : expected)
+        {
+            assertTrue("Consumer did not recieve msg: "
+                    + msg.getMessage().getMessageNumber(), 
delivered.contains(msg));
+        }
+    }
+
     public class TestMessage extends AMQMessage
     {
         private final long _tag;
@@ -747,4 +852,20 @@ public class SimpleAMQQueueTest extends 
         AMQMessage messageA = new TestMessage(id, id, info);
         return messageA;
     }
+
+    class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+    {
+        QueueEntryList _list;
+
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        {
+            _list = new SimpleQueueEntryList(queue);
+            return _list;
+        }
+
+        public QueueEntryList getQueueEntryList()
+        {
+            return _list;
+        }
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 Wed Mar 30 16:01:23 2011
@@ -589,7 +589,7 @@ public class MessageStoreTest extends In
         headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
         headerBody.bodySize = 0;
 
-        headerBody.properties = properties;
+        headerBody.setProperties(properties);
 
         try
         {

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
 Wed Mar 30 16:01:23 2011
@@ -102,7 +102,7 @@ public class ReferenceCountingTest exten
         ContentHeaderBody chb = new ContentHeaderBody();
         BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
         bchp.setDeliveryMode((byte)2);
-        chb.properties = bchp;
+        chb.setProperties(bchp);
         return chb;
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 Wed Mar 30 16:01:23 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.server.subscript
 */
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -45,6 +46,7 @@ public class MockSubscription implements
     private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
+    private List<QueueEntry> _acceptEntries = null;
 
     private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
     private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -54,6 +56,15 @@ public class MockSubscription implements
     // Create a simple ID that increments for ever new Subscription
     private final long _subscriptionID = idGenerator.getAndIncrement();
 
+    public MockSubscription()
+    {
+    }
+
+    public MockSubscription(List<QueueEntry> acceptEntries)
+    {
+        _acceptEntries = acceptEntries;
+    }
+
     public void close()
     {
         _closed = true;
@@ -119,8 +130,15 @@ public class MockSubscription implements
         _stateChangeLock.lock();
     }
 
-    public boolean hasInterest(QueueEntry msg)
+    public boolean hasInterest(QueueEntry entry)
     {
+        if(_acceptEntries != null)
+        {
+            //simulate selector behaviour, only signal
+            //interest in the dictated queue entries
+            return _acceptEntries.contains(entry);
+        }
+
         return true;
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
 Wed Mar 30 16:01:23 2011
@@ -243,7 +243,7 @@ public class InternalBrokerBaseCase exte
             //Make Message Persistent
             properties.setDeliveryMode((byte) 2);
 
-            _headerBody.properties = properties;
+            _headerBody.setProperties(properties);
 
             channel.publishContentHeader(_headerBody);
         }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 Wed Mar 30 16:01:23 2011
@@ -99,7 +99,7 @@ public abstract class AbstractJMSMessage
         }
 
         AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
-                                                                 (BasicContentHeaderProperties) contentHeader.properties,
+                                                                 (BasicContentHeaderProperties) contentHeader.getProperties(),
                                                                  exchange, routingKey);
 
         return createMessage(delegate, data);

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
 Wed Mar 30 16:01:23 2011
@@ -104,7 +104,7 @@ public class MessageFactoryRegistry
                                             AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
             throws AMQException, JMSException
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
 
         // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
         // AMQP. When the type is null, it can only be assumed that the message is a byte message.

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1087000&r1=1086999&r2=1087000&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
 Wed Mar 30 16:01:23 2011
@@ -36,7 +36,7 @@ public class ContentHeaderBody implement
     public long bodySize;
 
     /** must never be null */
-    public ContentHeaderProperties properties;
+    private ContentHeaderProperties properties;
 
     public ContentHeaderBody()
     {
@@ -128,4 +128,14 @@ public class ContentHeaderBody implement
     {
         return new AMQFrame(channelId, body);
     }
+
+    public ContentHeaderProperties getProperties()
+    {
+        return properties;
+    }
+
+    public void setProperties(ContentHeaderProperties props)
+    {
+        properties = props;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to