Repository: activemq
Updated Branches:
  refs/heads/master cad1a2a8c -> 6b18857b5


AMQ-5637: support mapping between the AMQP Subject field and JMSType header

https://issues.apache.org/jira/browse/AMQ-5637


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6b18857b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6b18857b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6b18857b

Branch: refs/heads/master
Commit: 6b18857b530db6b94bb25650b52dc0f54ebafa16
Parents: cad1a2a
Author: Robert Gemmell <rob...@apache.org>
Authored: Wed Mar 4 18:40:52 2015 +0000
Committer: Robert Gemmell <rob...@apache.org>
Committed: Wed Mar 4 18:44:23 2015 +0000

----------------------------------------------------------------------
 .../amqp/message/InboundTransformer.java        |  9 +++--
 .../message/JMSMappingOutboundTransformer.java  |  7 +---
 .../amqp/message/OutboundTransformer.java       |  2 -
 .../activemq/transport/amqp/JMSClientTest.java  | 41 ++++++++++++++++++++
 4 files changed, 47 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index e8ac740..9ceb096 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -138,11 +138,12 @@ public abstract class InboundTransformer {
         if (ma != null) {
             for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
                 String key = entry.getKey().toString();
-                if ("x-opt-jms-type".equals(key.toString()) && 
entry.getValue() != null) {
+                if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
+                    // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
                     jms.setJMSType(entry.getValue().toString());
-                } else {
-                    setProperty(jms, prefixVendor + prefixMessageAnnotations + 
key, entry.getValue());
                 }
+
+                setProperty(jms, prefixVendor + prefixMessageAnnotations + 
key, entry.getValue());
             }
         }
 
@@ -175,7 +176,7 @@ public abstract class InboundTransformer {
                 
jms.setJMSDestination(vendor.createDestination(properties.getTo()));
             }
             if (properties.getSubject() != null) {
-                jms.setStringProperty(prefixVendor + "Subject", 
properties.getSubject());
+                jms.setJMSType(properties.getSubject());
             }
             if (properties.getReplyTo() != null) {
                 
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));

http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 1a837ae..de1bbda 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -153,10 +153,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
         header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT 
? true : false);
         header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
         if (msg.getJMSType() != null) {
-            if (maMap == null) {
-                maMap = new HashMap<Symbol, Object>();
-            }
-            maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType());
+            props.setSubject(msg.getJMSType());
         }
         if (msg.getJMSMessageID() != null) {
             props.setMessageId(msg.getJMSMessageID());
@@ -234,8 +231,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                 }
                 String name = 
key.substring(prefixMessageAnnotationsKey.length());
                 maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
-            } else if (key.equals(subjectKey)) {
-                props.setSubject(msg.getStringProperty(key));
             } else if (key.equals(contentTypeKey)) {
                 
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
             } else if (key.equals(contentEncodingKey)) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
index 61749d1..1d28a07 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
@@ -32,7 +32,6 @@ public abstract class OutboundTransformer {
     String firstAcquirerKey;
     String prefixDeliveryAnnotationsKey;
     String prefixMessageAnnotationsKey;
-    String subjectKey;
     String contentTypeKey;
     String contentEncodingKey;
     String replyToGroupIDKey;
@@ -57,7 +56,6 @@ public abstract class OutboundTransformer {
         firstAcquirerKey = prefixVendor + "FirstAcquirer";
         prefixDeliveryAnnotationsKey = prefixVendor + 
prefixDeliveryAnnotations;
         prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
-        subjectKey =  prefixVendor +"Subject";
         contentTypeKey = prefixVendor +"ContentType";
         contentEncodingKey = prefixVendor +"ContentEncoding";
         replyToGroupIDKey = prefixVendor +"ReplyToGroupID";

http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 552d828..6799a83 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -322,6 +322,47 @@ public class JMSClientTest extends JMSClientTestSupport {
         }
     }
 
+    @SuppressWarnings("rawtypes")
+    @Test(timeout=30000)
+    public void testSelectorsWithJMSType() throws Exception{
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getDestinationName());
+            MessageProducer p = session.createProducer(queue);
+
+            TextMessage message = session.createTextMessage();
+            message.setText("text");
+            p.send(message, DeliveryMode.NON_PERSISTENT, 
Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+            TextMessage message2 = session.createTextMessage();
+            String type = "myJMSType";
+            message2.setJMSType(type);
+            message2.setText("text + type");
+            p.send(message2, DeliveryMode.NON_PERSISTENT, 
Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration enumeration = browser.getEnumeration();
+            int count = 0;
+            while (enumeration.hasMoreElements()) {
+                Message m = (Message) enumeration.nextElement();
+                assertTrue(m instanceof TextMessage);
+                count ++;
+            }
+
+            assertEquals(2, count);
+
+            MessageConsumer consumer = session.createConsumer(queue, "JMSType 
= '"+ type +"'");
+            Message msg = consumer.receive(TestConfig.TIMEOUT);
+            assertNotNull(msg);
+            assertTrue(msg instanceof TextMessage);
+            assertEquals("Unexpected JMSType value", type, msg.getJMSType());
+            assertEquals("Unexpected message content", "text + type", 
((TextMessage) msg).getText());
+        }
+    }
+
     abstract class Testable implements Runnable {
         protected String msg;
         synchronized boolean passed() {

Reply via email to