Author: robbie
Date: Tue Dec  7 12:23:38 2010
New Revision: 1042998

URL: http://svn.apache.org/viewvc?rev=1042998&view=rev
Log:
QPID-2972: client configuration for Max Delivery Count

Added:
    
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
Modified:
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
    
qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Tue Dec  7 12:23:38 2010
@@ -325,6 +325,11 @@ public class AMQConnection extends Close
     //By default it's async publish
     private String _syncPublish = ""; 
     
+    /* Indicates the maximum number of times an individual consumer
+     * created on this connection can see a specific MessageID
+     * before it should reject the message during rollback/recover */
+    private int _maxDeliveryCount;
+    
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -454,6 +459,18 @@ public class AMQConnection extends Close
             _syncPublish = 
System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
         }
         
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT) 
!= null)
+        {
+            _maxDeliveryCount = Integer.parseInt(
+                    
connectionURL.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT));
+        }
+        else
+        {
+            // use the default value for all connections, if set
+            _maxDeliveryCount = 
Integer.getInteger(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, 0);
+        }
+
+        
         _failoverPolicy = new FailoverPolicy(connectionURL, this);
         BrokerDetails brokerDetails = 
_failoverPolicy.getCurrentBrokerDetails();
         if (brokerDetails.getTransport().equals(BrokerDetails.VM))
@@ -1596,4 +1613,15 @@ public class AMQConnection extends Close
     {
         return _sessions.getNextChannelId();
     }
+
+    /**
+     * Returns the MaxDeliveryCount value applied to this connection either 
via a
+     * ConnectionURL option or system property. If no setting was made, this 
defaults to 0.
+     * 
+     * @return the value (0 if none was set)
+     */
+    public int getMaxDeliveryCount()
+    {
+        return _maxDeliveryCount;
+    }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
 Tue Dec  7 12:23:38 2010
@@ -55,6 +55,8 @@ public abstract class AMQDestination imp
 
     private AMQShortString[] _bindingKeys;
 
+    private Integer _maxDeliveryCount;
+
     private String _url;
     private AMQShortString _urlAsShortString;
 
@@ -62,11 +64,6 @@ public abstract class AMQDestination imp
 
     private boolean _exchangeExistsChecked;
 
-    private byte[] _byteEncoding;
-    private static final int IS_DURABLE_MASK = 0x1;
-    private static final int IS_EXCLUSIVE_MASK = 0x2;
-    private static final int IS_AUTODELETE_MASK = 0x4;
-
     public static final int QUEUE_TYPE = 1;
     public static final int TOPIC_TYPE = 2;
     public static final int UNKNOWN_TYPE = 3;
@@ -88,6 +85,8 @@ public abstract class AMQDestination imp
         _queueName = binding.getQueueName() == null ? null : 
binding.getQueueName();
         _routingKey = binding.getRoutingKey() == null ? null : 
binding.getRoutingKey();
         _bindingKeys = binding.getBindingKeys() == null || 
binding.getBindingKeys().length == 0 ? new AMQShortString[0] : 
binding.getBindingKeys();
+        String count = binding.getOption(BindingURL.OPTION_MAX_DELIVERY_COUNT);
+        _maxDeliveryCount = count == null ? null : Integer.parseInt(count);
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString 
exchangeClass, AMQShortString routingKey, AMQShortString queueName)
@@ -129,7 +128,14 @@ public abstract class AMQDestination imp
     }
 
     protected AMQDestination(AMQShortString exchangeName, AMQShortString 
exchangeClass, AMQShortString routingKey, boolean isExclusive,
-                             boolean isAutoDelete, AMQShortString queueName, 
boolean isDurable,AMQShortString[] bindingKeys, boolean browseOnly)
+            boolean isAutoDelete, AMQShortString queueName, boolean 
isDurable,AMQShortString[] bindingKeys, boolean browseOnly)
+    {
+        this (exchangeName, exchangeClass, routingKey, 
isExclusive,isAutoDelete,queueName,isDurable,bindingKeys, browseOnly, null);
+    }
+    
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString 
exchangeClass, AMQShortString routingKey, boolean isExclusive,
+                             boolean isAutoDelete, AMQShortString queueName, 
boolean isDurable,AMQShortString[] bindingKeys, 
+                                 boolean browseOnly, Integer maxDeliveryCount)
     {
         if ( (ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(exchangeClass) || 
               ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exchangeClass))  
@@ -154,6 +160,7 @@ public abstract class AMQDestination imp
         _isDurable = isDurable;
         _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new 
AMQShortString[0] : bindingKeys;
         _browseOnly = browseOnly;
+        _maxDeliveryCount = maxDeliveryCount;
     }
 
     public AMQShortString getEncodedName()
