Repository: activemq-artemis
Updated Branches:
  refs/heads/master 4203ae89c -> 91ee5272b


ARTEMIS-1068 routingType + AMQP fixes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/427039ef
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/427039ef
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/427039ef

Branch: refs/heads/master
Commit: 427039ef384fbecd22a734145997fba02530107d
Parents: 4203ae8
Author: Justin Bertram <[email protected]>
Authored: Fri Mar 24 15:05:18 2017 -0500
Committer: Justin Bertram <[email protected]>
Committed: Fri Mar 24 15:15:10 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  8 +-
 .../artemis/core/message/impl/CoreMessage.java  | 12 ++-
 .../artemis/jms/client/ActiveMQMessage.java     | 11 +--
 .../jms/client/ActiveMQMessageProducer.java     |  3 +-
 .../protocol/amqp/broker/AMQPMessage.java       | 32 ++++----
 .../amqp/converter/AMQPMessageSupport.java      |  5 ++
 .../core/protocol/openwire/OpenwireMessage.java |  6 --
 .../core/protocol/openwire/amq/AMQSession.java  |  4 +-
 .../stomp/VersionedStompFrameHandler.java       |  2 +-
 .../artemis/core/server/impl/DivertImpl.java    |  6 +-
 .../core/server/impl/ServerSessionImpl.java     |  2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  5 --
 .../integration/amqp/AmqpSendReceiveTest.java   | 77 +++++++++++++++++++-
 .../integration/client/AcknowledgeTest.java     |  5 --
 .../tests/integration/client/RoutingTest.java   |  4 +-
 .../tests/integration/divert/DivertTest.java    |  4 +-
 16 files changed, 131 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 56097ae..9cd3fa7 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -168,7 +168,13 @@ public interface Message {
       // only on core
    }
 
-   RoutingType getRouteType();
+   default RoutingType getRoutingType() {
+      return null;
+   }
+
+   default Message setRoutingType(RoutingType routingType) {
+      return this;
+   }
 
    default SimpleString getLastValueProperty() {
       return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index f0a8715..8f24cc0 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -154,7 +154,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    }
 
    @Override
