Repository: activemq-artemis
Updated Branches:
  refs/heads/master 78d0193fc -> 2ef0d2601


ARTEMIS-1068 JMS + AMQP routing


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c792b8e2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c792b8e2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c792b8e2

Branch: refs/heads/master
Commit: c792b8e2741d24aef24f07b78c733ebf5f225ed7
Parents: 78d0193
Author: Justin Bertram <[email protected]>
Authored: Sat Mar 25 08:13:25 2017 -0500
Committer: Clebert Suconic <[email protected]>
Committed: Mon Mar 27 15:14:43 2017 -0400

----------------------------------------------------------------------
 .../artemis/protocol/amqp/broker/AMQPMessage.java     | 11 +++++++++++
 .../protocol/amqp/broker/AMQPSessionCallback.java     |  4 ++++
 .../artemis/protocol/amqp/proton/AmqpSupport.java     |  2 ++
 .../amqp/proton/ProtonServerReceiverContext.java      | 14 +++++++++++++-
 4 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 522ae16..d241958 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -256,6 +256,17 @@ public class AMQPMessage extends RefCountMessage {
       if (routingType != null) {
          return RoutingType.getType((byte) routingType);
       } else {
+         routingType = 
getSymbol(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
+         if (routingType != null) {
+            if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || 
AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
+               return RoutingType.ANYCAST;
+            } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || 
AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
+               return RoutingType.MULTICAST;
+            }
+         } else {
+            return null;
+         }
+
          return null;
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 034cb72..18294e0 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -575,4 +575,8 @@ public class AMQPSessionCallback implements SessionCallback 
{
    public void removeTemporaryQueue(String address) throws Exception {
       serverSession.deleteQueue(SimpleString.toSimpleString(address));
    }
+
+   public RoutingType getDefaultRoutingType(String address) {
+      return 
manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 227ee5d..3a36f16 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -39,6 +39,8 @@ public class AmqpSupport {
    // Capabilities used to identify destination type in some requests.
    public static final Symbol TEMP_QUEUE_CAPABILITY = 
Symbol.valueOf("temporary-queue");
    public static final Symbol TEMP_TOPIC_CAPABILITY = 
Symbol.valueOf("temporary-topic");
+   public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue");
+   public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic");
 
    // Symbols used to announce connection information to remote peer.
    public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 34a522f..596e93a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -86,7 +86,7 @@ public class ProtonServerReceiverContext extends 
ProtonInitializable implements
             address = sessionSPI.tempQueueName();
 
             try {
-               sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST);
+               sessionSPI.createTemporaryQueue(address, 
getRoutingType(target.getCapabilities()));
             } catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
@@ -122,6 +122,18 @@ public class ProtonServerReceiverContext extends 
ProtonInitializable implements
       flow(maxCreditAllocation, minCreditRefresh);
    }
 
+   private RoutingType getRoutingType(Symbol[] symbols) {
+      for (Symbol symbol : symbols) {
+         if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || 
AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
+            return RoutingType.MULTICAST;
+         } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || 
AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
+            return RoutingType.ANYCAST;
+         }
+      }
+
+      return sessionSPI.getDefaultRoutingType(address);
+   }
+
    /*
    * called when Proton receives a message to be delivered via a Delivery.
    *

Reply via email to