Author: rajith
Date: Mon Feb 13 22:30:47 2012
New Revision: 1243719
URL: http://svn.apache.org/viewvc?rev=1243719&view=rev
Log:
QPID-3836 Modified the address handling code to pass the noLocal
argument to the queue-declare method.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Feb 13 22:30:47 2012
@@ -1045,7 +1045,7 @@ public abstract class AMQSession<C exten
{
try
{
- handleAddressBasedDestination(dest,false,true);
+ handleAddressBasedDestination(dest,false,noLocal,true);
if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
{
throw new JMSException("Durable subscribers can only be
created for Topics");
@@ -2905,7 +2905,7 @@ public abstract class AMQSession<C exten
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- handleAddressBasedDestination(amqd,true,nowait);
+
handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
}
else
{
@@ -2966,6 +2966,7 @@ public abstract class AMQSession<C exten
public abstract void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws
AMQException;
private void registerProducer(long producerId, MessageProducer producer)
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Feb 13 22:30:47 2012
@@ -766,8 +766,14 @@ public class AMQSession_0_10 extends AMQ
else
{
QueueNode node = (QueueNode)amqd.getSourceNode();
+ Map<String,Object> arguments = new HashMap<String,Object>();
+ arguments.putAll((Map<? extends String, ? extends Object>)
node.getDeclareArgs());
+ if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) ==
null)
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
getQpidSession().queueDeclare(queueName.toString(),
node.getAlternateExchange() ,
- node.getDeclareArgs(),
+ arguments,
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
@@ -1167,13 +1173,14 @@ public class AMQSession_0_10 extends AMQ
@SuppressWarnings("deprecation")
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws
AMQException
{
if (dest.isAddressResolved() &&
dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
{
if (isConsumer && AMQDestination.TOPIC_TYPE ==
dest.getAddressType())
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest,noLocal);
}
}
else
@@ -1202,7 +1209,7 @@ public class AMQSession_0_10 extends AMQ
else if(createNode)
{
setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,false,noWait);
+ send0_10QueueDeclare(dest,null,noLocal,noWait);
sendQueueBind(dest.getAMQQueueName(),
dest.getRoutingKey(),
null,dest.getExchangeName(),dest, false);
break;
@@ -1217,7 +1224,7 @@ public class AMQSession_0_10 extends AMQ
verifySubject(dest);
if (isConsumer &&
!isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest, noLocal);
}
break;
}
@@ -1232,7 +1239,7 @@ public class AMQSession_0_10 extends AMQ
false);
if (isConsumer &&
!isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest,noLocal);
}
break;
}
@@ -1295,7 +1302,7 @@ public class AMQSession_0_10 extends AMQ
}
}
- private void createSubscriptionQueue(AMQDestination dest) throws
AMQException
+ private void createSubscriptionQueue(AMQDestination dest, boolean noLocal)
throws AMQException
{
QueueNode node = (QueueNode)dest.getSourceNode(); // source node is
never null
@@ -1308,7 +1315,7 @@ public class AMQSession_0_10 extends AMQ
}
node.setExclusive(true);
node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,false,true);
+ send0_10QueueDeclare(dest,null,noLocal,true);
getQpidSession().exchangeBind(dest.getQueueName(),
dest.getAddressName(),
dest.getSubject(),
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Feb 13 22:30:47 2012
@@ -624,6 +624,7 @@ public class AMQSession_0_8 extends AMQS
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws
AMQException
{
throw new UnsupportedOperationException("The new addressing based
sytanx is "
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Mon Feb 13 22:30:47 2012
@@ -86,7 +86,7 @@ public class BasicMessageProducer_0_10 e
{
try
{
-
getSession().handleAddressBasedDestination(destination,false,false);
+
getSession().handleAddressBasedDestination(destination,false,false,false);
}
catch(Exception e)
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]