Repository: activemq
Updated Branches:
  refs/heads/master cad1a2a8c -> 6b18857b5
AMQ-5637: support mapping between the AMQP Subject field and JMSType header https://issues.apache.org/jira/browse/AMQ-5637 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6b18857b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6b18857b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6b18857b Branch: refs/heads/master Commit: 6b18857b530db6b94bb25650b52dc0f54ebafa16 Parents: cad1a2a Author: Robert Gemmell <rob...@apache.org> Authored: Wed Mar 4 18:40:52 2015 +0000 Committer: Robert Gemmell <rob...@apache.org> Committed: Wed Mar 4 18:44:23 2015 +0000 ---------------------------------------------------------------------- .../amqp/message/InboundTransformer.java | 9 +++-- .../message/JMSMappingOutboundTransformer.java | 7 +--- .../amqp/message/OutboundTransformer.java | 2 - .../activemq/transport/amqp/JMSClientTest.java | 41 ++++++++++++++++++++ 4 files changed, 47 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index e8ac740..9ceb096 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -138,11 +138,12 @@ public abstract class InboundTransformer { if (ma != null) { for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) { String key = entry.getKey().toString(); - if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) { + 
if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { + // Legacy annotation, JMSType value will be replaced by Subject further down if also present. jms.setJMSType(entry.getValue().toString()); - } else { - setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); } + + setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); } } @@ -175,7 +176,7 @@ public abstract class InboundTransformer { jms.setJMSDestination(vendor.createDestination(properties.getTo())); } if (properties.getSubject() != null) { - jms.setStringProperty(prefixVendor + "Subject", properties.getSubject()); + jms.setJMSType(properties.getSubject()); } if (properties.getReplyTo() != null) { jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 1a837ae..de1bbda 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -153,10 +153,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? 
true : false); header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); if (msg.getJMSType() != null) { - if (maMap == null) { - maMap = new HashMap<Symbol, Object>(); - } - maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType()); + props.setSubject(msg.getJMSType()); } if (msg.getJMSMessageID() != null) { props.setMessageId(msg.getJMSMessageID()); @@ -234,8 +231,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { } String name = key.substring(prefixMessageAnnotationsKey.length()); maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); - } else if (key.equals(subjectKey)) { - props.setSubject(msg.getStringProperty(key)); } else if (key.equals(contentTypeKey)) { props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); } else if (key.equals(contentEncodingKey)) { http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java index 61749d1..1d28a07 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java @@ -32,7 +32,6 @@ public abstract class OutboundTransformer { String firstAcquirerKey; String prefixDeliveryAnnotationsKey; String prefixMessageAnnotationsKey; - String subjectKey; String contentTypeKey; String contentEncodingKey; String replyToGroupIDKey; @@ -57,7 +56,6 @@ public abstract class OutboundTransformer { firstAcquirerKey = prefixVendor + "FirstAcquirer"; prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; prefixMessageAnnotationsKey = prefixVendor + 
prefixMessageAnnotations; - subjectKey = prefixVendor +"Subject"; contentTypeKey = prefixVendor +"ContentType"; contentEncodingKey = prefixVendor +"ContentEncoding"; replyToGroupIDKey = prefixVendor +"ReplyToGroupID"; http://git-wip-us.apache.org/repos/asf/activemq/blob/6b18857b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 552d828..6799a83 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -322,6 +322,47 @@ public class JMSClientTest extends JMSClientTestSupport { } } + @SuppressWarnings("rawtypes") + @Test(timeout=30000) + public void testSelectorsWithJMSType() throws Exception{ + ActiveMQAdmin.enableJMSFrameTracing(); + + connection = createConnection(); + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("text"); + p.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + TextMessage message2 = session.createTextMessage(); + String type = "myJMSType"; + message2.setJMSType(type); + message2.setText("text + type"); + p.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + assertTrue(m instanceof TextMessage); + count 
++; + } + + assertEquals(2, count); + + MessageConsumer consumer = session.createConsumer(queue, "JMSType = '"+ type +"'"); + Message msg = consumer.receive(TestConfig.TIMEOUT); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("Unexpected JMSType value", type, msg.getJMSType()); + assertEquals("Unexpected message content", "text + type", ((TextMessage) msg).getText()); + } + } + abstract class Testable implements Runnable { protected String msg; synchronized boolean passed() {