@@ -207,7 +214,6 @@ public abstract class AMQDestination imp
         // calculated URL now out of date
         _url = null;
         _urlAsShortString = null;
-        _byteEncoding = null;
     }
 
     public AMQShortString getRoutingKey()
@@ -309,7 +315,6 @@ public abstract class AMQDestination imp
                     sb.append(bindingKey);
                     sb.append("'");
                     sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
-
                 }
             }
 
@@ -333,6 +338,14 @@ public abstract class AMQDestination imp
                 sb.append("='true'");
                 sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
             }
+            
+            
+            if (_maxDeliveryCount != null)
+            {
+                sb.append(BindingURL.OPTION_MAX_DELIVERY_COUNT);
+                sb.append("='" + _maxDeliveryCount + "'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
 
             //removeKey the last char '?' if there is no options , ',' if 
there are.
             sb.deleteCharAt(sb.length() - 1);
@@ -343,54 +356,6 @@ public abstract class AMQDestination imp
         return url;
     }
 
-    public byte[] toByteEncoding()
-    {
-        byte[] encoding = _byteEncoding;
-        if(encoding == null)
-        {
-            int size = _exchangeClass.length() + 1 +
-                       _exchangeName.length() + 1 +
-                       0 +  // in place of the destination name
-                       (_queueName == null ? 0 : _queueName.length()) + 1 +
-                       1;
-            encoding = new byte[size];
-            int pos = 0;
-
-            pos = _exchangeClass.writeToByteArray(encoding, pos);
-            pos = _exchangeName.writeToByteArray(encoding, pos);
-
-            encoding[pos++] = (byte)0;
-
-            if(_queueName == null)
-            {
-                encoding[pos++] = (byte)0;
-            }
-            else
-            {
-                pos = _queueName.writeToByteArray(encoding,pos);
-            }
-            byte options = 0;
-            if(_isDurable)
-            {
-                options |= IS_DURABLE_MASK;
-            }
-            if(_isExclusive)
-            {
-                options |= IS_EXCLUSIVE_MASK;
-            }
-            if(_isAutoDelete)
-            {
-                options |= IS_AUTODELETE_MASK;
-            }
-            encoding[pos] = options;
-
-
-            _byteEncoding = encoding;
-
-        }
-        return encoding;
-    }
-
     public boolean equals(Object o)
     {
         if (this == o)
@@ -444,53 +409,6 @@ public abstract class AMQDestination imp
                 null);          // factory location
     }
 