-   public RoutingType getRouteType() {
+   public RoutingType getRoutingType() {
       if (containsProperty(Message.HDR_ROUTING_TYPE)) {
          return RoutingType.getType(getByteProperty(Message.HDR_ROUTING_TYPE));
       }
@@ -162,6 +162,16 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    }
 
    @Override
+   public Message setRoutingType(RoutingType routingType) {
+      if (routingType == null) {
+         removeProperty(Message.HDR_ROUTING_TYPE);
+      } else {
+         putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
+      }
+      return this;
+   }
+
+   @Override
    public CoreMessage setReplyTo(SimpleString address) {
 
       if (address == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 286bc57..64c8f16 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -399,13 +399,10 @@ public class ActiveMQMessage implements javax.jms.Message 
{
       if (dest == null) {
          SimpleString address = message.getAddressSimpleString();
          String prefix = "";
-         if 
(message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE))
 {
-            RoutingType routingType = 
RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
-            if (routingType.equals(RoutingType.ANYCAST)) {
-               prefix = QUEUE_QUALIFIED_PREFIX;
-            } else if (routingType.equals(RoutingType.MULTICAST)) {
-               prefix = TOPIC_QUALIFIED_PREFIX;
-            }
+         if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
+            prefix = QUEUE_QUALIFIED_PREFIX;
+         } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
+            prefix = TOPIC_QUALIFIED_PREFIX;
          }
 
          dest = address == null ? null : 
ActiveMQDestination.fromPrefixedName(prefix + address.toString());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 74dd39a..3121a88 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -494,8 +494,7 @@ public class ActiveMQMessageProducer implements 
MessageProducer, QueueSender, To
       ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
       
coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, 
connID);
 
-      byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() 
: RoutingType.MULTICAST.getType();
-      
coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
 routingType);
+      coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : 
RoutingType.MULTICAST);
 
       try {
          /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d02eace..522ae16 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -250,22 +250,24 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public RoutingType getRouteType() {
-
-      /* TODO-now How to use this properly
-      switch (((Byte)type).byteValue()) {
-         case AMQPMessageSupport.QUEUE_TYPE:
-         case AMQPMessageSupport.TEMP_QUEUE_TYPE:
-            return RoutingType.ANYCAST;
-
-         case AMQPMessageSupport.TOPIC_TYPE:
-         case AMQPMessageSupport.TEMP_TOPIC_TYPE:
-            return RoutingType.MULTICAST;
-         default:
-            return null;
-      } */
+   public RoutingType getRoutingType() {
+      Object routingType = getSymbol(AMQPMessageSupport.ROUTING_TYPE);
 
-      return null;
+      if (routingType != null) {
+         return RoutingType.getType((byte) routingType);
+      } else {
+         return null;
+      }
+   }
+
+   /**
+    * Sets or clears the routing type recorded on this message under
+    * {@link AMQPMessageSupport#ROUTING_TYPE}.
+    *
+    * @param routingType the routing type to record, or {@code null} to remove
+    *                    the routing type symbol from the message
+    * @return this message
+    */
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setRoutingType(RoutingType routingType) {
+      parseHeaders();
+      if (routingType == null) {
+         removeSymbol(AMQPMessageSupport.ROUTING_TYPE);
+      } else {
+         // Only dereference routingType on the non-null path; callers such as
+         // a STRIP divert pass null to clear the value.
+         setSymbol(AMQPMessageSupport.ROUTING_TYPE, routingType.getType());
+      }
+      return this;
+   }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 0dd54db..da2f4e0 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -65,6 +65,11 @@ public final class AMQPMessageSupport {
    public static final Symbol JMS_DELIVERY_TIME = 
Symbol.getSymbol("x-opt-delivery-time");
 
    /**
+    * Attribute used to mark the routing type assigned to the message
+    */
+   public static final Symbol ROUTING_TYPE = 
Symbol.getSymbol("x-opt-routing-type");
+
+   /**
     * Value mapping for JMS_MSG_TYPE which indicates the message is a generic 
JMS Message
     * which has no body.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 3bd95f4..d28eda4 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -25,7 +25,6 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessageListener;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
@@ -43,11 +42,6 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
-   public RoutingType getRouteType() {
-      return null;
-   }
-
-   @Override
    public SimpleString getReplyTo() {
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index b5d2c86..7a8ed3b 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -329,9 +329,9 @@ public class AMQSession implements SessionCallback {
 
          if (actualDestinations[i].isQueue()) {
             checkAutoCreateQueue(new 
SimpleString(actualDestinations[i].getPhysicalName()), 
actualDestinations[i].isTemporary());
-            
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
 RoutingType.ANYCAST.getType());
+            coreMsg.setRoutingType(RoutingType.ANYCAST);
          } else {
-            
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
 RoutingType.MULTICAST.getType());
+            coreMsg.setRoutingType(RoutingType.MULTICAST);
          }
          PagingStore store = server.getPagingManager().getPageStore(address);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 3f68c6f..c5fc8f1 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -180,7 +180,7 @@ public abstract class VersionedStompFrameHandler {
 
          CoreMessage message = connection.createServerMessage();
          if (routingType != null) {
-            message.putByteProperty(Message.HDR_ROUTING_TYPE, 
routingType.getType());
+            message.setRoutingType(routingType);
          }
          message.setTimestamp(timestamp);
          message.setAddress(SimpleString.toSimpleString(destination));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index c73fd80..124a43d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -108,13 +108,13 @@ public class DivertImpl implements Divert {
 
          switch (routingType) {
             case ANYCAST:
-               copy.putByteProperty(Message.HDR_ROUTING_TYPE, 
RoutingType.ANYCAST.getType());
+               copy.setRoutingType(RoutingType.ANYCAST);
                break;
             case MULTICAST:
-               copy.putByteProperty(Message.HDR_ROUTING_TYPE, 
RoutingType.MULTICAST.getType());
+               copy.setRoutingType(RoutingType.MULTICAST);
                break;
             case STRIP:
-               copy.removeProperty(Message.HDR_ROUTING_TYPE);
+               copy.setRoutingType(null);
                break;
             case PASS:
                break;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 97a4249..ae4c16e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1619,7 +1619,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
       RoutingStatus result = RoutingStatus.OK;
 
-      RoutingType routingType = msg.getRouteType();
+      RoutingType routingType = msg.getRoutingType();
 
          /* TODO-now: How to address here with AMQP?
          if (originalAddress != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index dd48b58..5cea833 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -284,11 +284,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
    class FakeMessage extends RefCountMessage {
 
       @Override
-      public RoutingType getRouteType() {
-         return null;
-      }
-
-      @Override
       public SimpleString getReplyTo() {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 10f06b2..4757659 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -35,7 +35,9 @@ import javax.jms.JMSException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -208,7 +210,33 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
-   public void testAnycastMessageRoutingExclusivity() throws Exception {
+   public void testQueueReceiverReadMessageWithDivert() throws Exception {
+      final String forwardingAddress = getTestName() + "Divert";
+      final SimpleString simpleForwardingAddress = 
SimpleString.toSimpleString(forwardingAddress);
+      server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, 
simpleForwardingAddress, null, true, false);
+      server.getActiveMQServerControl().createDivert("name", "routingName", 
getTestName(), forwardingAddress, true, null, null, 
DivertConfigurationRoutingType.ANYCAST.toString());
+      sendMessages(getTestName(), 1);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(forwardingAddress);
+
+      Queue queueView = getProxyToQueue(forwardingAddress);
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testAnycastMessageRoutingExclusivityUsingPrefix() throws 
Exception {
       final String addressA = "addressA";
       final String queueA = "queueA";
       final String queueB = "queueB";
@@ -226,8 +254,27 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
       assertEquals(0, 
server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
    }
 
+   @Test(timeout = 60000)
+   public void testAnycastMessageRoutingExclusivityUsingProperty() throws 
Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + 
"," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, 
RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, 
RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueC, 
RoutingType.MULTICAST.toString());
+
+      sendMessages(addressA, 1, RoutingType.ANYCAST);
+
+      assertEquals(1, 
server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + 
server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+      assertEquals(0, 
server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+   }
+
    @Test
-   public void testMulticastMessageRoutingExclusivity() throws Exception {
+   public void testMulticastMessageRoutingExclusivityUsingPrefix() throws 
Exception {
       final String addressA = "addressA";
       final String queueA = "queueA";
       final String queueB = "queueB";
@@ -245,6 +292,25 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
       assertEquals(2, 
server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + 
server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
    }
 
+   @Test
+   public void testMulticastMessageRoutingExclusivityUsingProperty() throws 
Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + 
"," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, 
RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, 
RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueC, 
RoutingType.MULTICAST.toString());
+
+      sendMessages(addressA, 1, RoutingType.MULTICAST);
+
+      assertEquals(0, 
server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+      assertEquals(2, 
server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + 
server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+   }
+
    @Test(timeout = 60000)
    public void testMessageDurableFalse() throws Exception {
       sendMessages(getTestName(), 1, false);
@@ -1107,6 +1173,10 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
    }
 
    public void sendMessages(String destinationName, int count) throws 
Exception {
+      sendMessages(destinationName, count, null);
+   }
+
+   public void sendMessages(String destinationName, int count, RoutingType 
routingType) throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       try {
@@ -1116,6 +1186,9 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
          for (int i = 0; i < count; ++i) {
             AmqpMessage message = new AmqpMessage();
             message.setMessageId("MessageID:" + i);
+            if (routingType != null) {
+               
message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), 
routingType.getType());
+            }
             sender.send(message);
          }
       } finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 31e26e3..604b630 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -342,11 +342,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       final long id;
 
       @Override
-      public RoutingType getRouteType() {
-         return null;
-      }
-
-      @Override
       public SimpleString getReplyTo() {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java
index 9e22542..8de6958 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java
@@ -236,7 +236,7 @@ public class RoutingTest extends ActiveMQTestBase {
       sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC);
       ClientProducer p = sendSession.createProducer(addressA);
       ClientMessage message = sendSession.createMessage(false);
-      message.putByteProperty(Message.HDR_ROUTING_TYPE, 
RoutingType.ANYCAST.getType());
+      message.setRoutingType(RoutingType.ANYCAST);
       p.send(message);
       sendSession.close();
       assertEquals(1, server.locateQueue(queueA).getMessageCount() + 
server.locateQueue(queueB).getMessageCount());
@@ -255,7 +255,7 @@ public class RoutingTest extends ActiveMQTestBase {
       sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC);
       ClientProducer p = sendSession.createProducer(addressA);
       ClientMessage message = sendSession.createMessage(false);
-      message.putByteProperty(Message.HDR_ROUTING_TYPE, 
RoutingType.MULTICAST.getType());
+      message.setRoutingType(RoutingType.MULTICAST);
       p.send(message);
       sendSession.close();
       assertEquals(0, server.locateQueue(queueA).getMessageCount());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/427039ef/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 69a360e..5311601 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -165,7 +165,7 @@ public class DivertTest extends ActiveMQTestBase {
       for (int i = 0; i < numMessages; i++) {
          ClientMessage message = session.createMessage(false);
 
-         message.putByteProperty(Message.HDR_ROUTING_TYPE, 
RoutingType.MULTICAST.getType());
+         message.setRoutingType(RoutingType.MULTICAST);
 
          message.putIntProperty(propKey, i);
 
@@ -238,7 +238,7 @@ public class DivertTest extends ActiveMQTestBase {
       for (int i = 0; i < numMessages; i++) {
          ClientMessage message = session.createMessage(false);
 
-         message.putByteProperty(Message.HDR_ROUTING_TYPE, 
RoutingType.MULTICAST.getType());
+         message.setRoutingType(RoutingType.MULTICAST);
 
          message.putIntProperty(propKey, i);
 

Reply via email to