Repository: activemq-artemis Updated Branches: refs/heads/master 78d0193fc -> 2ef0d2601
ARTEMIS-1068 JMS + AMQP routing Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c792b8e2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c792b8e2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c792b8e2 Branch: refs/heads/master Commit: c792b8e2741d24aef24f07b78c733ebf5f225ed7 Parents: 78d0193 Author: Justin Bertram <[email protected]> Authored: Sat Mar 25 08:13:25 2017 -0500 Committer: Clebert Suconic <[email protected]> Committed: Mon Mar 27 15:14:43 2017 -0400 ---------------------------------------------------------------------- .../artemis/protocol/amqp/broker/AMQPMessage.java | 11 +++++++++++ .../protocol/amqp/broker/AMQPSessionCallback.java | 4 ++++ .../artemis/protocol/amqp/proton/AmqpSupport.java | 2 ++ .../amqp/proton/ProtonServerReceiverContext.java | 14 +++++++++++++- 4 files changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 522ae16..d241958 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -256,6 +256,17 @@ public class AMQPMessage extends RefCountMessage { if (routingType != null) { return RoutingType.getType((byte) routingType); } else { + routingType = getSymbol(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION); + if (routingType != null) { + if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) { + return RoutingType.ANYCAST; + } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) { + return RoutingType.MULTICAST; + } + } else { + return null; + } + return null; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 034cb72..18294e0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -575,4 +575,8 @@ public class AMQPSessionCallback implements SessionCallback { public void removeTemporaryQueue(String address) throws Exception { serverSession.deleteQueue(SimpleString.toSimpleString(address)); } + + public RoutingType getDefaultRoutingType(String address) { + return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 227ee5d..3a36f16 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -39,6 +39,8 @@ public class AmqpSupport { // Capabilities used to identify destination type in some requests. public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); + public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue"); + public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic"); // Symbols used to announce connection information to remote peer. public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 34a522f..596e93a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -86,7 +86,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements address = sessionSPI.tempQueueName(); try { - sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST); + sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities())); } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } @@ -122,6 +122,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(maxCreditAllocation, minCreditRefresh); } + private RoutingType getRoutingType(Symbol[] symbols) { + for (Symbol symbol : symbols) { + if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { + return RoutingType.MULTICAST; + } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) { + return RoutingType.ANYCAST; + } + } + + return sessionSPI.getDefaultRoutingType(address); + } + /* * called when Proton receives a message to be delivered via a Delivery. *