-
-    public static Destination createDestination(byte[] byteEncodedDestination)
-    {
-        AMQShortString exchangeClass;
-        AMQShortString exchangeName;
-        AMQShortString routingKey;
-        AMQShortString queueName;
-        boolean isDurable;
-        boolean isExclusive;
-        boolean isAutoDelete;
-
-        int pos = 0;
-        exchangeClass = 
AMQShortString.readFromByteArray(byteEncodedDestination, pos);
-        pos+= exchangeClass.length() + 1;
-        exchangeName =  
AMQShortString.readFromByteArray(byteEncodedDestination, pos);
-        pos+= exchangeName.length() + 1;
-        routingKey =  AMQShortString.readFromByteArray(byteEncodedDestination, 
pos);
-        pos+= (routingKey == null ? 0 : routingKey.length()) + 1;
-        queueName =  AMQShortString.readFromByteArray(byteEncodedDestination, 
pos);
-        pos+= (queueName == null ? 0 : queueName.length()) + 1;
-        int options = byteEncodedDestination[pos];
-        isDurable = (options & IS_DURABLE_MASK) != 0;
-        isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
-        isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
-
-        if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
-        {
-            return new 
AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable);
-        }
-        else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
-        {
-            return new 
AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable);
-        }
-        else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
-        {
-            return new AMQHeadersExchange(routingKey);
-        }
-        else
-        {
-            return new AMQAnyDestination(exchangeName,exchangeClass,
-                                         routingKey,isExclusive, 
-                                         isAutoDelete,queueName, 
-                                         isDurable, new AMQShortString[0]);
-        }
-
-    }
-
     public static Destination createDestination(BindingURL binding)
     {
         AMQShortString type = binding.getExchangeClass();
@@ -517,4 +435,15 @@ public abstract class AMQDestination imp
     {
         return _browseOnly;
     }
+
+    /**
+     * The maximum number of times a consumer should attempt delivery before rejecting 
the message
+     * without requesting it be re-queued.
+     * 
+     * @return the Integer value, or null if the option was not set.
+     */
+    public Integer getMaxDeliveryCount()
+    {
+        return _maxDeliveryCount;
+    }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Dec  7 12:23:38 2010
@@ -1709,6 +1709,7 @@ public abstract class AMQSession<C exten
                     //fall through
                 case Session.AUTO_ACKNOWLEDGE:
                     //check the last message asynchronously delivered via 
auto-ack
+                    //This will be null unless an async auto-ack delivery is 
in progress
                     Long tag = getLastAsyncAutoAckDeliveryTag();
                     clearLastAsyncAutoAckDeliveryTag();
                     

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Tue Dec  7 12:23:38 2010
@@ -21,6 +21,7 @@
 package org.apache.qpid.client;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -112,11 +113,10 @@ public abstract class BasicMessageConsum
      */
     protected final int _acknowledgeMode;
 
-    private int _idMapSize = 100;//TODO: set by configuration
-    private int _maxDeliveryAttempts = 3; //TODO: set by configuration
-    private boolean _maxRedeliverEnabled = true;//TODO set based on above 
config
+    private int _maxDeliveryAttempts = 0;
+    private boolean _maxRedeliverEnabled = false;
 
-    final DeliveryCountTracker _tracker = new 
DeliveryCountTracker(_idMapSize);//TODO
+    private final DeliveryCountTracker _tracker;
 
     /**
      * The thread that was used to call receive(). This is important for being 
able to interrupt that thread if a
@@ -174,6 +174,13 @@ public abstract class BasicMessageConsum
         {
             _acknowledgeMode = acknowledgeMode;
         }
+
+        //set configuration + create tracker for Max Delivery Count
+        int idMapSize = 
Integer.getInteger(ClientProperties.MAX_DELIVERY_RECORDS_PROP_NAME, 2 * 
_prefetchHigh);
+        Integer maxDeliveries = destination.getMaxDeliveryCount();
+        _maxDeliveryAttempts = maxDeliveries == null ? 
connection.getMaxDeliveryCount() : maxDeliveries;
+        _maxRedeliverEnabled = _maxDeliveryAttempts > 0;
+        _tracker = isMaxDeliveryCountEnforced() ? new 
DeliveryCountTracker(idMapSize) : null;
     }
 
     public AMQDestination getDestination()
@@ -1036,6 +1043,7 @@ public abstract class BasicMessageConsum
     {
         return _maxDeliveryAttempts;
     }
+
     public void removeDeliveryCountRecordsForMessage(long deliveryTag)
     {
         if(isMaxDeliveryCountEnforced())

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
 Tue Dec  7 12:23:38 2010
@@ -70,7 +70,18 @@ public class ClientProperties
      * heartbeat in TuneOK will be used
      */
     public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
+
+    /**
+     * System property to set default value for maximum delivery count on a 
client-wide basis, 
+     * unless overridden by the more specific settings at connection or 
binding URL level.
+     */
+    public static final String MAX_DELIVERY_COUNT_PROP_NAME = 
"qpid.max.delivery.count";
     
+    /**
+     * System property to set default value for maximum number of retained 
MessageID records
+     * per-consumer for use in enforcing maximum delivery count.
+     */
+    public static final String MAX_DELIVERY_RECORDS_PROP_NAME = 
"qpid.max.delivery.records";
 
      /**
      * ==========================================================

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
 Tue Dec  7 12:23:38 2010
@@ -41,6 +41,7 @@ public interface ConnectionURL
     public static final String OPTIONS_BROKERLIST = "brokerlist";
     public static final String OPTIONS_FAILOVER = "failover";
     public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+    public static final String OPTIONS_MAX_DELIVERY_COUNT = "maxdeliverycount";
     public static final String OPTIONS_SSL = "ssl";
     public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = 
"defaultTopicExchange";
     public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = 
"defaultQueueExchange";

Added: 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java?rev=1042998&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
 Tue Dec  7 12:23:38 2010
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.configuration.ClientProperties;
+import org.apache.qpid.jms.ConnectionURL;
+
+public class AMQConnectionUnitTest extends TestCase
+{
+    public void testMaxDeliveryCountPresent() throws Exception
+    {
+        String url = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='3'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        //check the max delivery count option is successfully passed through 
to the AMQConnection
+        AMQConnection conn = new MockAMQConnection(connectionURL, null);
+        assertEquals("Max Delivery Count option was not as expected", 3, 
+                conn.getMaxDeliveryCount());
+    }
+    
+    public void testMaxDeliveryCountNotPresent() throws Exception
+    {
+        String url = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        //check the max delivery count value defaults to 0 when no url option 
or sys prop is specified.
+        AMQConnection conn = new MockAMQConnection(connectionURL, null);
+        assertEquals("Max Delivery Count option was not as expected", 0, 
+                conn.getMaxDeliveryCount());
+    }
+    
+    public void testMaxDeliverySystemProperty() throws Exception
+    {
+        String oldSysPropValue = System.setProperty(
+                ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, "15");
+        try
+        {
+            String url = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+            ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+            //check the max delivery count system property was successfully 
picked up and
+            //the value set for the AMQConnection
+            AMQConnection conn = new MockAMQConnection(connectionURL, null);
+            assertEquals("Max Delivery Count option was not as expected", 15, 
+                    conn.getMaxDeliveryCount());
+        }
+        finally
+        {
+            if(oldSysPropValue != null)
+            {
+                System.setProperty(
+                        ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, 
oldSysPropValue);
+            }
+            else
+            {
+                
System.clearProperty(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME);
+            }
+        }
+    }
+    
+    public void testMaxDeliveryUrlOptionOverridesSystemProperty() throws 
Exception
+    {
+        String oldSysPropValue = System.setProperty(
+                ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, "15");
+        try
+        {
+            String url = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='5'";
+            ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+            //check the max delivery count system property was overridden by
+            //the specified connectionURL option value
+            AMQConnection conn = new MockAMQConnection(connectionURL, null);
+            assertEquals("Max Delivery Count option was not as expected", 5, 
+                    conn.getMaxDeliveryCount());
+        }
+        finally
+        {
+            if(oldSysPropValue != null)
+            {
+                System.setProperty(
+                        ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, 
oldSysPropValue);
+            }
+            else
+            {
+                
System.clearProperty(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME);
+            }
+        }
+    }
+}

Added: 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java?rev=1042998&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
 Tue Dec  7 12:23:38 2010
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Session;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.unit.message.TestAMQSession;
+import org.apache.qpid.url.AMQBindingURL;
+
+import junit.framework.TestCase;
+
+public class BasicMessageConsumer_0_8_Test extends TestCase
+{
+    /**
+     * Test that if there is a value for Max Delivery Count specified for the 
Destination
+     * used to create the Consumer, it overrides the value for the Connection.
+     */
+    public void testDestinationMaxDeliveryCountOverridesConnection() throws 
Exception
+    {
+        /*
+         * Check that when the connection does not have a value applied that 
this
+         * is successfully overridden with a specific value by the consumer.
+         */
+        String connUrlString = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+        ConnectionURL connectionURL = new AMQConnectionURL(connUrlString);
+        AMQConnection conn = new MockAMQConnection(connectionURL, null);
+
+        assertEquals("Max Delivery Count option was not as expected", 0,
+                conn.getMaxDeliveryCount());
+
+        String url = 
"exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='1'";
+        AMQBindingURL burl = new AMQBindingURL(url);
+        AMQDestination queue = new AMQQueue(burl);
+        
+        AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> 
testSession = new TestAMQSession();
+        BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, 
conn, queue, "", false, null, testSession, null, null, 10, 5, false, 
Session.SESSION_TRANSACTED, false, false);
+
+        assertEquals("Max Delivery Attempts was was not as expected", 1, 
consumer.getMaxDeliveryAttempts());
+        assertTrue("Max Delivery should have been enforced", 
consumer.isMaxDeliveryCountEnforced());
+
+        /*
+         * Check that when the connection does have a specific value applied 
that this
+         * is successfully overridden with another specific value by the 
consumer.
+         */
+        connUrlString = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='5'";
+        connectionURL = new AMQConnectionURL(connUrlString);
+        conn = new MockAMQConnection(connectionURL, null);
+
+        assertEquals("Max Delivery Count option was not as expected", 5,
+                conn.getMaxDeliveryCount());
+
+        url = 
"exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='3'";
+        burl = new AMQBindingURL(url);
+        queue = new AMQQueue(burl);
+
+        consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, 
null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, 
false);
+
+        assertEquals("Max Delivery Attempts was was not as expected", 3, 
consumer.getMaxDeliveryAttempts());
+        assertTrue("Max Delivery should have been enforced", 
consumer.isMaxDeliveryCountEnforced());
+        
+        /*
+         * Check also that when the connection does have a specific value 
applied (5 above) that this
+         * can be successfully overridden and disabled by the destination 
value being set to 0.
+         */
+        url = 
"exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='0'";
+        burl = new AMQBindingURL(url);
+        queue = new AMQQueue(burl);
+        
+        consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, 
null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, 
false);
+
+        assertEquals("Max Delivery Count option was not as expected", 5, 
conn.getMaxDeliveryCount());
+        assertEquals("Max Delivery Attempts was was not as expected", 0, 
consumer.getMaxDeliveryAttempts());
+        assertFalse("Max Delivery should not have been enforced", 
consumer.isMaxDeliveryCountEnforced());
+    }
+
+    /**
+     * Test that if no value for MaxDeliveryCount is applied to the 
Destination, then the value 
+     * from the connection is used and acts as expected.
+     */
+    public void testMaxDeliveryCountDetectedFromConnection() throws Exception
+    {
+        /*
+         * Check that when the connection does have a specific value applied 
that this
+         * is successfully detected by the consumer.
+         */
+        String connUrlString = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='5'";
+        ConnectionURL connectionURL = new AMQConnectionURL(connUrlString);
+        AMQConnection conn = new MockAMQConnection(connectionURL, null);
+
+        String url = "exchangeClass://exchangeName/Destination/Queue";
+        AMQBindingURL burl = new AMQBindingURL(url);
+        AMQDestination queue = new AMQQueue(burl);
+        
+        assertEquals("Max Delivery Count should have been null", null, 
queue.getMaxDeliveryCount());
+        
+        AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> 
testSession = new TestAMQSession();
+        BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, 
conn, queue, "", false, null, testSession, null, null, 10, 5, false, 
Session.SESSION_TRANSACTED, false, false);
+
+        assertEquals("Max Delivery Attempts was was not as expected", 5, 
consumer.getMaxDeliveryAttempts());
+        assertTrue("Max Delivery should have been enforced", 
consumer.isMaxDeliveryCountEnforced());
+        
+        /*
+         * Also verify that when the connection does not have a value applied 
and defaults to 0, that
+         * this is successfully detected by the consumer and disables the Max 
Delivery feature.
+         */
+        connUrlString = 
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+        connectionURL = new AMQConnectionURL(connUrlString);
+        conn = new MockAMQConnection(connectionURL, null);
+        
+        consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, 
null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, 
false);
+        assertEquals("Max Delivery Attempts was was not as expected", 0, 
consumer.getMaxDeliveryAttempts());
+        assertFalse("Max Delivery should not have been enforced", 
consumer.isMaxDeliveryCountEnforced());
+    }
+}

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
 Tue Dec  7 12:23:38 2010
