Author: rajith
Date: Mon Feb 13 22:30:47 2012
New Revision: 1243719

URL: http://svn.apache.org/viewvc?rev=1243719&view=rev
Log:
QPID-3836 Modified the address handling code to pass the noLocal
argument to queue-declare method.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Feb 13 22:30:47 2012
@@ -1045,7 +1045,7 @@ public abstract class AMQSession<C exten
         {
             try
             {
-                handleAddressBasedDestination(dest,false,true);
+                handleAddressBasedDestination(dest,false,noLocal,true);
                 if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)
                 {
                     throw new JMSException("Durable subscribers can only be 
created for Topics");
@@ -2905,7 +2905,7 @@ public abstract class AMQSession<C exten
 
         if (amqd.getDestSyntax() == DestSyntax.ADDR)
         {
-            handleAddressBasedDestination(amqd,true,nowait);            
+            
handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
         }
         else
         {
@@ -2966,6 +2966,7 @@ public abstract class AMQSession<C exten
 
     public abstract void handleAddressBasedDestination(AMQDestination dest, 
                                                        boolean isConsumer,
+                                                       boolean noLocal,
                                                        boolean noWait) throws 
AMQException;
     
     private void registerProducer(long producerId, MessageProducer producer)

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Mon Feb 13 22:30:47 2012
@@ -766,8 +766,14 @@ public class AMQSession_0_10 extends AMQ
         else
         {
             QueueNode node = (QueueNode)amqd.getSourceNode();
+            Map<String,Object> arguments = new HashMap<String,Object>();
+            arguments.putAll((Map<? extends String, ? extends Object>) 
node.getDeclareArgs());
+            if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == 
null)
+            {
+                arguments.put(AddressHelper.NO_LOCAL, noLocal);
+            }
             getQpidSession().queueDeclare(queueName.toString(), 
node.getAlternateExchange() ,
-                    node.getDeclareArgs(),
+                    arguments,
                     node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                     node.isDurable() ? Option.DURABLE : Option.NONE,
                     node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);   
@@ -1167,13 +1173,14 @@ public class AMQSession_0_10 extends AMQ
     @SuppressWarnings("deprecation")
     public void handleAddressBasedDestination(AMQDestination dest, 
                                               boolean isConsumer,
+                                              boolean noLocal,
                                               boolean noWait) throws 
AMQException
     {
         if (dest.isAddressResolved() && 
dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
         {
             if (isConsumer && AMQDestination.TOPIC_TYPE == 
dest.getAddressType()) 
             {
-                createSubscriptionQueue(dest);
+                createSubscriptionQueue(dest,noLocal);
             }
         }
         else
@@ -1202,7 +1209,7 @@ public class AMQSession_0_10 extends AMQ
                     else if(createNode)
                     {
                         setLegacyFiledsForQueueType(dest);
-                        send0_10QueueDeclare(dest,null,false,noWait);
+                        send0_10QueueDeclare(dest,null,noLocal,noWait);
                         sendQueueBind(dest.getAMQQueueName(), 
dest.getRoutingKey(),
                                       null,dest.getExchangeName(),dest, false);
                         break;
@@ -1217,7 +1224,7 @@ public class AMQSession_0_10 extends AMQ
                         verifySubject(dest);
                         if (isConsumer && 
!isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
                         {  
-                            createSubscriptionQueue(dest);
+                            createSubscriptionQueue(dest, noLocal);
                         }
                         break;
                     }
@@ -1232,7 +1239,7 @@ public class AMQSession_0_10 extends AMQ
                                 false);        
                         if (isConsumer && 
!isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
                         {
-                            createSubscriptionQueue(dest);
+                            createSubscriptionQueue(dest,noLocal);
                         }
                         break;
                     }
@@ -1295,7 +1302,7 @@ public class AMQSession_0_10 extends AMQ
         }
     }
     
-    private void createSubscriptionQueue(AMQDestination dest) throws 
AMQException
+    private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) 
throws AMQException
     {
         QueueNode node = (QueueNode)dest.getSourceNode();  // source node is 
never null
         
@@ -1308,7 +1315,7 @@ public class AMQSession_0_10 extends AMQ
         }
         node.setExclusive(true);
         node.setAutoDelete(!node.isDurable());
-        send0_10QueueDeclare(dest,null,false,true);
+        send0_10QueueDeclare(dest,null,noLocal,true);
         getQpidSession().exchangeBind(dest.getQueueName(), 
                                      dest.getAddressName(), 
                                      dest.getSubject(), 

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Mon Feb 13 22:30:47 2012
@@ -624,6 +624,7 @@ public class AMQSession_0_8 extends AMQS
 
     public void handleAddressBasedDestination(AMQDestination dest, 
                                               boolean isConsumer,
+                                              boolean noLocal,
                                               boolean noWait) throws 
AMQException
     {
         throw new UnsupportedOperationException("The new addressing based 
sytanx is "

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1243719&r1=1243718&r2=1243719&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 Mon Feb 13 22:30:47 2012
@@ -86,7 +86,7 @@ public class BasicMessageProducer_0_10 e
         {       
             try
             {
-                
getSession().handleAddressBasedDestination(destination,false,false);
+                
getSession().handleAddressBasedDestination(destination,false,false,false);
             }
             catch(Exception e)
             {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to