CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d19e5d74 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d19e5d74 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d19e5d74 Branch: refs/heads/master Commit: d19e5d742b2cbed2f84ad2598bbe9c4789789d3b Parents: 5b1d8da Author: Claus Ibsen <[email protected]> Authored: Mon Sep 7 11:10:01 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Sep 7 11:10:01 2015 +0200 ---------------------------------------------------------------------- components/camel-sjms/pom.xml | 219 +++--- .../camel/component/sjms/SjmsComponent.java | 27 +- .../camel/component/sjms/SjmsConsumer.java | 2 +- .../camel/component/sjms/SjmsEndpoint.java | 127 ++- .../sjms/SjmsHeaderFilterStrategy.java | 14 +- .../camel/component/sjms/SjmsMessage.java | 283 +++++++ .../component/sjms/batch/SjmsBatchConsumer.java | 5 +- .../component/sjms/batch/SjmsBatchEndpoint.java | 152 +++- .../sjms/consumer/AbstractMessageHandler.java | 22 +- .../sjms/consumer/InOutMessageHandler.java | 5 +- .../sjms/jms/DefaultJmsKeyFormatStrategy.java | 2 +- .../camel/component/sjms/jms/JmsBinding.java | 606 ++++++++++++++ .../sjms/jms/JmsKeyFormatStrategy.java | 41 + .../component/sjms/jms/JmsMessageHelper.java | 782 ++++++------------- .../component/sjms/jms/KeyFormatStrategy.java | 41 - .../sjms/jms/MessageCreatedStrategy.java | 39 + .../component/sjms/producer/InOnlyProducer.java | 23 +- .../component/sjms/producer/InOutProducer.java | 41 +- .../JMSMessageHelperTypeConversionTest.java | 201 ----- 19 files changed, 1671 insertions(+), 961 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml index 6977796..3f8d023 100644 --- a/components/camel-sjms/pom.xml +++ b/components/camel-sjms/pom.xml @@ -15,121 +15,122 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.camel</groupId> - <artifactId>components</artifactId> - <version>2.16-SNAPSHOT</version> - </parent> + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.16-SNAPSHOT</version> + </parent> - <artifactId>camel-sjms</artifactId> - <packaging>bundle</packaging> - <name>Camel :: Simple JMS</name> - <description>A pure Java JMS Camel Component</description> + <artifactId>camel-sjms</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Simple JMS</name> + <description>A pure Java JMS Camel Component</description> - <properties> - <camel.osgi.export.pkg> - org.apache.camel.component.sjms, - org.apache.camel.component.sjms.jms, - org.apache.camel.component.sjms.batch - </camel.osgi.export.pkg> - <camel.osgi.private.pkg> - org.apache.camel.component.sjms.consumer, - org.apache.camel.component.sjms.producer, - org.apache.camel.component.sjms.taskmanager, - org.apache.camel.component.sjms.tx - </camel.osgi.private.pkg> - <camel.osgi.export.service> - org.apache.camel.spi.ComponentResolver;component=sjms, - org.apache.camel.spi.ComponentResolver;component=sjms-batch - </camel.osgi.export.service> - </properties> + <properties> + <camel.osgi.export.pkg> + org.apache.camel.component.sjms, + org.apache.camel.component.sjms.jms, + org.apache.camel.component.sjms.batch + </camel.osgi.export.pkg> + <camel.osgi.private.pkg> + org.apache.camel.component.sjms.consumer, + org.apache.camel.component.sjms.producer, + org.apache.camel.component.sjms.taskmanager, + org.apache.camel.component.sjms.tx + </camel.osgi.private.pkg> + <camel.osgi.export.service> + org.apache.camel.spi.ComponentResolver;component=sjms, + org.apache.camel.spi.ComponentResolver;component=sjms-batch + </camel.osgi.export.service> + </properties> - <dependencies> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-core</artifactId> - </dependency> - <dependency> - <groupId>commons-pool</groupId> - <artifactId>commons-pool</artifactId> - </dependency> + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>commons-pool</groupId> + <artifactId>commons-pool</artifactId> + </dependency> - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jms_1.1_spec</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-annotation_1.0_spec</artifactId> - <version>${geronimo-annotation-spec-version}</version> - <scope>provided</scope> - </dependency> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-annotation_1.0_spec</artifactId> + <version>${geronimo-annotation-spec-version}</version> + <scope>provided</scope> + </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-test</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-kahadb-store</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-pool</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.atomikos</groupId> - <artifactId>transactions-jta</artifactId> - <version>${atomikos-transactions-version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <version>${commons-io-version}</version> - <scope>test</scope> - </dependency> - </dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-kahadb-store</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-pool</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.atomikos</groupId> + <artifactId>transactions-jta</artifactId> + <version>${atomikos-transactions-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons-io-version}</version> + <scope>test</scope> + </dependency> + </dependencies> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <forkCount>1</forkCount> - <reuseForks>false</reuseForks> - <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-clean-plugin</artifactId> - <configuration> - <filesets> - <fileset> - <directory>${basedir}/activemq-data</directory> - </fileset> - </filesets> - </configuration> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-clean-plugin</artifactId> + <configuration> + <filesets> + <fileset> + <directory>${basedir}/activemq-data</directory> + </fileset> + </filesets> + </configuration> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java index 3433ec9..a7347c7 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java @@ -27,7 +27,8 @@ import org.apache.camel.component.sjms.jms.ConnectionFactoryResource; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy; import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; -import org.apache.camel.component.sjms.jms.KeyFormatStrategy; +import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy; +import org.apache.camel.component.sjms.jms.MessageCreatedStrategy; import org.apache.camel.component.sjms.taskmanager.TimedTaskManager; import org.apache.camel.impl.UriEndpointComponent; import org.apache.camel.spi.HeaderFilterStrategy; @@ -44,12 +45,13 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS private ConnectionFactory connectionFactory; private ConnectionResource connectionResource; private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy(); - private KeyFormatStrategy keyFormatStrategy = new DefaultJmsKeyFormatStrategy(); + private JmsKeyFormatStrategy jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); private Integer connectionCount = 1; private TransactionCommitStrategy transactionCommitStrategy; private TimedTaskManager timedTaskManager; private DestinationCreationStrategy destinationCreationStrategy; private ExecutorService asyncStartStopExecutorService; + private MessageCreatedStrategy messageCreatedStrategy; public SjmsComponent() { super(SjmsEndpoint.class); @@ -200,12 +202,12 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy * and refer to it using the # notation. */ - public void setKeyFormatStrategy(KeyFormatStrategy keyFormatStrategy) { - this.keyFormatStrategy = keyFormatStrategy; + public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) { + this.jmsKeyFormatStrategy = jmsKeyFormatStrategy; } - public KeyFormatStrategy getKeyFormatStrategy() { - return keyFormatStrategy; + public JmsKeyFormatStrategy getJmsKeyFormatStrategy() { + return jmsKeyFormatStrategy; } public TransactionCommitStrategy getTransactionCommitStrategy() { @@ -241,4 +243,17 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS public void setTimedTaskManager(TimedTaskManager timedTaskManager) { this.timedTaskManager = timedTaskManager; } + + public MessageCreatedStrategy getMessageCreatedStrategy() { + return messageCreatedStrategy; + } + + /** + * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt> + * objects when Camel is sending a JMS message. + */ + public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { + this.messageCreatedStrategy = messageCreatedStrategy; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index 322ad5c..162e252 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -206,7 +206,7 @@ public class SjmsConsumer extends DefaultConsumer { */ protected MessageListener createMessageHandler(Session session) { - TransactionCommitStrategy commitStrategy = null; + TransactionCommitStrategy commitStrategy; if (getTransactionCommitStrategy() != null) { commitStrategy = getTransactionCommitStrategy(); } else if (getTransactionBatchCount() > 0) { http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index c7ed9ac..cb5d396 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -16,22 +16,30 @@ */ package org.apache.camel.component.sjms; +import javax.jms.Message; +import javax.jms.Session; + import org.apache.camel.Component; import org.apache.camel.Consumer; +import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy; +import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy; import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; import org.apache.camel.component.sjms.jms.DestinationNameParser; -import org.apache.camel.component.sjms.jms.KeyFormatStrategy; +import org.apache.camel.component.sjms.jms.JmsBinding; +import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy; +import org.apache.camel.component.sjms.jms.MessageCreatedStrategy; import org.apache.camel.component.sjms.jms.SessionAcknowledgementType; import org.apache.camel.component.sjms.producer.InOnlyProducer; import org.apache.camel.component.sjms.producer.InOutProducer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -43,11 +51,13 @@ import org.slf4j.LoggerFactory; * A JMS Endpoint */ @UriEndpoint(scheme = "sjms", title = "Simple JMS", syntax = "sjms:destinationType:destinationName", consumerClass = SjmsConsumer.class, label = "messaging") -public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { +public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, HeaderFilterStrategyAware { protected final Logger logger = LoggerFactory.getLogger(getClass()); private boolean topic; + private JmsBinding binding; + @UriPath(enums = "queue,topic", defaultValue = "queue", description = "The kind of destination to use") private String destinationType; @UriPath @Metadata(required = "true") @@ -55,6 +65,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu @UriParam(label = "consumer", defaultValue = "true") private boolean synchronous = true; @UriParam + private HeaderFilterStrategy headerFilterStrategy; + @UriParam + private boolean includeAllJMSXProperties; + @UriParam private boolean transacted; @UriParam(label = "producer") private String namedReplyTo; @@ -88,10 +102,16 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu private boolean prefillPool = true; @UriParam(label = "producer", defaultValue = "true") private boolean allowNullBody = true; + @UriParam(defaultValue = "true") + private boolean mapJmsMessage = true; @UriParam private TransactionCommitStrategy transactionCommitStrategy; @UriParam private DestinationCreationStrategy destinationCreationStrategy = new DefaultDestinationCreationStrategy(); + @UriParam + private MessageCreatedStrategy messageCreatedStrategy; + @UriParam + private JmsKeyFormatStrategy jmsKeyFormatStrategy; public SjmsEndpoint() { } @@ -146,6 +166,34 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu return true; } + public Exchange createExchange(Message message, Session session) { + Exchange exchange = createExchange(getExchangePattern()); + exchange.setIn(new SjmsMessage(message, session, getBinding())); + return exchange; + } + + public JmsBinding getBinding() { + if (binding == null) { + binding = createBinding(); + } + return binding; + } + + /** + * Creates the {@link org.apache.camel.component.sjms.jms.JmsBinding} to use. + */ + protected JmsBinding createBinding() { + return new JmsBinding(isMapJmsMessage(), isAllowNullBody(), getHeaderFilterStrategy(), getJmsKeyFormatStrategy(), getMessageCreatedStrategy()); + } + + /** + * Sets the binding used to convert from a Camel message to and from a JMS + * message + */ + public void setBinding(JmsBinding binding) { + this.binding = binding; + } + /** * DestinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name. */ @@ -157,16 +205,35 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu return destinationName; } - public ConnectionResource getConnectionResource() { - return getComponent().getConnectionResource(); + public HeaderFilterStrategy getHeaderFilterStrategy() { + if (headerFilterStrategy == null) { + headerFilterStrategy = new SjmsHeaderFilterStrategy(isIncludeAllJMSXProperties()); + } + return headerFilterStrategy; + } + + /** + * To use a custom HeaderFilterStrategy to filter header to and from Camel message. + */ + public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { + this.headerFilterStrategy = strategy; } - public HeaderFilterStrategy getSjmsHeaderFilterStrategy() { - return getComponent().getHeaderFilterStrategy(); + public boolean isIncludeAllJMSXProperties() { + return includeAllJMSXProperties; } - public KeyFormatStrategy getJmsKeyFormatStrategy() { - return getComponent().getKeyFormatStrategy(); + /** + * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. + * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc. + * Note: If you are using a custom headerFilterStrategy then this option does not apply. + */ + public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) { + this.includeAllJMSXProperties = includeAllJMSXProperties; + } + + public ConnectionResource getConnectionResource() { + return getComponent().getConnectionResource(); } public boolean isSynchronous() { @@ -417,4 +484,48 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu public void setAllowNullBody(boolean allowNullBody) { this.allowNullBody = allowNullBody; } + + public boolean isMapJmsMessage() { + return mapJmsMessage; + } + + /** + * Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. + * See section about how mapping works below for more details. + */ + public void setMapJmsMessage(boolean mapJmsMessage) { + this.mapJmsMessage = mapJmsMessage; + } + + public MessageCreatedStrategy getMessageCreatedStrategy() { + return messageCreatedStrategy; + } + + /** + * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt> + * objects when Camel is sending a JMS message. + */ + public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { + this.messageCreatedStrategy = messageCreatedStrategy; + } + + public JmsKeyFormatStrategy getJmsKeyFormatStrategy() { + if (jmsKeyFormatStrategy == null) { + jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); + } + return jmsKeyFormatStrategy; + } + + /** + * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. + * Camel provides two implementations out of the box: default and passthrough. + * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. + * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. + * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy + * and refer to it using the # notation. + */ + public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) { + this.jmsKeyFormatStrategy = jmsKeyFormatStrategy; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java index 82a8a90..0da77ca 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java @@ -24,14 +24,18 @@ import org.apache.camel.impl.DefaultHeaderFilterStrategy; public class SjmsHeaderFilterStrategy extends DefaultHeaderFilterStrategy { public SjmsHeaderFilterStrategy() { - initialize(); + this(false); + } + + public SjmsHeaderFilterStrategy(boolean includeAllJMSXProperties) { + if (!includeAllJMSXProperties) { + initialize(); + } } protected void initialize() { - // ignore provider specified JMS extension headers see page 39 of JMS - // 1.1 specification - // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in - // AqjmsMessage + // ignore provider specified JMS extension headers see page 39 of JMS 1.1 specification + // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in AqjmsMessage getOutFilter().add("JMSXUserID"); getOutFilter().add("JMSXAppID"); getOutFilter().add("JMSXDeliveryCount"); http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java new file mode 100644 index 0000000..92f8531 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms; + +import java.io.File; +import java.util.Map; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.component.sjms.jms.JmsBinding; +import org.apache.camel.component.sjms.jms.JmsMessageHelper; +import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents a {@link org.apache.camel.Message} for working with JMS + * + * @version + */ +public class SjmsMessage extends DefaultMessage { + private static final Logger LOG = LoggerFactory.getLogger(SjmsMessage.class); + private Message jmsMessage; + private Session jmsSession; + private JmsBinding binding; + + public SjmsMessage(Message jmsMessage, Session jmsSession, JmsBinding binding) { + setJmsMessage(jmsMessage); + setJmsSession(jmsSession); + setBinding(binding); + } + + @Override + public String toString() { + // do not print jmsMessage as there could be sensitive details + if (jmsMessage != null) { + try { + return "SjmsMessage[JmsMessageID: " + jmsMessage.getJMSMessageID() + "]"; + } catch (Throwable e) { + // ignore + } + } + return "SjmsMessage@" + ObjectHelper.getIdentityHashCode(this); + } + + @Override + public void copyFrom(org.apache.camel.Message that) { + if (that == this) { + // the same instance so do not need to copy + return; + } + + // must initialize headers before we set the JmsMessage to avoid Camel + // populating it before we do the copy + getHeaders().clear(); + + boolean copyMessageId = true; + if (that instanceof SjmsMessage) { + SjmsMessage thatMessage = (SjmsMessage) that; + this.jmsMessage = thatMessage.jmsMessage; + if (this.jmsMessage != null) { + // for performance lets not copy the messageID if we are a JMS message + copyMessageId = false; + } + } + if (copyMessageId) { + setMessageId(that.getMessageId()); + } + + // copy body and fault flag + setBody(that.getBody()); + setFault(that.isFault()); + + // we have already cleared the headers + if (that.hasHeaders()) { + getHeaders().putAll(that.getHeaders()); + } + + getAttachments().clear(); + if (that.hasAttachments()) { + getAttachments().putAll(that.getAttachments()); + } + } + + public JmsBinding getBinding() { + if (binding == null) { + binding = ExchangeHelper.getBinding(getExchange(), JmsBinding.class); + } + return binding; + } + + public void setBinding(JmsBinding binding) { + this.binding = binding; + } + + /** + * Returns the underlying JMS message + */ + public Message getJmsMessage() { + return jmsMessage; + } + + public void setJmsMessage(Message jmsMessage) { + if (jmsMessage != null) { + try { + setMessageId(jmsMessage.getJMSMessageID()); + } catch (JMSException e) { + LOG.warn("Unable to retrieve JMSMessageID from JMS Message", e); + } + } + this.jmsMessage = jmsMessage; + } + + /** + * Returns the underlying JMS session. + * <p/> + * This may be <tt>null</tt>. + */ + public Session getJmsSession() { + return jmsSession; + } + + public void setJmsSession(Session jmsSession) { + this.jmsSession = jmsSession; + } + + @Override + public void setBody(Object body) { + super.setBody(body); + if (body == null) { + // preserver headers even if we set body to null + ensureInitialHeaders(); + // remove underlying jmsMessage since we mutated body to null + jmsMessage = null; + } + } + + public Object getHeader(String name) { + Object answer = null; + + // we will exclude using JMS-prefixed headers here to avoid strangeness with some JMS providers + // e.g. ActiveMQ returns the String not the Destination type for "JMSReplyTo"! + // only look in jms message directly if we have not populated headers + if (jmsMessage != null && !hasPopulatedHeaders() && !name.startsWith("JMS")) { + try { + // use binding to do the lookup as it has to consider using encoded keys + answer = getBinding().getObjectProperty(jmsMessage, name); + } catch (JMSException e) { + throw new RuntimeExchangeException("Unable to retrieve header from JMS Message: " + name, getExchange(), e); + } + } + // only look if we have populated headers otherwise there are no headers at all + // if we do lookup a header starting with JMS then force a lookup + if (answer == null && (hasPopulatedHeaders() || name.startsWith("JMS"))) { + answer = super.getHeader(name); + } + return answer; + } + + @Override + public Map<String, Object> getHeaders() { + ensureInitialHeaders(); + return super.getHeaders(); + } + + @Override + public Object removeHeader(String name) { + ensureInitialHeaders(); + return super.removeHeader(name); + } + + @Override + public void setHeaders(Map<String, Object> headers) { + ensureInitialHeaders(); + super.setHeaders(headers); + } + + @Override + public void setHeader(String name, Object value) { + ensureInitialHeaders(); + super.setHeader(name, value); + } + + @Override + public SjmsMessage newInstance() { + return new SjmsMessage(null, null, binding); + } + + /** + * Returns true if a new JMS message instance should be created to send to the next component + */ + public boolean shouldCreateNewMessage() { + return super.hasPopulatedHeaders(); + } + + /** + * Ensure that the headers have been populated from the underlying JMS message + * before we start mutating the headers + */ + protected void ensureInitialHeaders() { + if (jmsMessage != null && !hasPopulatedHeaders()) { + // we have not populated headers so force this by creating + // new headers and set it on super + super.setHeaders(createHeaders()); + } + } + + @Override + protected Object createBody() { + if (jmsMessage != null) { + return getBinding().extractBodyFromJms(getExchange(), jmsMessage); + } + return null; + } + + @Override + protected void populateInitialHeaders(Map<String, Object> map) { + if (jmsMessage != null && map != null) { + map.putAll(getBinding().extractHeadersFromJms(jmsMessage, getExchange())); + } + } + + @Override + protected String createMessageId() { + if (jmsMessage == null) { + LOG.trace("No javax.jms.Message set so generating a new message id"); + return super.createMessageId(); + } + try { + String id = getDestinationAsString(jmsMessage.getJMSDestination()) + jmsMessage.getJMSMessageID(); + return getSanitizedString(id); + } catch (JMSException e) { + throw new RuntimeExchangeException("Unable to retrieve JMSMessageID from JMS Message", getExchange(), e); + } + } + + @Override + protected Boolean isTransactedRedelivered() { + if (jmsMessage != null) { + return JmsMessageHelper.getJMSRedelivered(jmsMessage); + } else { + return null; + } + } + + private String getDestinationAsString(Destination destination) throws JMSException { + String result; + if (destination == null) { + result = "null destination!" + File.separator; + } else if (destination instanceof Topic) { + result = "topic" + File.separator + ((Topic) destination).getTopicName() + File.separator; + } else { + result = "queue" + File.separator + ((Queue) destination).getQueueName() + File.separator; + } + return result; + } + + private String getSanitizedString(Object value) { + return value != null ? value.toString().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_") : ""; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index ee2b250..e8ce161 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.sjms.batch; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Date; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -41,6 +42,7 @@ import org.apache.camel.Processor; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,7 +228,8 @@ public class SjmsBatchConsumer extends DefaultConsumer { LOG.debug("Message received: {}", messageCount); if ((message instanceof ObjectMessage) || (message instanceof TextMessage)) { - Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint()); + + final Exchange exchange = getEndpoint().createExchange(message, session); aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange); aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java index b4c052f..49c74ba 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java @@ -16,38 +16,65 @@ */ package org.apache.camel.component.sjms.batch; +import javax.jms.Message; +import javax.jms.Session; + import org.apache.camel.Component; import org.apache.camel.Consumer; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.sjms.SjmsHeaderFilterStrategy; +import org.apache.camel.component.sjms.SjmsMessage; +import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy; import org.apache.camel.component.sjms.jms.DestinationNameParser; +import org.apache.camel.component.sjms.jms.JmsBinding; +import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy; +import org.apache.camel.component.sjms.jms.MessageCreatedStrategy; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; @UriEndpoint(scheme = "sjms-batch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName", - consumerClass = SjmsBatchComponent.class, label = "messaging") -public class SjmsBatchEndpoint extends DefaultEndpoint { + consumerClass = SjmsBatchComponent.class, label = "messaging", consumerOnly = true) +public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ public static final int DEFAULT_COMPLETION_TIMEOUT = 500; public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize"; - @UriPath(label = "consumer") @Metadata(required = "true") + private JmsBinding binding; + + @UriPath @Metadata(required = "true") private String destinationName; - @UriParam(label = "consumer", defaultValue = "1") + @UriParam(defaultValue = "1") private int consumerCount = 1; - @UriParam(label = "consumer", defaultValue = "200") + @UriParam(defaultValue = "200") private int completionSize = DEFAULT_COMPLETION_SIZE; - @UriParam(label = "consumer", defaultValue = "500") + @UriParam(defaultValue = "500") private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT; - @UriParam(label = "consumer", defaultValue = "1000") + @UriParam(defaultValue = "1000") private int pollDuration = 1000; - @UriParam(label = "consumer") @Metadata(required = "true") + @UriParam @Metadata(required = "true") private AggregationStrategy aggregationStrategy; + @UriParam + private HeaderFilterStrategy headerFilterStrategy; + @UriParam + private boolean includeAllJMSXProperties; + @UriParam(defaultValue = "true") + private boolean allowNullBody = true; + @UriParam(defaultValue = "true") + private boolean mapJmsMessage = true; + @UriParam + private MessageCreatedStrategy messageCreatedStrategy; + @UriParam + private JmsKeyFormatStrategy jmsKeyFormatStrategy; + public SjmsBatchEndpoint() { } @@ -78,6 +105,34 @@ public class SjmsBatchEndpoint extends DefaultEndpoint { return new SjmsBatchConsumer(this, processor); } + public Exchange createExchange(Message message, Session session) { + Exchange exchange = createExchange(getExchangePattern()); + exchange.setIn(new SjmsMessage(message, session, getBinding())); + return exchange; + } + + public JmsBinding getBinding() { + if (binding == null) { + binding = createBinding(); + } + return binding; + } + + /** + * Creates the {@link org.apache.camel.component.sjms.jms.JmsBinding} to use. + */ + protected JmsBinding createBinding() { + return new JmsBinding(isMapJmsMessage(), isAllowNullBody(), getHeaderFilterStrategy(), getJmsKeyFormatStrategy(), getMessageCreatedStrategy()); + } + + /** + * Sets the binding used to convert from a Camel message to and from a JMS + * message + */ + public void setBinding(JmsBinding binding) { + this.binding = binding; + } + public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } @@ -141,4 +196,85 @@ public class SjmsBatchEndpoint extends DefaultEndpoint { this.pollDuration = pollDuration; } + public boolean isAllowNullBody() { + return allowNullBody; + } + + /** + * Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown. + */ + public void setAllowNullBody(boolean allowNullBody) { + this.allowNullBody = allowNullBody; + } + + public boolean isMapJmsMessage() { + return mapJmsMessage; + } + + /** + * Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. + * See section about how mapping works below for more details. + */ + public void setMapJmsMessage(boolean mapJmsMessage) { + this.mapJmsMessage = mapJmsMessage; + } + + public MessageCreatedStrategy getMessageCreatedStrategy() { + return messageCreatedStrategy; + } + + /** + * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt> + * objects when Camel is sending a JMS message. + */ + public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { + this.messageCreatedStrategy = messageCreatedStrategy; + } + + public JmsKeyFormatStrategy getJmsKeyFormatStrategy() { + if (jmsKeyFormatStrategy == null) { + jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); + } + return jmsKeyFormatStrategy; + } + + /** + * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. + * Camel provides two implementations out of the box: default and passthrough. + * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. + * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. + * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy + * and refer to it using the # notation. + */ + public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) { + this.jmsKeyFormatStrategy = jmsKeyFormatStrategy; + } + + public HeaderFilterStrategy getHeaderFilterStrategy() { + if (headerFilterStrategy == null) { + headerFilterStrategy = new SjmsHeaderFilterStrategy(isIncludeAllJMSXProperties()); + } + return headerFilterStrategy; + } + + /** + * To use a custom HeaderFilterStrategy to filter header to and from Camel message. + */ + public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { + this.headerFilterStrategy = strategy; + } + + public boolean isIncludeAllJMSXProperties() { + return includeAllJMSXProperties; + } + + /** + * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. + * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc. + * Note: If you are using a custom headerFilterStrategy then this option does not apply. + */ + public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) { + this.includeAllJMSXProperties = includeAllJMSXProperties; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java index 1598a43..f394008 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java @@ -25,8 +25,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.sjms.SjmsEndpoint; -import org.apache.camel.component.sjms.jms.JmsMessageHelper; -import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.Synchronization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,8 +68,7 @@ public abstract class AbstractMessageHandler implements MessageListener { public void onMessage(Message message) { RuntimeCamelException rce = null; try { - SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint(); - final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy()); + final Exchange exchange = getEndpoint().createExchange(message, getSession()); log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); @@ -80,10 +77,10 @@ public abstract class AbstractMessageHandler implements MessageListener { } try { if (isTransacted() || isSynchronous()) { - log.debug(" Handling synchronous message: {}", exchange.getIn().getBody()); + log.debug("Handling synchronous message: {}", exchange.getIn().getBody()); handleMessage(exchange); } else { - log.debug(" Handling asynchronous message: {}", exchange.getIn().getBody()); + log.debug("Handling asynchronous message: {}", exchange.getIn().getBody()); executor.execute(new Runnable() { @Override public void run() { @@ -96,12 +93,10 @@ public abstract class AbstractMessageHandler implements MessageListener { }); } } catch (Exception e) { - if (exchange != null) { - if (exchange.getException() == null) { - exchange.setException(e); - } else { - throw e; - } + if (exchange.getException() == null) { + exchange.setException(e); + } else { + throw e; } } } catch (Exception e) { @@ -113,9 +108,6 @@ public abstract class AbstractMessageHandler implements MessageListener { } } - /** - * @param exchange - */ public abstract void handleMessage(final Exchange exchange); /** http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java index 2068da5..97def12 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java @@ -32,7 +32,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.component.sjms.jms.JmsConstants; -import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.spi.Synchronization; /** @@ -157,7 +156,9 @@ public class InOutMessageHandler extends AbstractMessageHandler { @Override public void done(boolean sync) { try { - Message response = JmsMessageHelper.createMessage(exchange, getSession(), getEndpoint()); + // the response can either be in OUT or IN + org.apache.camel.Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); + Message response = getEndpoint().getBinding().makeJmsMessage(exchange, msg.getBody(), msg.getHeaders(), getSession(), null); response.setJMSCorrelationID(exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class)); localProducer.send(response); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java index d95d2b5..4fa3308 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java @@ -22,7 +22,7 @@ package org.apache.camel.component.sjms.jms; * This can be used for sending keys contain package names that is common by * Java frameworks. */ -public class DefaultJmsKeyFormatStrategy implements KeyFormatStrategy { +public class DefaultJmsKeyFormatStrategy implements JmsKeyFormatStrategy { public String encodeKey(String key) { String answer = key.replace(".", "_DOT_"); http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java new file mode 100644 index 0000000..8dc2841 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java @@ -0,0 +1,606 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.jms; + +import java.io.File; +import java.io.InputStream; +import java.io.Reader; +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageFormatException; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.StreamCache; +import org.apache.camel.WrappedFile; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Node; + +import static org.apache.camel.component.sjms.jms.JmsMessageHelper.normalizeDestinationName; + +/** + * A Strategy used to convert between a Camel {@link org.apache.camel.Exchange} and {@link org.apache.camel.Message} + * to and from a JMS {@link javax.jms.Message} + */ +public class JmsBinding { + + private static final Logger LOG = LoggerFactory.getLogger(JmsBinding.class); + private final boolean mapJmsMessage; + private final boolean allowNullBody; + private final HeaderFilterStrategy headerFilterStrategy; + private final JmsKeyFormatStrategy jmsJmsKeyFormatStrategy; + private final MessageCreatedStrategy messageCreatedStrategy; + + public JmsBinding(boolean mapJmsMessage, boolean allowNullBody, + HeaderFilterStrategy headerFilterStrategy, JmsKeyFormatStrategy jmsJmsKeyFormatStrategy, + MessageCreatedStrategy messageCreatedStrategy) { + this.mapJmsMessage = mapJmsMessage; + this.allowNullBody = allowNullBody; + this.headerFilterStrategy = headerFilterStrategy; + this.jmsJmsKeyFormatStrategy = jmsJmsKeyFormatStrategy; + this.messageCreatedStrategy = messageCreatedStrategy; + } + + /** + * Extracts the body from the JMS message + * + * @param exchange the exchange + * @param message the message to extract its body + * @return the body, can be <tt>null</tt> + */ + public Object extractBodyFromJms(Exchange exchange, Message message) { + try { + + // TODO: new options to support + + // is a custom message converter configured on endpoint then use it instead of doing the extraction + // based on message type +/* if (endpoint != null && endpoint.getMessageConverter() != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Extracting body using a custom MessageConverter: {} from JMS message: {}", endpoint.getMessageConverter(), message); + } + return endpoint.getMessageConverter().fromMessage(message); + } +*/ + // if we are configured to not map the jms message then return it as body + if (!mapJmsMessage) { + LOG.trace("Option map JMS message is false so using JMS message as body: {}", message); + return message; + } + + if (message instanceof ObjectMessage) { + LOG.trace("Extracting body as a ObjectMessage from JMS message: {}", message); + ObjectMessage objectMessage = (ObjectMessage)message; + Object payload = objectMessage.getObject(); + if (payload instanceof DefaultExchangeHolder) { + DefaultExchangeHolder holder = (DefaultExchangeHolder) payload; + DefaultExchangeHolder.unmarshal(exchange, holder); + return exchange.getIn().getBody(); + } else { + return objectMessage.getObject(); + } + } else if (message instanceof TextMessage) { + LOG.trace("Extracting body as a TextMessage from JMS message: {}", message); + TextMessage textMessage = (TextMessage)message; + return textMessage.getText(); + } else if (message instanceof MapMessage) { + LOG.trace("Extracting body as a MapMessage from JMS message: {}", message); + return createMapFromMapMessage((MapMessage)message); + } else if (message instanceof BytesMessage) { + LOG.trace("Extracting body as a BytesMessage from JMS message: {}", message); + return createByteArrayFromBytesMessage((BytesMessage)message); + } else if (message instanceof StreamMessage) { + LOG.trace("Extracting body as a StreamMessage from JMS message: {}", message); + return message; + } else { + return null; + } + } catch (JMSException e) { + throw new RuntimeCamelException("Failed to extract body due to: " + e + ". Message: " + message, e); + } + } + + public Map<String, Object> extractHeadersFromJms(Message jmsMessage, Exchange exchange) { + Map<String, Object> map = new HashMap<String, Object>(); + if (jmsMessage != null) { + // lets populate the standard JMS message headers + try { + map.put("JMSCorrelationID", jmsMessage.getJMSCorrelationID()); + map.put("JMSCorrelationIDAsBytes", JmsMessageHelper.getJMSCorrelationIDAsBytes(jmsMessage)); + map.put("JMSDeliveryMode", jmsMessage.getJMSDeliveryMode()); + map.put("JMSDestination", jmsMessage.getJMSDestination()); + map.put("JMSExpiration", jmsMessage.getJMSExpiration()); + map.put("JMSMessageID", jmsMessage.getJMSMessageID()); + map.put("JMSPriority", jmsMessage.getJMSPriority()); + map.put("JMSRedelivered", jmsMessage.getJMSRedelivered()); + map.put("JMSTimestamp", jmsMessage.getJMSTimestamp()); + + map.put("JMSReplyTo", JmsMessageHelper.getJMSReplyTo(jmsMessage)); + map.put("JMSType", JmsMessageHelper.getJMSType(jmsMessage)); + + // this works around a bug in the ActiveMQ property handling + map.put(JmsConstants.JMSX_GROUP_ID, JmsMessageHelper.getStringProperty(jmsMessage, JmsConstants.JMSX_GROUP_ID)); + map.put("JMSXUserID", JmsMessageHelper.getStringProperty(jmsMessage, "JMSXUserID")); + } catch (JMSException e) { + throw new RuntimeCamelException(e); + } + + Enumeration<?> names; + try { + names = jmsMessage.getPropertyNames(); + } catch (JMSException e) { + throw new RuntimeCamelException(e); + } + while (names.hasMoreElements()) { + String name = names.nextElement().toString(); + try { + Object value = JmsMessageHelper.getProperty(jmsMessage, name); + if (headerFilterStrategy != null + && headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) { + continue; + } + + // must decode back from safe JMS header name to original header name + // when storing on this Camel JmsMessage object. + String key = jmsJmsKeyFormatStrategy.decodeKey(name); + map.put(key, value); + } catch (JMSException e) { + throw new RuntimeCamelException(name, e); + } + } + } + + return map; + } + + public Object getObjectProperty(Message jmsMessage, String name) throws JMSException { + // try a direct lookup first + Object answer = jmsMessage.getObjectProperty(name); + if (answer == null) { + // then encode the key and do another lookup + String key = jmsJmsKeyFormatStrategy.encodeKey(name); + answer = jmsMessage.getObjectProperty(key); + } + return answer; + } + + protected byte[] createByteArrayFromBytesMessage(BytesMessage message) throws JMSException { + if (message.getBodyLength() > Integer.MAX_VALUE) { + LOG.warn("Length of BytesMessage is too long: {}", message.getBodyLength()); + return null; + } + byte[] result = new byte[(int)message.getBodyLength()]; + message.readBytes(result); + return result; + } + + /** + * Creates a JMS message from the Camel exchange and message + * + * @param exchange the current exchange + * @param session the JMS session used to create the message + * @return a newly created JMS Message instance containing the + * @throws JMSException if the message could not be created + */ + public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException { + Message answer = makeJmsMessage(exchange, exchange.getIn().getBody(), exchange.getIn().getHeaders(), session, null); + if (answer != null && messageCreatedStrategy != null) { + messageCreatedStrategy.onMessageCreated(answer, session, exchange, null); + } + return answer; + } + + /** + * Creates a JMS message from the Camel exchange and message + * + * @param exchange the current exchange + * @param body the message body + * @param headers the message headers + * @param session the JMS session used to create the message + * @param cause optional exception occurred that should be sent as reply instead of a regular body + * @return a newly created JMS Message instance containing the + * @throws JMSException if the message could not be created + */ + public Message makeJmsMessage(Exchange exchange, Object body, Map headers, Session session, Exception cause) throws JMSException { + Message answer = null; + + // TODO: look at supporting some of these options + +/* boolean alwaysCopy = endpoint != null && endpoint.getConfiguration().isAlwaysCopyMessage(); + boolean force = endpoint != null && endpoint.getConfiguration().isForceSendOriginalMessage(); + if (!alwaysCopy && camelMessage instanceof JmsMessage) { + JmsMessage jmsMessage = (JmsMessage)camelMessage; + if (!jmsMessage.shouldCreateNewMessage() || force) { + answer = jmsMessage.getJmsMessage(); + + if (!force) { + // answer must match endpoint type + JmsMessageType type = endpoint != null ? endpoint.getConfiguration().getJmsMessageType() : null; + if (type != null && answer != null) { + if (type == JmsMessageType.Text) { + answer = answer instanceof TextMessage ? answer : null; + } else if (type == JmsMessageType.Bytes) { + answer = answer instanceof BytesMessage ? answer : null; + } else if (type == JmsMessageType.Map) { + answer = answer instanceof MapMessage ? answer : null; + } else if (type == JmsMessageType.Object) { + answer = answer instanceof ObjectMessage ? answer : null; + } else if (type == JmsMessageType.Stream) { + answer = answer instanceof StreamMessage ? answer : null; + } + } + } + } + } +*/ + + if (answer == null) { + if (cause != null) { + // an exception occurred so send it as response + LOG.debug("Will create JmsMessage with caused exception: {}", cause); + // create jms message containing the caused exception + answer = createJmsMessage(cause, session); + } else { + // create regular jms message using the camel message body + answer = createJmsMessage(exchange, body, headers, session, exchange.getContext()); + appendJmsProperties(answer, exchange, headers); + } + } + + if (answer != null && messageCreatedStrategy != null) { + messageCreatedStrategy.onMessageCreated(answer, session, exchange, null); + } + return answer; + } + + /** + * Appends the JMS headers from the Camel {@link Message} + */ + public void appendJmsProperties(Message jmsMessage, Exchange exchange, Map<String, Object> headers) throws JMSException { + if (headers != null) { + Set<Map.Entry<String, Object>> entries = headers.entrySet(); + for (Map.Entry<String, Object> entry : entries) { + String headerName = entry.getKey(); + Object headerValue = entry.getValue(); + appendJmsProperty(jmsMessage, exchange, headerName, headerValue); + } + } + } + + public void appendJmsProperty(Message jmsMessage, Exchange exchange, String headerName, Object headerValue) throws JMSException { + if (isStandardJMSHeader(headerName)) { + if (headerName.equals("JMSCorrelationID")) { + jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue)); + } else if (headerName.equals("JMSReplyTo") && headerValue != null) { + if (headerValue instanceof String) { + // if the value is a String we must normalize it first, and must include the prefix + // as ActiveMQ requires that when converting the String to a javax.jms.Destination type + headerValue = normalizeDestinationName((String) headerValue, true); + } + Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue); + JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo); + } else if (headerName.equals("JMSType")) { + jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue)); + } else if (headerName.equals("JMSPriority")) { + jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue)); + } else if (headerName.equals("JMSDeliveryMode")) { + JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue); + } else if (headerName.equals("JMSExpiration")) { + jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue)); + } else { + // The following properties are set by the MessageProducer: + // JMSDestination + // The following are set on the underlying JMS provider: + // JMSMessageID, JMSTimestamp, JMSRedelivered + // log at trace level to not spam log + LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue); + } + } else if (shouldOutputHeader(headerName, headerValue, exchange)) { + // only primitive headers and strings is allowed as properties + // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html + Object value = getValidJMSHeaderValue(headerName, headerValue); + if (value != null) { + // must encode to safe JMS header name before setting property on jmsMessage + String key = jmsJmsKeyFormatStrategy.encodeKey(headerName); + // set the property + JmsMessageHelper.setProperty(jmsMessage, key, value); + } else if (LOG.isDebugEnabled()) { + // okay the value is not a primitive or string so we cannot sent it over the wire + LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}", + new Object[]{headerName, headerValue.getClass().getName(), headerValue}); + } + } + } + + /** + * Is the given header a standard JMS header + * @param headerName the header name + * @return <tt>true</tt> if its a standard JMS header + */ + protected boolean isStandardJMSHeader(String headerName) { + if (!headerName.startsWith("JMS")) { + return false; + } + if (headerName.startsWith("JMSX")) { + return false; + } + // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM) + if (headerName.startsWith("JMS_")) { + return false; + } + + // the 4th char must be a letter to be a standard JMS header + if (headerName.length() > 3) { + Character fourth = headerName.charAt(3); + if (Character.isLetter(fourth)) { + return true; + } + } + + return false; + } + + /** + * Strategy to test if the given header is valid according to the JMS spec to be set as a property + * on the JMS message. + * <p/> + * This default implementation will allow: + * <ul> + * <li>any primitives and their counter Objects (Integer, Double etc.)</li> + * <li>String and any other literals, Character, CharSequence</li> + * <li>Boolean</li> + * <li>Number</li> + * <li>java.util.Date</li> + * </ul> + * + * @param headerName the header name + * @param headerValue the header value + * @return the value to use, <tt>null</tt> to ignore this header + */ + protected Object getValidJMSHeaderValue(String headerName, Object headerValue) { + if (headerValue instanceof String) { + return headerValue; + } else if (headerValue instanceof BigInteger) { + return headerValue.toString(); + } else if (headerValue instanceof BigDecimal) { + return headerValue.toString(); + } else if (headerValue instanceof Number) { + return headerValue; + } else if (headerValue instanceof Character) { + return headerValue; + } else if (headerValue instanceof CharSequence) { + return headerValue.toString(); + } else if (headerValue instanceof Boolean) { + return headerValue; + } else if (headerValue instanceof Date) { + return headerValue.toString(); + } + return null; + } + + protected Message createJmsMessage(Exception cause, Session session) throws JMSException { + LOG.trace("Using JmsMessageType: {}", JmsMessageType.Object); + Message answer = session.createObjectMessage(cause); + // ensure default delivery mode is used by default + answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); + return answer; + } + + protected Message createJmsMessage(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException { + JmsMessageType type = null; + + // TODO: support some of these options? + +/* // special for transferExchange + if (endpoint != null && endpoint.isTransferExchange()) { + LOG.trace("Option transferExchange=true so we use JmsMessageType: Object"); + Serializable holder = DefaultExchangeHolder.marshal(exchange); + Message answer = session.createObjectMessage(holder); + // ensure default delivery mode is used by default + answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); + return answer; + } + + // use a custom message converter + if (endpoint != null && endpoint.getMessageConverter() != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Creating JmsMessage using a custom MessageConverter: {} with body: {}", endpoint.getMessageConverter(), body); + } + return endpoint.getMessageConverter().toMessage(body, session); + } +*/ + // check if header have a type set, if so we force to use it +/* + if (headers.containsKey(JmsConstants.JMS_MESSAGE_TYPE)) { + type = context.getTypeConverter().convertTo(JmsMessageType.class, headers.get(JmsConstants.JMS_MESSAGE_TYPE)); + } else if (endpoint != null && endpoint.getConfiguration().getJmsMessageType() != null) { + // force a specific type from the endpoint configuration + type = endpoint.getConfiguration().getJmsMessageType(); + } else { +*/ type = getJMSMessageTypeForBody(exchange, body, headers, session, context); + //} + + // create the JmsMessage based on the type + if (type != null) { + if (body == null && !allowNullBody) { + throw new JMSException("Cannot send message as message body is null, and option allowNullBody is false."); + } + LOG.trace("Using JmsMessageType: {}", type); + Message answer = createJmsMessageForType(exchange, body, headers, session, context, type); + // ensure default delivery mode is used by default + answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); + return answer; + } + + // check for null body + if (body == null && !allowNullBody) { + throw new JMSException("Cannot send message as message body is null, and option allowNullBody is false."); + } + + // warn if the body could not be mapped + if (body != null && LOG.isWarnEnabled()) { + LOG.warn("Cannot determine specific JmsMessage type to use from body class." + + " Will use generic JmsMessage." + + " Body class: " + ObjectHelper.classCanonicalName(body) + + ". If you want to send a POJO then your class might need to implement java.io.Serializable" + + ", or you can force a specific type by setting the jmsMessageType option on the JMS endpoint."); + } + + // return a default message + Message answer = session.createMessage(); + // ensure default delivery mode is used by default + answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); + return answer; + } + + /** + * Return the {@link JmsMessageType} + * + * @return type or null if no mapping was possible + */ + protected JmsMessageType getJMSMessageTypeForBody(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) { + JmsMessageType type = null; + // let body determine the type + if (body instanceof Node || body instanceof String) { + type = JmsMessageType.Text; + } else if (body instanceof byte[] || body instanceof WrappedFile || body instanceof File || body instanceof Reader + || body instanceof InputStream || body instanceof ByteBuffer || body instanceof StreamCache) { + type = JmsMessageType.Bytes; + } else if (body instanceof Map) { + type = JmsMessageType.Map; + } else if (body instanceof Serializable) { + type = JmsMessageType.Object; + } else if (exchange.getContext().getTypeConverter().tryConvertTo(File.class, body) != null + || exchange.getContext().getTypeConverter().tryConvertTo(InputStream.class, body) != null) { + type = JmsMessageType.Bytes; + } + return type; + } + + /** + * + * Create the {@link Message} + * + * @return jmsMessage or null if the mapping was not successfully + */ + protected Message createJmsMessageForType(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context, JmsMessageType type) throws JMSException { + switch (type) { + case Text: { + TextMessage message = session.createTextMessage(); + if (body != null) { + String payload = context.getTypeConverter().convertTo(String.class, exchange, body); + message.setText(payload); + } + return message; + } + case Bytes: { + BytesMessage message = session.createBytesMessage(); + if (body != null) { + byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body); + message.writeBytes(payload); + } + return message; + } + case Map: { + MapMessage message = session.createMapMessage(); + if (body != null) { + Map<?, ?> payload = context.getTypeConverter().convertTo(Map.class, exchange, body); + populateMapMessage(message, payload, context); + } + return message; + } + case Object: + ObjectMessage message = session.createObjectMessage(); + if (body != null) { + try { + Serializable payload = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body); + message.setObject(payload); + } catch (NoTypeConversionAvailableException e) { + // cannot convert to serializable then thrown an exception to avoid sending a null message + JMSException cause = new MessageFormatException(e.getMessage()); + cause.initCause(e); + throw cause; + } + } + return message; + default: + break; + } + return null; + } + /** + * Populates a {@link MapMessage} from a {@link Map} instance. + */ + protected void populateMapMessage(MapMessage message, Map<?, ?> map, CamelContext context) + throws JMSException { + for (Map.Entry<?, ?> entry : map.entrySet()) { + String keyString = CamelContextHelper.convertTo(context, String.class, entry.getKey()); + if (keyString != null) { + message.setObject(keyString, entry.getValue()); + } + } + } + + /** + * Extracts a {@link Map} from a {@link MapMessage} + */ + public Map<String, Object> createMapFromMapMessage(MapMessage message) throws JMSException { + Map<String, Object> answer = new HashMap<String, Object>(); + Enumeration<?> names = message.getMapNames(); + while (names.hasMoreElements()) { + String name = names.nextElement().toString(); + Object value = message.getObject(name); + answer.put(name, value); + } + return answer; + } + + /** + * Strategy to allow filtering of headers which are put on the JMS message + * <p/> + * <b>Note</b>: Currently only supports sending java identifiers as keys + */ + protected boolean shouldOutputHeader(String headerName, Object headerValue, Exchange exchange) { + return headerFilterStrategy == null + || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java new file mode 100644 index 0000000..5a0327e --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.jms; + +/** + * Strategy for applying encoding and decoding of JMS headers so they apply to + * the JMS spec. + */ +public interface JmsKeyFormatStrategy { + + /** + * Encodes the key before its sent as a {@link javax.jms.Message} message. + * + * @param key the original key + * @return the encoded key + */ + String encodeKey(String key); + + /** + * Decodes the key after its received from a {@link javax.jms.Message} + * message. + * + * @param key the encoded key + * @return the decoded key as the original key + */ + String decodeKey(String key); +}