@@ -23,7 +23,9 @@ package org.apache.qpid.test.unit.client
 import junit.framework.TestCase;
 
 import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.MockAMQConnection;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.url.URLSyntaxException;
@@ -549,6 +551,38 @@ public class ConnectionURLTest extends T
         assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'"));
     }
 
+    public void testMaxDeliveryCountPresent() throws Exception
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='3'";
+
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        assertTrue(connectionURL.getFailoverMethod() == null);
+        assertTrue(connectionURL.getUsername().equals("guest"));
+        assertTrue(connectionURL.getPassword().equals("guest"));
+        assertTrue(connectionURL.getVirtualHost().equals("/test"));
+        
+        //check that the max delivery count option is returned as expected
+        assertEquals("Max Delivery Count option was not as expected", "3", 
+                connectionURL.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT));
+    }
+    
+    public void testMaxDeliveryCountNotPresent() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        assertTrue(connectionurl.getFailoverMethod() == null);
+        assertTrue(connectionurl.getUsername().equals("guest"));
+        assertTrue(connectionurl.getPassword().equals("guest"));
+        assertTrue(connectionurl.getVirtualHost().equals("/test"));
+        
+        //check that the max delivery count option is null as expected
+        assertEquals("Max Delivery Count option was not as expected", null, 
+                connectionurl.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT));
+    }
+
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(ConnectionURLTest.class);

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Tue Dec  7 12:23:38 2010
@@ -22,8 +22,10 @@ package org.apache.qpid.test.unit.client
 
 import junit.framework.TestCase;
 
