Author: rgodfrey
Date: Wed Mar  5 17:33:23 2014
New Revision: 1574582

URL: http://svn.apache.org/r1574582
Log:
QPID-5605 : [Java Broker] [AMQP 1.0] allow use of addresses of the form 
<exchange>/<routing-key>

Modified:
    
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
 Wed Mar  5 17:33:23 2014
@@ -38,6 +38,7 @@ public class ExchangeDestination impleme
     private ExchangeImpl _exchange;
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
+    private String _initialRoutingAddress;
 
     public ExchangeDestination(ExchangeImpl exchange, TerminusDurability 
durable, TerminusExpiryPolicy expiryPolicy)
     {
@@ -76,7 +77,13 @@ public class ExchangeDestination impleme
                     return null;
                 }};
 
-        int enqueues = _exchange.send(message, 
message.getInitialRoutingAddress(), instanceProperties, txn, null);
+        int enqueues = _exchange.send(message,
+                                      _initialRoutingAddress == null
+                                              ? 
message.getInitialRoutingAddress()
+                                              : _initialRoutingAddress,
+                                      instanceProperties,
+                                      txn,
+                                      null);
 
 
         return enqueues == 0 ? REJECTED : ACCEPTED;
@@ -102,4 +109,14 @@ public class ExchangeDestination impleme
     {
         return _exchange;
     }
+
+    public void setInitialRoutingAddress(final String initialRoutingAddress)
+    {
+        _initialRoutingAddress = initialRoutingAddress;
+    }
+
+    public String getInitialRoutingAddress()
+    {
+        return _initialRoutingAddress;
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
 Wed Mar  5 17:33:23 2014
@@ -35,13 +35,13 @@ public class NodeReceivingDestination im
     public static final Rejected REJECTED = new Rejected();
     private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
 
-    private MessageDestination _exchange;
+    private MessageDestination _destination;
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
 
-    public NodeReceivingDestination(MessageDestination exchange, 
TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+    public NodeReceivingDestination(MessageDestination destination, 
TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
     {
-        _exchange = exchange;
+        _destination = destination;
         _durability = durable;
         _expiryPolicy = expiryPolicy;
     }
@@ -76,7 +76,7 @@ public class NodeReceivingDestination im
                     return null;
                 }};
 
-        int enqueues = _exchange.send(message, 
message.getInitialRoutingAddress(), instanceProperties, txn, null);
+        int enqueues = _destination.send(message, 
message.getInitialRoutingAddress(), instanceProperties, txn, null);
 
 
         return enqueues == 0 ? REJECTED : ACCEPTED;
@@ -100,6 +100,6 @@ public class NodeReceivingDestination im
 
     public MessageDestination getDestination()
     {
-        return _exchange;
+        return _destination;
     }
 }

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
 Wed Mar  5 17:33:23 2014
@@ -230,7 +230,7 @@ public class SendingLink_1_0 implements 
                 }
 
 
-                String binding = "";
+                String binding = null;
 
                 Map<Symbol,Filter> filters = source.getFilter();
                 Map<Symbol,Filter> actualFilters = new 
HashMap<Symbol,Filter>();
@@ -298,8 +298,14 @@ public class SendingLink_1_0 implements 
                 }
                 _queue = queue;
                 source.setFilter(actualFilters.isEmpty() ? null : 
actualFilters);
-
-                exchange.addBinding(binding, queue,null);
+                if(binding != null)
+                {
+                    exchange.addBinding(binding, queue,null);
+                }
+                if(exchangeDestination.getInitialRoutingAddress() != null)
+                {
+                    
exchange.addBinding(exchangeDestination.getInitialRoutingAddress(),queue,null);
+                }
                 source.setDistributionMode(StdDistMode.COPY);
 
                 qd = new QueueDestination(queue);

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 Wed Mar  5 17:33:23 2014
@@ -127,17 +127,17 @@ public class Session_1_0 implements Sess
                         source.setAddress(tempQueue.getName());
                     }
                     String addr = source.getAddress();
-                    MessageSource queue = 
getVirtualHost().getMessageSource(addr);
-                    if(queue != null)
+                    if(!addr.startsWith("/") && addr.contains("/"))
                     {
-                        destination = new MessageSourceDestination(queue);
-                    }
-                    else
-                    {
-                        ExchangeImpl exchg = 
getVirtualHost().getExchange(addr);
+                        String[] parts = addr.split("/",2);
+                        ExchangeImpl exchg = 
getVirtualHost().getExchange(parts[0]);
                         if(exchg != null)
                         {
-                            destination = new ExchangeDestination(exchg, 
source.getDurable(), source.getExpiryPolicy());
+                            ExchangeDestination exchangeDestination =
+                                    new ExchangeDestination(exchg, 
source.getDurable(), source.getExpiryPolicy());
+                            
exchangeDestination.setInitialRoutingAddress(parts[1]);
+                            destination = exchangeDestination;
+
                         }
                         else
                         {
@@ -145,6 +145,27 @@ public class Session_1_0 implements Sess
                             destination = null;
                         }
                     }
+                    else
+                    {
+                        MessageSource queue = 
getVirtualHost().getMessageSource(addr);
+                        if(queue != null)
+                        {
+                            destination = new MessageSourceDestination(queue);
+                        }
+                        else
+                        {
+                            ExchangeImpl exchg = 
getVirtualHost().getExchange(addr);
+                            if(exchg != null)
+                            {
+                                destination = new ExchangeDestination(exchg, 
source.getDurable(), source.getExpiryPolicy());
+                            }
+                            else
+                            {
+                                endpoint.setSource(null);
+                                destination = null;
+                            }
+                        }
+                    }
 
                 }
                 else
@@ -265,28 +286,52 @@ public class Session_1_0 implements Sess
                         }
 
                         String addr = target.getAddress();
-                        MessageDestination messageDestination = 
getVirtualHost().getMessageDestination(addr);
-                        if(messageDestination != null)
-                        {
-                            destination = new 
NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                       
target.getExpiryPolicy());
-                        }
-                        else
+                        if(!addr.startsWith("/") && addr.contains("/"))
                         {
-                            AMQQueue queue = getVirtualHost().getQueue(addr);
-                            if(queue != null)
+                            String[] parts = addr.split("/",2);
+                            ExchangeImpl exchange = 
getVirtualHost().getExchange(parts[0]);
+                            if(exchange != null)
                             {
+                                ExchangeDestination exchangeDestination =
+                                        new ExchangeDestination(exchange,
+                                                                
target.getDurable(),
+                                                                
target.getExpiryPolicy());
+
+                                
exchangeDestination.setInitialRoutingAddress(parts[1]);
+
+                                destination = exchangeDestination;
 
-                                destination = new QueueDestination(queue);
                             }
                             else
                             {
                                 endpoint.setTarget(null);
                                 destination = null;
                             }
-
                         }
+                        else
+                        {
+                            MessageDestination messageDestination = 
getVirtualHost().getMessageDestination(addr);
+                            if(messageDestination != null)
+                            {
+                                destination = new 
NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                                           
target.getExpiryPolicy());
+                            }
+                            else
+                            {
+                                AMQQueue queue = 
getVirtualHost().getQueue(addr);
+                                if(queue != null)
+                                {
+
+                                    destination = new QueueDestination(queue);
+                                }
+                                else
+                                {
+                                    endpoint.setTarget(null);
+                                    destination = null;
+                                }
 
+                            }
+                        }
 
                     }
                     else



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to