Author: rgodfrey
Date: Fri Jul 15 10:52:02 2016
New Revision: 1752819

URL: http://svn.apache.org/viewvc?rev=1752819&view=rev
Log:
QPID-7188 : Add target capabilities for AMQP 1.0 Destinations that map to 
Exchanges to allow the selection of either "discard unroutable" or "reject 
unroutable" behaviours

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Jul 15 10:52:02 2016
@@ -84,6 +84,8 @@ public abstract class AbstractExchange<T
 
     @ManagedAttributeField(beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange" )
     private Exchange<?> _alternateExchange;
+    @ManagedAttributeField
+    private UnroutableMessageBehaviour _unroutableMessageBehaviour;
 
     private VirtualHost<?> _virtualHost;
 
@@ -208,6 +210,12 @@ public abstract class AbstractExchange<T
     }
 
     @Override
+    public UnroutableMessageBehaviour getUnroutableMessageBehaviour()
+    {
+        return _unroutableMessageBehaviour;
+    }
+
+    @Override
     public String toString()
     {
         return getClass().getSimpleName() + "[" + getName() +"]";

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java Fri Jul 15 10:52:02 2016
@@ -40,11 +40,19 @@ public interface Exchange<X extends Exch
 
     String ALTERNATE_EXCHANGE                   = "alternateExchange";
 
+    enum UnroutableMessageBehaviour
+    {
+        REJECT, DISCARD
+    }
+
     // Attributes
 
     @ManagedAttribute
     Exchange<?> getAlternateExchange();
 
+    @ManagedAttribute(description = "(AMQP 1.0 only) Default behaviour to apply when a message is not routed to any queues", defaultValue = "DISCARD")
+    UnroutableMessageBehaviour getUnroutableMessageBehaviour();
+
     //children
     Collection<? extends Binding> getBindings();
     Collection<Publisher> getPublishers();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java Fri Jul 15 10:52:02 2016
@@ -21,8 +21,10 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+
 public interface Destination
 {
-
+    Symbol[] getCapabilities();
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Fri Jul 15 10:52:02 2016
@@ -20,9 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.Arrays;
 import java.util.Collections;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -35,6 +40,7 @@ import org.apache.qpid.server.txn.Server
 
 public class ExchangeDestination implements ReceivingDestination, SendingDestination
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExchangeDestination.class);
     private static final Accepted ACCEPTED = new Accepted();
     public static final Rejected REJECTED = new Rejected();
     private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
@@ -44,13 +50,21 @@ public class ExchangeDestination impleme
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
     private String _initialRoutingAddress;
+    private final boolean _discardUnroutable;
 
-    public ExchangeDestination(Exchange<?> exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy, String address)
+    public ExchangeDestination(Exchange<?> exchange,
+                               TerminusDurability durable,
+                               TerminusExpiryPolicy expiryPolicy,
+                               String address,
+                               final Symbol[] capabilities)
     {
         _exchange = exchange;
         _durability = durable;
         _expiryPolicy = expiryPolicy;
         _address = address;
+        _discardUnroutable = (capabilities != null && Arrays.asList(capabilities).contains(DISCARD_UNROUTABLE)) || exchange.getUnroutableMessageBehaviour() == Exchange.UnroutableMessageBehaviour.DISCARD;
+
+
     }
 
     public Outcome[] getOutcomes()
@@ -90,7 +104,7 @@ public class ExchangeDestination impleme
                                       null);
 
 
-        return enqueues == 0 ? REJECTED : ACCEPTED;
+        return enqueues == 0 && !_discardUnroutable ? REJECTED : ACCEPTED;
     }
 
     @Override
@@ -183,4 +197,12 @@ public class ExchangeDestination impleme
     {
         return _initialRoutingAddress;
     }
+
+    @Override
+    public Symbol[] getCapabilities()
+    {
+        Symbol[] capabilities = new Symbol[1];
+        capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+        return capabilities;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java Fri Jul 15 10:52:02 2016
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.message.MessageSource;
 
@@ -53,4 +54,9 @@ public class MessageSourceDestination im
         return _queue;
     }
 