+import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -189,6 +191,67 @@ public class DestinationURLTest extends 
         assertTrue(dest.getExchangeName().equals("amq.topic"));
         assertTrue(dest.getQueueName().equals("test:testQueueD"));
     }
+    
+    public void testMaxDeliveryCountPresent() throws URISyntaxException
+    {
+        String url = "exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='5'";
+
+        AMQBindingURL burl = new AMQBindingURL(url);
+
+        assertTrue(url.equals(burl.toString()));
+        assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+        assertTrue(burl.getExchangeName().equals("exchangeName"));
+        assertTrue(burl.getDestinationName().equals("Destination"));
+        assertTrue(burl.getQueueName().equals("Queue"));
+        
+        //check that the MaxDeliveryCount property has the right value
+        assertTrue(burl.getOption("maxdeliverycount").equals("5"));
+        
+        //check that the MaxDeliveryCount value is correctly returned from an AMQDestination
+        class MyTestAMQDestination extends AMQDestination
+        {
+            public MyTestAMQDestination(BindingURL url)
+            {
+                super(url);
+            }
+            public boolean isNameRequired()
+            {
+                return false;
+            }
+        };
+        
+        AMQDestination dest = new MyTestAMQDestination(burl);
+        assertEquals("Max Delivery Count should have been 5", Integer.valueOf(5), dest.getMaxDeliveryCount());
+    }
+
+    public void testMaxDeliveryCountNotPresent() throws URISyntaxException
+    {
+        String url = "exchangeClass://exchangeName/Destination/Queue";
+
+        AMQBindingURL burl = new AMQBindingURL(url);
+
+        assertTrue(url.equals(burl.toString()));
+
+        assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+        assertTrue(burl.getExchangeName().equals("exchangeName"));
+        assertTrue(burl.getDestinationName().equals("Destination"));
+        assertTrue(burl.getQueueName().equals("Queue"));
+        
+        class MyTestAMQDestination extends AMQDestination
+        {
+            public MyTestAMQDestination(BindingURL url)
+            {
+                super(url);
+            }
+            public boolean isNameRequired()
+            {
+                return false;
+            }
+        };
+        
+        AMQDestination dest = new MyTestAMQDestination(burl);
+        assertEquals("Max Delivery Count should have been null", null, dest.getMaxDeliveryCount());
+    }
 
     public static junit.framework.Test suite()
     {

Modified: qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Tue Dec  7 12:23:38 2010
@@ -36,6 +36,7 @@ public interface BindingURL
     public static final String OPTION_SUBSCRIPTION = "subscription";
     public static final String OPTION_ROUTING_KEY = "routingkey";
     public static final String OPTION_BINDING_KEY = "bindingkey";
+    public static final String OPTION_MAX_DELIVERY_COUNT = "maxdeliverycount";
 
 
     String getURL();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to