Author: robbie
Date: Tue Dec 7 12:23:38 2010
New Revision: 1042998
URL: http://svn.apache.org/viewvc?rev=1042998&view=rev
Log:
QPID-2972: client configuration for Max Delivery Count
Added:
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Tue Dec 7 12:23:38 2010
@@ -325,6 +325,11 @@ public class AMQConnection extends Close
//By default it's async publish
private String _syncPublish = "";
+ /* Indicates the maximum number of times an individual consumer
+ * created on this connection can see a specific MessageID
+ * before it should reject the message during rollback/recover */
+ private int _maxDeliveryCount;
+
/**
* @param broker brokerdetails
* @param username username
@@ -454,6 +459,18 @@ public class AMQConnection extends Close
_syncPublish =
System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT)
!= null)
+ {
+ _maxDeliveryCount = Integer.parseInt(
+
connectionURL.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT));
+ }
+ else
+ {
+ // use the default value for all connections, if set
+ _maxDeliveryCount =
Integer.getInteger(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, 0);
+ }
+
+
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails =
_failoverPolicy.getCurrentBrokerDetails();
if (brokerDetails.getTransport().equals(BrokerDetails.VM))
@@ -1596,4 +1613,15 @@ public class AMQConnection extends Close
{
return _sessions.getNextChannelId();
}
+
+ /**
+ * Returns the MaxDeliveryCount value applied to this connection either
via a
+ * ConnectionURL option or system property. If no setting was made this
defaults to 0;
+ *
+ * @return the value (0 if none was set)
+ */
+ public int getMaxDeliveryCount()
+ {
+ return _maxDeliveryCount;
+ }
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
Tue Dec 7 12:23:38 2010
@@ -55,6 +55,8 @@ public abstract class AMQDestination imp
private AMQShortString[] _bindingKeys;
+ private Integer _maxDeliveryCount;
+
private String _url;
private AMQShortString _urlAsShortString;
@@ -62,11 +64,6 @@ public abstract class AMQDestination imp
private boolean _exchangeExistsChecked;
- private byte[] _byteEncoding;
- private static final int IS_DURABLE_MASK = 0x1;
- private static final int IS_EXCLUSIVE_MASK = 0x2;
- private static final int IS_AUTODELETE_MASK = 0x4;
-
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
@@ -88,6 +85,8 @@ public abstract class AMQDestination imp
_queueName = binding.getQueueName() == null ? null :
binding.getQueueName();
_routingKey = binding.getRoutingKey() == null ? null :
binding.getRoutingKey();
_bindingKeys = binding.getBindingKeys() == null ||
binding.getBindingKeys().length == 0 ? new AMQShortString[0] :
binding.getBindingKeys();
+ String count = binding.getOption(BindingURL.OPTION_MAX_DELIVERY_COUNT);
+ _maxDeliveryCount = count == null ? null : Integer.parseInt(count);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString routingKey, AMQShortString queueName)
@@ -129,7 +128,14 @@ public abstract class AMQDestination imp
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString routingKey, boolean isExclusive,
- boolean isAutoDelete, AMQShortString queueName,
boolean isDurable,AMQShortString[] bindingKeys, boolean browseOnly)
+ boolean isAutoDelete, AMQShortString queueName, boolean
isDurable,AMQShortString[] bindingKeys, boolean browseOnly)
+ {
+ this (exchangeName, exchangeClass, routingKey,
isExclusive,isAutoDelete,queueName,isDurable,bindingKeys, browseOnly, null);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName,
boolean isDurable,AMQShortString[] bindingKeys,
+ boolean browseOnly, Integer maxDeliveryCount)
{
if ( (ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(exchangeClass) ||
ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exchangeClass))
@@ -154,6 +160,7 @@ public abstract class AMQDestination imp
_isDurable = isDurable;
_bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new
AMQShortString[0] : bindingKeys;
_browseOnly = browseOnly;
+ _maxDeliveryCount = maxDeliveryCount;
}
public AMQShortString getEncodedName()
@@ -207,7 +214,6 @@ public abstract class AMQDestination imp
// calculated URL now out of date
_url = null;
_urlAsShortString = null;
- _byteEncoding = null;
}
public AMQShortString getRoutingKey()
@@ -309,7 +315,6 @@ public abstract class AMQDestination imp
sb.append(bindingKey);
sb.append("'");
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
-
}
}
@@ -333,6 +338,14 @@ public abstract class AMQDestination imp
sb.append("='true'");
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
+
+
+ if (_maxDeliveryCount != null)
+ {
+ sb.append(BindingURL.OPTION_MAX_DELIVERY_COUNT);
+ sb.append("='" + _maxDeliveryCount + "'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
//removeKey the last char '?' if there is no options , ',' if
there are.
sb.deleteCharAt(sb.length() - 1);
@@ -343,54 +356,6 @@ public abstract class AMQDestination imp
return url;
}
- public byte[] toByteEncoding()
- {
- byte[] encoding = _byteEncoding;
- if(encoding == null)
- {
- int size = _exchangeClass.length() + 1 +
- _exchangeName.length() + 1 +
- 0 + // in place of the destination name
- (_queueName == null ? 0 : _queueName.length()) + 1 +
- 1;
- encoding = new byte[size];
- int pos = 0;
-
- pos = _exchangeClass.writeToByteArray(encoding, pos);
- pos = _exchangeName.writeToByteArray(encoding, pos);
-
- encoding[pos++] = (byte)0;
-
- if(_queueName == null)
- {
- encoding[pos++] = (byte)0;
- }
- else
- {
- pos = _queueName.writeToByteArray(encoding,pos);
- }
- byte options = 0;
- if(_isDurable)
- {
- options |= IS_DURABLE_MASK;
- }
- if(_isExclusive)
- {
- options |= IS_EXCLUSIVE_MASK;
- }
- if(_isAutoDelete)
- {
- options |= IS_AUTODELETE_MASK;
- }
- encoding[pos] = options;
-
-
- _byteEncoding = encoding;
-
- }
- return encoding;
- }
-
public boolean equals(Object o)
{
if (this == o)
@@ -444,53 +409,6 @@ public abstract class AMQDestination imp
null); // factory location
}
-
- public static Destination createDestination(byte[] byteEncodedDestination)
- {
- AMQShortString exchangeClass;
- AMQShortString exchangeName;
- AMQShortString routingKey;
- AMQShortString queueName;
- boolean isDurable;
- boolean isExclusive;
- boolean isAutoDelete;
-
- int pos = 0;
- exchangeClass =
AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= exchangeClass.length() + 1;
- exchangeName =
AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= exchangeName.length() + 1;
- routingKey = AMQShortString.readFromByteArray(byteEncodedDestination,
pos);
- pos+= (routingKey == null ? 0 : routingKey.length()) + 1;
- queueName = AMQShortString.readFromByteArray(byteEncodedDestination,
pos);
- pos+= (queueName == null ? 0 : queueName.length()) + 1;
- int options = byteEncodedDestination[pos];
- isDurable = (options & IS_DURABLE_MASK) != 0;
- isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
- isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
-
- if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
- {
- return new
AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable);
- }
- else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
- {
- return new
AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable);
- }
- else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
- {
- return new AMQHeadersExchange(routingKey);
- }
- else
- {
- return new AMQAnyDestination(exchangeName,exchangeClass,
- routingKey,isExclusive,
- isAutoDelete,queueName,
- isDurable, new AMQShortString[0]);
- }
-
- }
-
public static Destination createDestination(BindingURL binding)
{
AMQShortString type = binding.getExchangeClass();
@@ -517,4 +435,15 @@ public abstract class AMQDestination imp
{
return _browseOnly;
}
+
+ /**
+ * The maximum times a consumer should attempt delivery before rejecting
the message
+ * without requesting it be re-queued.
+ *
+ * @return the Integer value, or null if the option was not set.
+ */
+ public Integer getMaxDeliveryCount()
+ {
+ return _maxDeliveryCount;
+ }
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Dec 7 12:23:38 2010
@@ -1709,6 +1709,7 @@ public abstract class AMQSession<C exten
//fall through
case Session.AUTO_ACKNOWLEDGE:
//check the last message asynchronously delivered via
auto-ack
+ //This will be null unless an async auto-ack delivery is
in progress
Long tag = getLastAsyncAutoAckDeliveryTag();
clearLastAsyncAutoAckDeliveryTag();
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Tue Dec 7 12:23:38 2010
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -112,11 +113,10 @@ public abstract class BasicMessageConsum
*/
protected final int _acknowledgeMode;
- private int _idMapSize = 100;//TODO: set by configuration
- private int _maxDeliveryAttempts = 3; //TODO: set by configuration
- private boolean _maxRedeliverEnabled = true;//TODO set based on above
config
+ private int _maxDeliveryAttempts = 0;
+ private boolean _maxRedeliverEnabled = false;
- final DeliveryCountTracker _tracker = new
DeliveryCountTracker(_idMapSize);//TODO
+ private final DeliveryCountTracker _tracker;
/**
* The thread that was used to call receive(). This is important for being
able to interrupt that thread if a
@@ -174,6 +174,13 @@ public abstract class BasicMessageConsum
{
_acknowledgeMode = acknowledgeMode;
}
+
+ //set configuration + create tracker for Max Delivery Count
+ int idMapSize =
Integer.getInteger(ClientProperties.MAX_DELIVERY_RECORDS_PROP_NAME, 2 *
_prefetchHigh);
+ Integer maxDeliveries = destination.getMaxDeliveryCount();
+ _maxDeliveryAttempts = maxDeliveries == null ?
connection.getMaxDeliveryCount() : maxDeliveries;
+ _maxRedeliverEnabled = _maxDeliveryAttempts > 0;
+ _tracker = isMaxDeliveryCountEnforced() ? new
DeliveryCountTracker(idMapSize) : null;
}
public AMQDestination getDestination()
@@ -1036,6 +1043,7 @@ public abstract class BasicMessageConsum
{
return _maxDeliveryAttempts;
}
+
public void removeDeliveryCountRecordsForMessage(long deliveryTag)
{
if(isMaxDeliveryCountEnforced())
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
Tue Dec 7 12:23:38 2010
@@ -70,7 +70,18 @@ public class ClientProperties
* heartbeat in TuneOK will be used
*/
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
+
+ /**
+ * System property to set default value for maximum delivery count on a
client-wide basis,
+ * unless overridden by the more specific settings at connection or
binding URL level.
+ */
+ public static final String MAX_DELIVERY_COUNT_PROP_NAME =
"qpid.max.delivery.count";
+ /**
+ * System property to set default value for maximum number of retained
MessageID records
+ * per-consumer for use in enforcing maximum delivery count.
+ */
+ public static final String MAX_DELIVERY_RECORDS_PROP_NAME =
"qpid.max.delivery.records";
/**
* ==========================================================
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
Tue Dec 7 12:23:38 2010
@@ -41,6 +41,7 @@ public interface ConnectionURL
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+ public static final String OPTIONS_MAX_DELIVERY_COUNT = "maxdeliverycount";
public static final String OPTIONS_SSL = "ssl";
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE =
"defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE =
"defaultQueueExchange";
Added:
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java?rev=1042998&view=auto
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
(added)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
Tue Dec 7 12:23:38 2010
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.configuration.ClientProperties;
+import org.apache.qpid.jms.ConnectionURL;
+
+public class AMQConnectionUnitTest extends TestCase
+{
+ public void testMaxDeliveryCountPresent() throws Exception
+ {
+ String url =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='3'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ //check the max delivery count option is successfully passed through
to the AMQConnection
+ AMQConnection conn = new MockAMQConnection(connectionURL, null);
+ assertEquals("Max Delivery Count option was not as expected", 3,
+ conn.getMaxDeliveryCount());
+ }
+
+ public void testMaxDeliveryCountNotPresent() throws Exception
+ {
+ String url =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ //check the max delivery count value defaults to 0 when no url option
or sys prop is specified.
+ AMQConnection conn = new MockAMQConnection(connectionURL, null);
+ assertEquals("Max Delivery Count option was not as expected", 0,
+ conn.getMaxDeliveryCount());
+ }
+
+ public void testMaxDeliverySystemProperty() throws Exception
+ {
+ String oldSysPropValue = System.setProperty(
+ ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, "15");
+ try
+ {
+ String url =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ //check the max delivery count system property was successfully
picked up and
+ //the value seet for the AMQConnection
+ AMQConnection conn = new MockAMQConnection(connectionURL, null);
+ assertEquals("Max Delivery Count option was not as expected", 15,
+ conn.getMaxDeliveryCount());
+ }
+ finally
+ {
+ if(oldSysPropValue != null)
+ {
+ System.setProperty(
+ ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME,
oldSysPropValue);
+ }
+ else
+ {
+
System.clearProperty(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME);
+ }
+ }
+ }
+
+ public void testMaxDeliveryUrlOptionOverridesSystemProperty() throws
Exception
+ {
+ String oldSysPropValue = System.setProperty(
+ ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, "15");
+ try
+ {
+ String url =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='5'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ //check the max delivery count system property was overridden by
+ //the specified connectionURL option value
+ AMQConnection conn = new MockAMQConnection(connectionURL, null);
+ assertEquals("Max Delivery Count option was not as expected", 5,
+ conn.getMaxDeliveryCount());
+ }
+ finally
+ {
+ if(oldSysPropValue != null)
+ {
+ System.setProperty(
+ ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME,
oldSysPropValue);
+ }
+ else
+ {
+
System.clearProperty(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME);
+ }
+ }
+ }
+}
Added:
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java?rev=1042998&view=auto
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
(added)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
Tue Dec 7 12:23:38 2010
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Session;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.unit.message.TestAMQSession;
+import org.apache.qpid.url.AMQBindingURL;
+
+import junit.framework.TestCase;
+
+public class BasicMessageConsumer_0_8_Test extends TestCase
+{
+ /**
+ * Test that if there is a value for Max Delivery Count specified for the
Destination
+ * used to create the Consumer, it overrides the value for the Connection.
+ */
+ public void testDestinationMaxDeliveryCountOverridesConnection() throws
Exception
+ {
+ /*
+ * Check that when the connection does not have a value applied that
this
+ * is successfully overridden with a specific value by the consumer.
+ */
+ String connUrlString =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ ConnectionURL connectionURL = new AMQConnectionURL(connUrlString);
+ AMQConnection conn = new MockAMQConnection(connectionURL, null);
+
+ assertEquals("Max Delivery Count option was not as expected", 0,
+ conn.getMaxDeliveryCount());
+
+ String url =
"exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='1'";
+ AMQBindingURL burl = new AMQBindingURL(url);
+ AMQDestination queue = new AMQQueue(burl);
+
+ AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
testSession = new TestAMQSession();
+ BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0,
conn, queue, "", false, null, testSession, null, null, 10, 5, false,
Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Max Delivery Attempts was was not as expected", 1,
consumer.getMaxDeliveryAttempts());
+ assertTrue("Max Delivery should have been enforced",
consumer.isMaxDeliveryCountEnforced());
+
+ /*
+ * Check that when the connection does have a specific value applied
that this
+ * is successfully overridden with another specific value by the
consumer.
+ */
+ connUrlString =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='5'";
+ connectionURL = new AMQConnectionURL(connUrlString);
+ conn = new MockAMQConnection(connectionURL, null);
+
+ assertEquals("Max Delivery Count option was not as expected", 5,
+ conn.getMaxDeliveryCount());
+
+ url =
"exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='3'";
+ burl = new AMQBindingURL(url);
+ queue = new AMQQueue(burl);
+
+ consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false,
null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false,
false);
+
+ assertEquals("Max Delivery Attempts was was not as expected", 3,
consumer.getMaxDeliveryAttempts());
+ assertTrue("Max Delivery should have been enforced",
consumer.isMaxDeliveryCountEnforced());
+
+ /*
+ * Check also that when the connection does have a specific value
applied (5 above) that this
+ * can be successfully overridden and disabled by the destination
value being set to 0.
+ */
+ url =
"exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='0'";
+ burl = new AMQBindingURL(url);
+ queue = new AMQQueue(burl);
+
+ consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false,
null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false,
false);
+
+ assertEquals("Max Delivery Count option was not as expected", 5,
conn.getMaxDeliveryCount());
+ assertEquals("Max Delivery Attempts was was not as expected", 0,
consumer.getMaxDeliveryAttempts());
+ assertFalse("Max Delivery should not have been enforced",
consumer.isMaxDeliveryCountEnforced());
+ }
+
+ /**
+ * Test that if no value for MaxDeliveryCount is applied to the
Destination, then the value
+ * from the connection is used and acts as expected.
+ */
+ public void testMaxDeliveryCountDetectedFromConnection() throws Exception
+ {
+ /*
+ * Check that when the connection does have a specific value applied
that this
+ * is successfully detected by the consumer.
+ */
+ String connUrlString =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='5'";
+ ConnectionURL connectionURL = new AMQConnectionURL(connUrlString);
+ AMQConnection conn = new MockAMQConnection(connectionURL, null);
+
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+ AMQBindingURL burl = new AMQBindingURL(url);
+ AMQDestination queue = new AMQQueue(burl);
+
+ assertEquals("Max Delivery Count should have been null", null,
queue.getMaxDeliveryCount());
+
+ AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
testSession = new TestAMQSession();
+ BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0,
conn, queue, "", false, null, testSession, null, null, 10, 5, false,
Session.SESSION_TRANSACTED, false, false);
+
+ assertEquals("Max Delivery Attempts was was not as expected", 5,
consumer.getMaxDeliveryAttempts());
+ assertTrue("Max Delivery should have been enforced",
consumer.isMaxDeliveryCountEnforced());
+
+ /*
+ * Also verify that when the connection does not have a value applied
and defaults to 0, that
+ * this is successfully detected by the consumer and disables the Max
Delivery feature.
+ */
+ connUrlString =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'";
+ connectionURL = new AMQConnectionURL(connUrlString);
+ conn = new MockAMQConnection(connectionURL, null);
+
+ consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false,
null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false,
false);
+ assertEquals("Max Delivery Attempts was was not as expected", 0,
consumer.getMaxDeliveryAttempts());
+ assertFalse("Max Delivery should not have been enforced",
consumer.isMaxDeliveryCountEnforced());
+ }
+}
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
Tue Dec 7 12:23:38 2010
@@ -23,7 +23,9 @@ package org.apache.qpid.test.unit.client
import junit.framework.TestCase;
import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.URLSyntaxException;
@@ -549,6 +551,38 @@ public class ConnectionURLTest extends T
assertTrue("String representation should contain options and values",
url.toString().contains("maxprefetch='12345'"));
}
+ public void testMaxDeliveryCountPresent() throws Exception
+ {
+ String url =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&maxdeliverycount='3'";
+
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertTrue(connectionURL.getFailoverMethod() == null);
+ assertTrue(connectionURL.getUsername().equals("guest"));
+ assertTrue(connectionURL.getPassword().equals("guest"));
+ assertTrue(connectionURL.getVirtualHost().equals("/test"));
+
+ //check that the max delivery count option is returned as expected
+ assertEquals("Max Delivery Count option was not as expected", "3",
+
connectionURL.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT));
+ }
+
+ public void testMaxDeliveryCountNotPresent() throws URLSyntaxException
+ {
+ String url =
"amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ //check that the max delivery count option is null as expected
+ assertEquals("Max Delivery Count option was not as expected", null,
+
connectionurl.getOption(ConnectionURL.OPTIONS_MAX_DELIVERY_COUNT));
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
Tue Dec 7 12:23:38 2010
@@ -22,8 +22,10 @@ package org.apache.qpid.test.unit.client
import junit.framework.TestCase;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,6 +191,67 @@ public class DestinationURLTest extends
assertTrue(dest.getExchangeName().equals("amq.topic"));
assertTrue(dest.getQueueName().equals("test:testQueueD"));
}
+
+ public void testMaxDeliveryCountPresent() throws URISyntaxException
+ {
+ String url =
"exchangeClass://exchangeName/Destination/Queue?maxdeliverycount='5'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ //check that the MaxDeliveryCount property has the right value
+ assertTrue(burl.getOption("maxdeliverycount").equals("5"));
+
+ //check that the MaxDeliveryCount value is correctly returned from an
AMQDestination
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertEquals("Max Delivery Count should have been 5",
Integer.valueOf(5), dest.getMaxDeliveryCount());
+ }
+
+ public void testMaxDeliveryCountNotPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertEquals("Max Delivery Count should have been null", null,
dest.getMaxDeliveryCount());
+ }
public static junit.framework.Test suite()
{
Modified:
qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=1042998&r1=1042997&r2=1042998&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
Tue Dec 7 12:23:38 2010
@@ -36,6 +36,7 @@ public interface BindingURL
public static final String OPTION_SUBSCRIPTION = "subscription";
public static final String OPTION_ROUTING_KEY = "routingkey";
public static final String OPTION_BINDING_KEY = "bindingkey";
+ public static final String OPTION_MAX_DELIVERY_COUNT = "maxdeliverycount";
String getURL();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]