Author: rgodfrey
Date: Fri Jul 15 10:52:02 2016
New Revision: 1752819
URL: http://svn.apache.org/viewvc?rev=1752819&view=rev
Log:
QPID-7188 : Add target capabilities for AMQP 1.0 Destinations that map to
Exchanges to allow the selection of either "discard unroutable" or "reject
unroutable" behaviours
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
Fri Jul 15 10:52:02 2016
@@ -84,6 +84,8 @@ public abstract class AbstractExchange<T
@ManagedAttributeField(beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange" )
private Exchange<?> _alternateExchange;
+ @ManagedAttributeField
+ private UnroutableMessageBehaviour _unroutableMessageBehaviour;
private VirtualHost<?> _virtualHost;
@@ -208,6 +210,12 @@ public abstract class AbstractExchange<T
}
@Override
+ public UnroutableMessageBehaviour getUnroutableMessageBehaviour()
+ {
+ return _unroutableMessageBehaviour;
+ }
+
+ @Override
public String toString()
{
return getClass().getSimpleName() + "[" + getName() +"]";
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
Fri Jul 15 10:52:02 2016
@@ -40,11 +40,19 @@ public interface Exchange<X extends Exch
String ALTERNATE_EXCHANGE = "alternateExchange";
+ enum UnroutableMessageBehaviour
+ {
+ REJECT, DISCARD
+ }
+
// Attributes
@ManagedAttribute
Exchange<?> getAlternateExchange();
+ @ManagedAttribute(description = "(AMQP 1.0 only) Default behaviour to apply when a message is not routed to any queues", defaultValue = "DISCARD")
+ UnroutableMessageBehaviour getUnroutableMessageBehaviour();
+
//children
Collection<? extends Binding> getBindings();
Collection<Publisher> getPublishers();
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
Fri Jul 15 10:52:02 2016
@@ -21,8 +21,10 @@
package org.apache.qpid.server.protocol.v1_0;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+
public interface Destination
{
-
+ Symbol[] getCapabilities();
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
Fri Jul 15 10:52:02 2016
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.util.Arrays;
import java.util.Collections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -35,6 +40,7 @@ import org.apache.qpid.server.txn.Server
public class ExchangeDestination implements ReceivingDestination,
SendingDestination
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExchangeDestination.class);
private static final Accepted ACCEPTED = new Accepted();
public static final Rejected REJECTED = new Rejected();
private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
@@ -44,13 +50,21 @@ public class ExchangeDestination impleme
private TerminusDurability _durability;
private TerminusExpiryPolicy _expiryPolicy;
private String _initialRoutingAddress;
+ private final boolean _discardUnroutable;
- public ExchangeDestination(Exchange<?> exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy, String address)
+ public ExchangeDestination(Exchange<?> exchange,
+ TerminusDurability durable,
+ TerminusExpiryPolicy expiryPolicy,
+ String address,
+ final Symbol[] capabilities)
{
_exchange = exchange;
_durability = durable;
_expiryPolicy = expiryPolicy;
_address = address;
+ _discardUnroutable = (capabilities != null && Arrays.asList(capabilities).contains(DISCARD_UNROUTABLE)) || exchange.getUnroutableMessageBehaviour() == Exchange.UnroutableMessageBehaviour.DISCARD;
+
+
}
public Outcome[] getOutcomes()
@@ -90,7 +104,7 @@ public class ExchangeDestination impleme
null);
- return enqueues == 0 ? REJECTED : ACCEPTED;
+ return enqueues == 0 && !_discardUnroutable ? REJECTED : ACCEPTED;
}
@Override
@@ -183,4 +197,12 @@ public class ExchangeDestination impleme
{
return _initialRoutingAddress;
}
+
+ @Override
+ public Symbol[] getCapabilities()
+ {
+ Symbol[] capabilities = new Symbol[1];
+ capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+ return capabilities;
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
Fri Jul 15 10:52:02 2016
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.message.MessageSource;
@@ -53,4 +54,9 @@ public class MessageSourceDestination im
return _queue;
}
+ @Override
+ public Symbol[] getCapabilities()
+ {
+ return new Symbol[0];
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
Fri Jul 15 10:52:02 2016
@@ -20,10 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.util.Arrays;
import java.util.Collections;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -39,6 +42,7 @@ public class NodeReceivingDestination im
private static final Accepted ACCEPTED = new Accepted();
public static final Rejected REJECTED = new Rejected();
private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
+ private final boolean _discardUnroutable;
private MessageDestination _destination;
private TerminusDurability _durability;
@@ -48,12 +52,16 @@ public class NodeReceivingDestination im
public NodeReceivingDestination(MessageDestination destination,
TerminusDurability durable,
TerminusExpiryPolicy expiryPolicy,
- final String address)
+ final String address, final Symbol[] capabilities)
{
_destination = destination;
_durability = durable;
_expiryPolicy = expiryPolicy;
_address = address;
+ _discardUnroutable = destination instanceof Exchange
+ && ((capabilities != null && Arrays.asList(capabilities).contains(DISCARD_UNROUTABLE))
+ || ((Exchange)destination).getUnroutableMessageBehaviour() == Exchange.UnroutableMessageBehaviour.DISCARD);
+
}
public Outcome[] getOutcomes()
@@ -92,7 +100,7 @@ public class NodeReceivingDestination im
int enqueues = _destination.send(message, routingAddress,
instanceProperties, txn, null);
- return enqueues == 0 ? REJECTED : ACCEPTED;
+ return enqueues == 0 && !_discardUnroutable ? REJECTED : ACCEPTED;
}
@Override
@@ -164,4 +172,12 @@ public class NodeReceivingDestination im
{
return _destination;
}
+
+ @Override
+ public Symbol[] getCapabilities()
+ {
+ Symbol[] capabilities = new Symbol[1];
+ capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+ return capabilities;
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
Fri Jul 15 10:52:02 2016
@@ -22,12 +22,16 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.txn.ServerTransaction;
public interface ReceivingDestination extends Destination
{
+ Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
+ Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
+
Outcome[] getOutcomes();
Outcome send(Message_1_0 message, ServerTransaction txn);
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1752819&r1=1752818&r2=1752819&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Fri Jul 15 10:52:02 2016
@@ -93,7 +93,6 @@ import org.apache.qpid.server.protocol.C
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -755,10 +754,11 @@ public class Session_1_0 implements AMQS
new ExchangeDestination(exchg,
source.getDurable(),
source.getExpiryPolicy(),
- parts[0]);
+ parts[0],
+ target.getCapabilities());
exchangeDestination.setInitialRoutingAddress(parts[1]);
destination = exchangeDestination;
-
+ target.setCapabilities(exchangeDestination.getCapabilities());
}
else
{
@@ -778,10 +778,14 @@ public class Session_1_0 implements AMQS
Exchange<?> exchg = getExchange(addr);
if (exchg != null)
{
- destination = new ExchangeDestination(exchg,
+ ExchangeDestination exchangeDestination =
+ new ExchangeDestination(exchg,
source.getDurable(),
source.getExpiryPolicy(),
- addr);
+ addr,
+ target.getCapabilities());
+ destination = exchangeDestination;
+ target.setCapabilities(exchangeDestination.getCapabilities());
}
else
{
@@ -924,7 +928,10 @@ public class Session_1_0 implements AMQS
{
MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy(), "");
+ target.getExpiryPolicy(), "",
+ target.getCapabilities());
+ target.setCapabilities(destination.getCapabilities());
+
}
else if (!addr.startsWith("/") && addr.contains("/"))
{
@@ -936,10 +943,11 @@ public class Session_1_0 implements AMQS
new ExchangeDestination(exchange,
target.getDurable(),
target.getExpiryPolicy(),
- parts[0]);
+ parts[0],
+ target.getCapabilities());
exchangeDestination.setInitialRoutingAddress(parts[1]);
-
+ target.setCapabilities(exchangeDestination.getCapabilities());
destination = exchangeDestination;
}
@@ -957,7 +965,9 @@ public class Session_1_0 implements AMQS
{
destination =
new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy(), addr);
+ target.getExpiryPolicy(), addr, target.getCapabilities());
+ target.setCapabilities(destination.getCapabilities());
+
}
else
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]