Repository: camel Updated Branches: refs/heads/master eef0c490d -> 25103bf65
CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b1d8da9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b1d8da9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b1d8da9 Branch: refs/heads/master Commit: 5b1d8da9b9b537615e764ebb4f0f8298e83421a8 Parents: eef0c49 Author: Claus Ibsen <[email protected]> Authored: Mon Sep 7 09:31:30 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Sep 7 09:31:30 2015 +0200 ---------------------------------------------------------------------- .../camel/component/sjms/jms/JmsConstants.java | 5 + .../component/sjms/jms/JmsMessageHelper.java | 339 ++++++++++++------- .../component/sjms/producer/InOnlyProducer.java | 8 +- 3 files changed, 227 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java index 08e0339..ccf1f96 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java @@ -21,6 +21,11 @@ package org.apache.camel.component.sjms.jms; */ public interface JmsConstants { + String QUEUE_PREFIX = "queue:"; + String TOPIC_PREFIX = "topic:"; + String TEMP_QUEUE_PREFIX = "temp:queue:"; + String TEMP_TOPIC_PREFIX = "temp:topic:"; + /** * Set by the publishing Client */ http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java index 79787c9..0a93f16 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java @@ -20,10 +20,13 @@ import java.io.File; import java.io.InputStream; import java.io.Reader; import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.Enumeration; import java.util.HashMap; import java.util.List; @@ -47,16 +50,20 @@ import org.apache.camel.TypeConverter; import org.apache.camel.component.sjms.SjmsConstants; import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; + /** * Utility class for {@link javax.jms.Message}. */ public final class JmsMessageHelper implements JmsConstants { - private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class); + private static final Logger LOG = LoggerFactory.getLogger(JmsMessageHelper.class); private JmsMessageHelper() { } @@ -98,7 +105,7 @@ public final class JmsMessageHelper implements JmsConstants { case Bytes: BytesMessage bytesMessage = (BytesMessage) message; if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) { - LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength()); + LOG.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength()); return null; } byte[] result = new byte[(int) bytesMessage.getBodyLength()]; @@ -164,16 +171,17 @@ public final class JmsMessageHelper implements JmsConstants { bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders()); } - answer = createMessage(session, body, bodyHeaders, endpoint); + answer = createMessage(exchange, session, body, bodyHeaders, endpoint); return answer; } - public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception { - return createMessage(session, payload, messageHeaders, endpoint.isAllowNullBody(), endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter()); + public static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception { + return createMessage(exchange, session, payload, messageHeaders, endpoint.isAllowNullBody(), + endpoint.getSjmsHeaderFilterStrategy(), endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter()); } - private static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, boolean allowNullBody, - KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { + private static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, boolean allowNullBody, + HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { Message answer = null; JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload); @@ -224,113 +232,134 @@ public final class JmsMessageHelper implements JmsConstants { break; } - if (messageHeaders != null && !messageHeaders.isEmpty()) { - answer = JmsMessageHelper.setJmsMessageHeaders(answer, messageHeaders, keyFormatStrategy); - } + appendJmsProperties(answer, exchange, exchange.getIn(), headerFilterStrategy, keyFormatStrategy); return answer; } /** - * Adds or updates the {@link Message} headers. Header names and values are - * checked for JMS 1.1 compliance. - * - * @param jmsMessage the {@link Message} to add or update the headers on - * @param messageHeaders a {@link Map} of String/Object pairs - * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to - * format keys in a JMS 1.1 compliant manner. - * @return {@link Message} + * Appends the JMS headers from the Camel {@link Message} */ - private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException { - - Map<String, Object> headers = new HashMap<String, Object>(messageHeaders); - for (final Map.Entry<String, Object> entry : headers.entrySet()) { + private static void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in, + HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException { + Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet(); + for (Map.Entry<String, Object> entry : entries) { String headerName = entry.getKey(); Object headerValue = entry.getValue(); + appendJmsProperty(jmsMessage, exchange, in, headerName, headerValue, headerFilterStrategy, keyFormatStrategy); + } + } - if (headerName.equalsIgnoreCase(JMS_CORRELATION_ID)) { - if (headerValue == null) { - // Value can be null but we can't cast a null to a String - // so pass null to the setter - setCorrelationId(jmsMessage, null); - } else if (headerValue instanceof String) { - setCorrelationId(jmsMessage, (String) headerValue); - } else { - throw new IllegalHeaderException("The " + JMS_CORRELATION_ID + " must either be a String or null. Found: " + headerValue.getClass().getName()); - } - } else if (headerName.equalsIgnoreCase(JMS_REPLY_TO)) { + private static void appendJmsProperty(Message jmsMessage, Exchange exchange, org.apache.camel.Message in, + String headerName, Object headerValue, + HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException { + if (isStandardJMSHeader(headerName)) { + if (headerName.equals("JMSCorrelationID")) { + jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue)); + } else if (headerName.equals("JMSReplyTo") && headerValue != null) { if (headerValue instanceof String) { - // FIXME Setting the reply to appears broken. walk back - // through it. If the value is a String we must normalize it - // first - } else { - // TODO write destination converter - // Destination replyTo = - // ExchangeHelper.convertToType(exchange, - // Destination.class, - // headerValue); - // jmsMessage.setJMSReplyTo(replyTo); - } - } else if (headerName.equalsIgnoreCase(JMS_TYPE)) { - if (headerValue == null) { - // Value can be null but we can't cast a null to a String - // so pass null to the setter - setMessageType(jmsMessage, null); - } else if (headerValue instanceof String) { - // Not null but is a String - setMessageType(jmsMessage, (String) headerValue); - } else { - throw new IllegalHeaderException("The " + JMS_TYPE + " must either be a String or null. Found: " + headerValue.getClass().getName()); - } - } else if (headerName.equalsIgnoreCase(JMS_PRIORITY)) { - if (headerValue instanceof Integer) { - try { - jmsMessage.setJMSPriority((Integer) headerValue); - } catch (JMSException e) { - throw new IllegalHeaderException("Failed to set the " + JMS_PRIORITY + " header. Cause: " + e.getLocalizedMessage(), e); - } - } else { - throw new IllegalHeaderException("The " + JMS_PRIORITY + " must be a Integer. Type found: " + headerValue.getClass().getName()); - } - } else if (headerName.equalsIgnoreCase(JMS_DELIVERY_MODE)) { - try { - JmsMessageHelper.setJMSDeliveryMode(jmsMessage, headerValue); - } catch (JMSException e) { - throw new IllegalHeaderException("Failed to set the " + JMS_DELIVERY_MODE + " header. Cause: " + e.getLocalizedMessage(), e); - } - } else if (headerName.equalsIgnoreCase(JMS_EXPIRATION)) { - if (headerValue instanceof Long) { - try { - jmsMessage.setJMSExpiration((Long) headerValue); - } catch (JMSException e) { - throw new IllegalHeaderException("Failed to set the " + JMS_EXPIRATION + " header. Cause: " + e.getLocalizedMessage(), e); - } - } else { - throw new IllegalHeaderException("The " + JMS_EXPIRATION + " must be a Long. Type found: " + headerValue.getClass().getName()); + // if the value is a String we must normalize it first, and must include the prefix + // as ActiveMQ requires that when converting the String to a javax.jms.Destination type + headerValue = normalizeDestinationName((String) headerValue, true); } + Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue); + JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo); + } else if (headerName.equals("JMSType")) { + jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue)); + } else if (headerName.equals("JMSPriority")) { + jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue)); + } else if (headerName.equals("JMSDeliveryMode")) { + JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue); + } else if (headerName.equals("JMSExpiration")) { + jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue)); } else { - LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue); - if (headerName.equalsIgnoreCase(JMS_DESTINATION) || headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || headerName.equalsIgnoreCase(JMS_TIMESTAMP) - || headerName.equalsIgnoreCase(JMS_REDELIVERED)) { - // The following properties are set by the - // MessageProducer: - // JMSDestination - // The following are set on the underlying JMS provider: - // JMSMessageID, JMSTimestamp, JMSRedelivered - // log at trace level to not spam log - LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue); - } else { - if (!(headerValue instanceof JmsMessageType)) { - String encodedName = keyFormatStrategy.encodeKey(headerName); - try { - JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue); - } catch (JMSException e) { - throw new IllegalHeaderException("Failed to set the header " + encodedName + " header. Cause: " + e.getLocalizedMessage(), e); - } - } - } + // The following properties are set by the MessageProducer: + // JMSDestination + // The following are set on the underlying JMS provider: + // JMSMessageID, JMSTimestamp, JMSRedelivered + // log at trace level to not spam log + LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue); + } + } else if (shouldOutputHeader(in, headerName, headerValue, exchange, headerFilterStrategy)) { + // only primitive headers and strings is allowed as properties + // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html + Object value = getValidJMSHeaderValue(headerName, headerValue); + if (value != null) { + // must encode to safe JMS header name before setting property on jmsMessage + String key = keyFormatStrategy.encodeKey(headerName); + // set the property + JmsMessageHelper.setProperty(jmsMessage, key, value); + } else if (LOG.isDebugEnabled()) { + // okay the value is not a primitive or string so we cannot sent it over the wire + LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}", + new Object[]{headerName, headerValue.getClass().getName(), headerValue}); + } + } + } + + /** + * Is the given header a standard JMS header + * @param headerName the header name + * @return <tt>true</tt> if its a standard JMS header + */ + protected static boolean isStandardJMSHeader(String headerName) { + if (!headerName.startsWith("JMS")) { + return false; + } + if (headerName.startsWith("JMSX")) { + return false; + } + // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM) + if (headerName.startsWith("JMS_")) { + return false; + } + + // the 4th char must be a letter to be a standard JMS header + if (headerName.length() > 3) { + Character fourth = headerName.charAt(3); + if (Character.isLetter(fourth)) { + return true; } } - return jmsMessage; + + return false; + } + + /** + * Strategy to test if the given header is valid according to the JMS spec to be set as a property + * on the JMS message. + * <p/> + * This default implementation will allow: + * <ul> + * <li>any primitives and their counter Objects (Integer, Double etc.)</li> + * <li>String and any other literals, Character, CharSequence</li> + * <li>Boolean</li> + * <li>Number</li> + * <li>java.util.Date</li> + * </ul> + * + * @param headerName the header name + * @param headerValue the header value + * @return the value to use, <tt>null</tt> to ignore this header + */ + protected static Object getValidJMSHeaderValue(String headerName, Object headerValue) { + if (headerValue instanceof String) { + return headerValue; + } else if (headerValue instanceof BigInteger) { + return headerValue.toString(); + } else if (headerValue instanceof BigDecimal) { + return headerValue.toString(); + } else if (headerValue instanceof Number) { + return headerValue; + } else if (headerValue instanceof Character) { + return headerValue; + } else if (headerValue instanceof CharSequence) { + return headerValue.toString(); + } else if (headerValue instanceof Boolean) { + return headerValue; + } else if (headerValue instanceof Date) { + return headerValue.toString(); + } + return null; } @SuppressWarnings("unchecked") @@ -375,6 +404,17 @@ public final class JmsMessageHelper implements JmsConstants { } /** + * Strategy to allow filtering of headers which are put on the JMS message + * <p/> + * <b>Note</b>: Currently only supports sending java identifiers as keys + */ + protected static boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String headerName, + Object headerValue, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) { + return headerFilterStrategy == null + || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange); + } + + /** * Gets the JMSType from the message. * * @param message the message @@ -393,41 +433,41 @@ public final class JmsMessageHelper implements JmsConstants { /** * Sets the JMSDeliveryMode on the message. * - * @param message the message - * @param deliveryMode the delivery mode, either as a String or integer + * @param exchange the exchange + * @param message the message + * @param deliveryMode the delivery mode, either as a String or integer * @throws javax.jms.JMSException is thrown if error setting the delivery mode */ - public static void setJMSDeliveryMode(Message message, Object deliveryMode) throws JMSException { - Integer mode; + public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode) throws JMSException { + Integer mode = null; if (deliveryMode instanceof String) { String s = (String) deliveryMode; - if (JMS_DELIVERY_MODE_PERSISTENT.equalsIgnoreCase(s)) { + if ("PERSISTENT".equalsIgnoreCase(s)) { mode = DeliveryMode.PERSISTENT; - } else if (JMS_DELIVERY_MODE_NON_PERSISTENT.equalsIgnoreCase(s)) { + } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) { mode = DeliveryMode.NON_PERSISTENT; } else { // it may be a number in the String so try that - Integer value = null; - try { - value = Integer.valueOf(s); - } catch (NumberFormatException e) { - // Do nothing. The error handler below is sufficient - } + Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode); if (value != null) { mode = value; } else { throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode); } } - } else if (deliveryMode instanceof Integer) { - // fallback and try to convert to a number - mode = (Integer) deliveryMode; } else { - throw new IllegalArgumentException("Unable to convert the given delivery mode of type " + deliveryMode.getClass().getName() + " with value: " + deliveryMode); + // fallback and try to convert to a number + Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode); + if (value != null) { + mode = value; + } } - message.setJMSDeliveryMode(mode); + if (mode != null) { + message.setJMSDeliveryMode(mode); + message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode); + } } /** @@ -442,7 +482,7 @@ public final class JmsMessageHelper implements JmsConstants { try { message.setJMSType(type); } catch (JMSException e) { - LOGGER.debug("Error setting the message type: {}", type, e); + LOG.debug("Error setting the message type: {}", type, e); } } @@ -458,7 +498,7 @@ public final class JmsMessageHelper implements JmsConstants { try { message.setJMSCorrelationID(correlationId); } catch (JMSException e) { - LOGGER.debug("Error setting the correlationId: {}", correlationId, e); + LOG.debug("Error setting the correlationId: {}", correlationId, e); } } @@ -472,7 +512,7 @@ public final class JmsMessageHelper implements JmsConstants { try { message.setJMSReplyTo(replyTo); } catch (Exception e) { - LOGGER.debug("Error setting the correlationId: {}", replyTo.toString()); + LOG.debug("Error setting the correlationId: {}", replyTo.toString()); } } @@ -589,4 +629,61 @@ public final class JmsMessageHelper implements JmsConstants { return key == null || key.isEmpty() || key.contains(".") || key.contains("-"); } + /** + * Normalizes the destination name. + * <p/> + * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which + * was intended as <tt>queue://foo</tt>. + * + * @param destination the destination + * @return the normalized destination + */ + public static String normalizeDestinationName(String destination) { + // do not include prefix which is the current behavior when using this method. + return normalizeDestinationName(destination, false); + } + + /** + * Normalizes the destination name. + * <p/> + * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which + * was intended as <tt>queue://foo</tt>. + * + * @param destination the destination + * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name + * @return the normalized destination + */ + public static String normalizeDestinationName(String destination, boolean includePrefix) { + if (ObjectHelper.isEmpty(destination)) { + return destination; + } + if (destination.startsWith(QUEUE_PREFIX)) { + String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/'); + if (includePrefix) { + s = QUEUE_PREFIX + "//" + s; + } + return s; + } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) { + String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/'); + if (includePrefix) { + s = TEMP_QUEUE_PREFIX + "//" + s; + } + return s; + } else if (destination.startsWith(TOPIC_PREFIX)) { + String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/'); + if (includePrefix) { + s = TOPIC_PREFIX + "//" + s; + } + return s; + } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) { + String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/'); + if (includePrefix) { + s = TEMP_TOPIC_PREFIX + "//" + s; + } + return s; + } else { + return destination; + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index 71f5770..3fa23b0 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -93,19 +93,19 @@ public class InOnlyProducer extends SjmsProducer { Message message; if (BatchMessage.class.isInstance(object)) { BatchMessage<?> batchMessage = (BatchMessage<?>) object; - message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint()); + message = JmsMessageHelper.createMessage(exchange, producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint()); } else { - message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint()); + message = JmsMessageHelper.createMessage(exchange, producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint()); } messages.add(message); } } else { Object payload = exchange.getIn().getBody(); - Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint()); + Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint()); messages.add(message); } } else { - Message message = JmsMessageHelper.createMessage(producer.getSession(), null, exchange.getIn().getHeaders(), getEndpoint()); + Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), null, exchange.getIn().getHeaders(), getEndpoint()); messages.add(message); }