+    @Override
+    public Symbol[] getCapabilities()
+    {
+        return new Symbol[0];
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Fri Jul 15 10:52:02 2016
@@ -20,10 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -39,6 +42,7 @@ public class NodeReceivingDestination im
     private static final Accepted ACCEPTED = new Accepted();
     public static final Rejected REJECTED = new Rejected();
     private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
+    private final boolean _discardUnroutable;
 
     private MessageDestination _destination;
     private TerminusDurability _durability;
@@ -48,12 +52,16 @@ public class NodeReceivingDestination im
     public NodeReceivingDestination(MessageDestination destination,
                                     TerminusDurability durable,
                                     TerminusExpiryPolicy expiryPolicy,
-                                    final String address)
+                                    final String address, final Symbol[] capabilities)
     {
         _destination = destination;
         _durability = durable;
         _expiryPolicy = expiryPolicy;
         _address = address;
+        _discardUnroutable = destination instanceof Exchange
+                             && ((capabilities != null && Arrays.asList(capabilities).contains(DISCARD_UNROUTABLE))
+                                 || ((Exchange)destination).getUnroutableMessageBehaviour() == Exchange.UnroutableMessageBehaviour.DISCARD);
+
     }
 
     public Outcome[] getOutcomes()
@@ -92,7 +100,7 @@ public class NodeReceivingDestination im
         int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null);
 
 
-        return enqueues == 0 ? REJECTED : ACCEPTED;
+        return enqueues == 0 && !_discardUnroutable ? REJECTED : ACCEPTED;
     }
 
     @Override
@@ -164,4 +172,12 @@ public class NodeReceivingDestination im
     {
         return _destination;
     }
+
+    @Override
+    public Symbol[] getCapabilities()
+    {
+        Symbol[] capabilities = new Symbol[1];
+        capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+        return capabilities;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Fri Jul 15 10:52:02 2016
@@ -22,12 +22,16 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public interface ReceivingDestination extends Destination
 {
 
+    Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
+    Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
+
     Outcome[] getOutcomes();
 
     Outcome send(Message_1_0 message, ServerTransaction txn);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Jul 15 10:52:02 2016
@@ -93,7 +93,6 @@ import org.apache.qpid.server.protocol.C
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
@@ -755,10 +754,11 @@ public class Session_1_0 implements AMQS
                                     new ExchangeDestination(exchg,
                                                             source.getDurable(),
                                                             source.getExpiryPolicy(),
-                                                            parts[0]);
+                                                            parts[0],
+                                                            target.getCapabilities());
                             exchangeDestination.setInitialRoutingAddress(parts[1]);
                             destination = exchangeDestination;
-
+                            target.setCapabilities(exchangeDestination.getCapabilities());
                         }
                         else
                         {
@@ -778,10 +778,14 @@ public class Session_1_0 implements AMQS
                             Exchange<?> exchg = getExchange(addr);
                             if (exchg != null)
                             {
-                                destination = new ExchangeDestination(exchg,
+                                ExchangeDestination exchangeDestination =
+                                              new ExchangeDestination(exchg,
                                                                       source.getDurable(),
                                                                       source.getExpiryPolicy(),
-                                                                      addr);
+                                                                      addr,
+                                                                      target.getCapabilities());
+                                destination = exchangeDestination;
+                                target.setCapabilities(exchangeDestination.getCapabilities());
                             }
                             else
                             {
@@ -924,7 +928,10 @@ public class Session_1_0 implements AMQS
                         {
                             MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
                             destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                       target.getExpiryPolicy(), "");
+                                                                       target.getExpiryPolicy(), "",
+                                                                       target.getCapabilities());
+                            target.setCapabilities(destination.getCapabilities());
+
                         }
                         else if (!addr.startsWith("/") && addr.contains("/"))
                         {
@@ -936,10 +943,11 @@ public class Session_1_0 implements AMQS
                                         new ExchangeDestination(exchange,
                                                                 target.getDurable(),
                                                                 target.getExpiryPolicy(),
-                                                                parts[0]);
+                                                                parts[0],
+                                                                target.getCapabilities());
 
                                 exchangeDestination.setInitialRoutingAddress(parts[1]);
-
+                                target.setCapabilities(exchangeDestination.getCapabilities());
                                 destination = exchangeDestination;
 
                             }
@@ -957,7 +965,9 @@ public class Session_1_0 implements AMQS
                             {
                                 destination =
                                         new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                     target.getExpiryPolicy(), addr);
+                                                                     target.getExpiryPolicy(), addr, target.getCapabilities());
+                                target.setCapabilities(destination.getCapabilities());
+
                             }
                             else
                             {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to