http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java index 0a93f16..f3224b6 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java @@ -16,408 +16,272 @@ */ package org.apache.camel.component.sjms.jms; -import java.io.File; -import java.io.InputStream; -import java.io.Reader; -import java.io.Serializable; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; +import java.util.LinkedHashMap; import java.util.Map; -import java.util.Set; -import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; -import org.apache.camel.component.sjms.SjmsConstants; -import org.apache.camel.component.sjms.SjmsEndpoint; -import org.apache.camel.impl.DefaultMessage; -import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; /** * Utility class for {@link javax.jms.Message}. */ -public final class JmsMessageHelper implements JmsConstants { - - private static final Logger LOG = LoggerFactory.getLogger(JmsMessageHelper.class); +public final class JmsMessageHelper { private JmsMessageHelper() { } - public static Exchange createExchange(Message message, Endpoint endpoint) { - return createExchange(message, endpoint, null); - } - /** - * Creates an Exchange from a JMS Message. - * @param message The JMS message. - * @param endpoint The Endpoint to use to create the Exchange object. - * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to - * format keys in a JMS 1.1 compliant manner. If null the - * {@link DefaultJmsKeyFormatStrategy} will be used. - * @return Populated Exchange. + * Removes the property from the JMS message. + * + * @param jmsMessage the JMS message + * @param name name of the property to remove + * @return the old value of the property or <tt>null</tt> if not exists + * @throws javax.jms.JMSException can be thrown */ - public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) { - Exchange exchange = endpoint.createExchange(); - KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null) - ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy; - return populateExchange(message, exchange, false, initialisedKeyFormatStrategy); - } - - @SuppressWarnings("unchecked") - public static Exchange populateExchange(Message message, Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) { - try { - setJmsMessageHeaders(message, exchange, out, keyFormatStrategy); - if (message != null) { - // convert to JMS Message of the given type + public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException { + // check if the property exists + if (!jmsMessage.propertyExists(name)) { + return null; + } - DefaultMessage bodyMessage; - if (out) { - bodyMessage = (DefaultMessage) exchange.getOut(); - } else { - bodyMessage = (DefaultMessage) exchange.getIn(); - } - switch (JmsMessageHelper.discoverJmsMessageType(message)) { - case Bytes: - BytesMessage bytesMessage = (BytesMessage) message; - if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) { - LOG.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength()); - return null; - } - byte[] result = new byte[(int) bytesMessage.getBodyLength()]; - bytesMessage.readBytes(result); - bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Bytes); - bodyMessage.setBody(result); - break; - case Map: - Map<String, Object> body = new HashMap<String, Object>(); - MapMessage mapMessage = (MapMessage) message; - Enumeration<String> names = mapMessage.getMapNames(); - while (names.hasMoreElements()) { - String key = names.nextElement(); - Object value = mapMessage.getObject(key); - body.put(key, value); - } - bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Map); - bodyMessage.setBody(body); - break; - case Object: - ObjectMessage objMsg = (ObjectMessage) message; - bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Object); - bodyMessage.setBody(objMsg.getObject()); - break; - case Text: - TextMessage textMsg = (TextMessage) message; - bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Text); - bodyMessage.setBody(textMsg.getText()); - break; - case Stream: - StreamMessage streamMessage = (StreamMessage) message; - List<Object> list = new ArrayList<Object>(); - Object obj; - while ((obj = streamMessage.readObject()) != null) { - list.add(obj); - } - bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Stream); - bodyMessage.setBody(list); - break; - case Message: - default: - // Do nothing. Only set the headers for an empty message - bodyMessage.setBody(message); - break; - } + Object answer = null; + + // store the properties we want to keep in a temporary map + // as the JMS API is a bit strict as we are not allowed to + // clear a single property, but must clear them all and redo + // the properties + Map<String, Object> map = new LinkedHashMap<String, Object>(); + Enumeration<?> en = jmsMessage.getPropertyNames(); + while (en.hasMoreElements()) { + String key = (String) en.nextElement(); + if (name.equals(key)) { + answer = key; + } else { + map.put(key, getProperty(jmsMessage, key)); } - } catch (Exception e) { - exchange.setException(e); } - return exchange; - } - - public static Message createMessage(Exchange exchange, Session session, SjmsEndpoint endpoint) throws Exception { - Message answer; - Object body; - Map<String, Object> bodyHeaders; - if (exchange.getOut().getBody() != null) { - body = exchange.getOut().getBody(); - bodyHeaders = new HashMap<String, Object>(exchange.getOut().getHeaders()); - } else { - body = exchange.getIn().getBody(); - bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders()); + // redo the properties to keep + jmsMessage.clearProperties(); + for (Map.Entry<String, Object> entry : map.entrySet()) { + jmsMessage.setObjectProperty(entry.getKey(), entry.getValue()); } - answer = createMessage(exchange, session, body, bodyHeaders, endpoint); return answer; } - public static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception { - return createMessage(exchange, session, payload, messageHeaders, endpoint.isAllowNullBody(), - endpoint.getSjmsHeaderFilterStrategy(), endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter()); - } - - private static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, boolean allowNullBody, - HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception { - Message answer = null; - JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload); - - switch (messageType) { - case Bytes: - BytesMessage bytesMessage = session.createBytesMessage(); - byte[] bytesToWrite = typeConverter.convertTo(byte[].class, payload); - bytesMessage.writeBytes(bytesToWrite); - answer = bytesMessage; - break; - case Map: - MapMessage mapMessage = session.createMapMessage(); - Map objMap = (Map) payload; - for (final Map.Entry entry : (Set<Map.Entry>)objMap.entrySet()) { - mapMessage.setObject(entry.getKey().toString(), entry.getValue()); - } - answer = mapMessage; - break; - case Object: - ObjectMessage objectMessage = session.createObjectMessage(); - objectMessage.setObject((Serializable) payload); - answer = objectMessage; - break; - case Text: - TextMessage textMessage = session.createTextMessage(); - String convertedText = typeConverter.convertTo(String.class, payload); - textMessage.setText(convertedText); - answer = textMessage; - break; - case Stream: - StreamMessage streamMessage = session.createStreamMessage(); - Collection collection = (Collection)payload; - for (final Object obj : collection) { - streamMessage.writeObject(obj); - } - answer = streamMessage; - break; - case Message: - if (allowNullBody && payload == null) { - answer = session.createMessage(); - } else if (payload != null) { - throw new JMSException("Unsupported message body type " + ObjectHelper.classCanonicalName(payload)); - } else { - throw new JMSException("Null body is not allowed"); + /** + * Tests whether a given property with the name exists + * + * @param jmsMessage the JMS message + * @param name name of the property to test if exists + * @return <tt>true</tt> if the property exists, <tt>false</tt> if not. + * @throws JMSException can be thrown + */ + public static boolean hasProperty(Message jmsMessage, String name) throws JMSException { + Enumeration<?> en = jmsMessage.getPropertyNames(); + while (en.hasMoreElements()) { + String key = (String) en.nextElement(); + if (name.equals(key)) { + return true; } - break; - default: - break; } - - appendJmsProperties(answer, exchange, exchange.getIn(), headerFilterStrategy, keyFormatStrategy); - return answer; + return false; } /** - * Appends the JMS headers from the Camel {@link Message} + * Gets a JMS property + * + * @param jmsMessage the JMS message + * @param name name of the property to get + * @return the property value, or <tt>null</tt> if does not exists + * @throws JMSException can be thrown */ - private static void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in, - HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException { - Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet(); - for (Map.Entry<String, Object> entry : entries) { - String headerName = entry.getKey(); - Object headerValue = entry.getValue(); - appendJmsProperty(jmsMessage, exchange, in, headerName, headerValue, headerFilterStrategy, keyFormatStrategy); + public static Object getProperty(Message jmsMessage, String name) throws JMSException { + Object value = jmsMessage.getObjectProperty(name); + if (value == null) { + value = jmsMessage.getStringProperty(name); } + return value; } - private static void appendJmsProperty(Message jmsMessage, Exchange exchange, org.apache.camel.Message in, - String headerName, Object headerValue, - HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException { - if (isStandardJMSHeader(headerName)) { - if (headerName.equals("JMSCorrelationID")) { - jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue)); - } else if (headerName.equals("JMSReplyTo") && headerValue != null) { - if (headerValue instanceof String) { - // if the value is a String we must normalize it first, and must include the prefix - // as ActiveMQ requires that when converting the String to a javax.jms.Destination type - headerValue = normalizeDestinationName((String) headerValue, true); - } - Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue); - JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo); - } else if (headerName.equals("JMSType")) { - jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue)); - } else if (headerName.equals("JMSPriority")) { - jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue)); - } else if (headerName.equals("JMSDeliveryMode")) { - JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue); - } else if (headerName.equals("JMSExpiration")) { - jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue)); - } else { - // The following properties are set by the MessageProducer: - // JMSDestination - // The following are set on the underlying JMS provider: - // JMSMessageID, JMSTimestamp, JMSRedelivered - // log at trace level to not spam log - LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue); - } - } else if (shouldOutputHeader(in, headerName, headerValue, exchange, headerFilterStrategy)) { - // only primitive headers and strings is allowed as properties - // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html - Object value = getValidJMSHeaderValue(headerName, headerValue); - if (value != null) { - // must encode to safe JMS header name before setting property on jmsMessage - String key = keyFormatStrategy.encodeKey(headerName); - // set the property - JmsMessageHelper.setProperty(jmsMessage, key, value); - } else if (LOG.isDebugEnabled()) { - // okay the value is not a primitive or string so we cannot sent it over the wire - LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}", - new Object[]{headerName, headerValue.getClass().getName(), headerValue}); - } + /** + * Sets the property on the given JMS message. + * + * @param jmsMessage the JMS message + * @param name name of the property to set + * @param value the value + * @throws JMSException can be thrown + */ + public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException { + if (value == null) { + return; + } + if (value instanceof Byte) { + jmsMessage.setByteProperty(name, (Byte) value); + } else if (value instanceof Boolean) { + jmsMessage.setBooleanProperty(name, (Boolean) value); + } else if (value instanceof Double) { + jmsMessage.setDoubleProperty(name, (Double) value); + } else if (value instanceof Float) { + jmsMessage.setFloatProperty(name, (Float) value); + } else if (value instanceof Integer) { + jmsMessage.setIntProperty(name, (Integer) value); + } else if (value instanceof Long) { + jmsMessage.setLongProperty(name, (Long) value); + } else if (value instanceof Short) { + jmsMessage.setShortProperty(name, (Short) value); + } else if (value instanceof String) { + jmsMessage.setStringProperty(name, (String) value); + } else { + // fallback to Object + jmsMessage.setObjectProperty(name, value); } } /** - * Is the given header a standard JMS header - * @param headerName the header name - * @return <tt>true</tt> if its a standard JMS header + * Sets the correlation id on the JMS message. + * <p/> + * Will ignore exception thrown + * + * @param message the JMS message + * @param correlationId the correlation id */ - protected static boolean isStandardJMSHeader(String headerName) { - if (!headerName.startsWith("JMS")) { - return false; - } - if (headerName.startsWith("JMSX")) { - return false; + public static void setCorrelationId(Message message, String correlationId) { + try { + message.setJMSCorrelationID(correlationId); + } catch (JMSException e) { + // ignore } - // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM) - if (headerName.startsWith("JMS_")) { + } + + /** + * Whether the destination name has either queue or temp queue prefix. + * + * @param destination the destination + * @return <tt>true</tt> if queue or temp-queue prefix, <tt>false</tt> otherwise + */ + public static boolean isQueuePrefix(String destination) { + if (ObjectHelper.isEmpty(destination)) { return false; } - // the 4th char must be a letter to be a standard JMS header - if (headerName.length() > 3) { - Character fourth = headerName.charAt(3); - if (Character.isLetter(fourth)) { - return true; - } + return destination.startsWith(JmsConstants.QUEUE_PREFIX) || destination.startsWith(JmsConstants.TEMP_QUEUE_PREFIX); + } + + /** + * Whether the destination name has either topic or temp topic prefix. + * + * @param destination the destination + * @return <tt>true</tt> if topic or temp-topic prefix, <tt>false</tt> otherwise + */ + public static boolean isTopicPrefix(String destination) { + if (ObjectHelper.isEmpty(destination)) { + return false; } - return false; + return destination.startsWith(JmsConstants.TOPIC_PREFIX) || destination.startsWith(JmsConstants.TEMP_TOPIC_PREFIX); } /** - * Strategy to test if the given header is valid according to the JMS spec to be set as a property - * on the JMS message. + * Normalizes the destination name. * <p/> - * This default implementation will allow: - * <ul> - * <li>any primitives and their counter Objects (Integer, Double etc.)</li> - * <li>String and any other literals, Character, CharSequence</li> - * <li>Boolean</li> - * <li>Number</li> - * <li>java.util.Date</li> - * </ul> + * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which + * was intended as <tt>queue://foo</tt>. * - * @param headerName the header name - * @param headerValue the header value - * @return the value to use, <tt>null</tt> to ignore this header + * @param destination the destination + * @return the normalized destination */ - protected static Object getValidJMSHeaderValue(String headerName, Object headerValue) { - if (headerValue instanceof String) { - return headerValue; - } else if (headerValue instanceof BigInteger) { - return headerValue.toString(); - } else if (headerValue instanceof BigDecimal) { - return headerValue.toString(); - } else if (headerValue instanceof Number) { - return headerValue; - } else if (headerValue instanceof Character) { - return headerValue; - } else if (headerValue instanceof CharSequence) { - return headerValue.toString(); - } else if (headerValue instanceof Boolean) { - return headerValue; - } else if (headerValue instanceof Date) { - return headerValue.toString(); - } - return null; + public static String normalizeDestinationName(String destination) { + // do not include prefix which is the current behavior when using this method. + return normalizeDestinationName(destination, false); } - @SuppressWarnings("unchecked") - public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) throws JMSException { - Map<String, Object> headers = new HashMap<String, Object>(); - if (jmsMessage != null) { - // lets populate the standard JMS message headers - try { - headers.put(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID()); - headers.put(JMS_DELIVERY_MODE, jmsMessage.getJMSDeliveryMode()); - headers.put(JMS_DESTINATION, jmsMessage.getJMSDestination()); - headers.put(JMS_EXPIRATION, jmsMessage.getJMSExpiration()); - headers.put(JMS_MESSAGE_ID, jmsMessage.getJMSMessageID()); - headers.put(JMS_PRIORITY, jmsMessage.getJMSPriority()); - headers.put(JMS_REDELIVERED, jmsMessage.getJMSRedelivered()); - headers.put(JMS_TIMESTAMP, jmsMessage.getJMSTimestamp()); - headers.put(JMS_REPLY_TO, getJMSReplyTo(jmsMessage)); - headers.put(JMS_TYPE, getJMSType(jmsMessage)); - - // this works around a bug in the ActiveMQ property handling - headers.put(JMSX_GROUP_ID, jmsMessage.getStringProperty(JMSX_GROUP_ID)); - } catch (JMSException e) { - throw new RuntimeCamelException(e); + /** + * Normalizes the destination name. + * <p/> + * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which + * was intended as <tt>queue://foo</tt>. + * + * @param destination the destination + * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name + * @return the normalized destination + */ + public static String normalizeDestinationName(String destination, boolean includePrefix) { + if (ObjectHelper.isEmpty(destination)) { + return destination; + } + if (destination.startsWith(JmsConstants.QUEUE_PREFIX)) { + String s = removeStartingCharacters(destination.substring(JmsConstants.QUEUE_PREFIX.length()), '/'); + if (includePrefix) { + s = JmsConstants.QUEUE_PREFIX + "//" + s; } - - for (final Enumeration<String> enumeration = jmsMessage.getPropertyNames(); enumeration.hasMoreElements();) { - String key = enumeration.nextElement(); - if (hasIllegalHeaderKey(key)) { - throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value"); - } - Object value = jmsMessage.getObjectProperty(key); - String decodedName = keyFormatStrategy.decodeKey(key); - headers.put(decodedName, value); + return s; + } else if (destination.startsWith(JmsConstants.TEMP_QUEUE_PREFIX)) { + String s = removeStartingCharacters(destination.substring(JmsConstants.TEMP_QUEUE_PREFIX.length()), '/'); + if (includePrefix) { + s = JmsConstants.TEMP_QUEUE_PREFIX + "//" + s; } - } - if (out) { - exchange.getOut().setHeaders(headers); + return s; + } else if (destination.startsWith(JmsConstants.TOPIC_PREFIX)) { + String s = removeStartingCharacters(destination.substring(JmsConstants.TOPIC_PREFIX.length()), '/'); + if (includePrefix) { + s = JmsConstants.TOPIC_PREFIX + "//" + s; + } + return s; + } else if (destination.startsWith(JmsConstants.TEMP_TOPIC_PREFIX)) { + String s = removeStartingCharacters(destination.substring(JmsConstants.TEMP_TOPIC_PREFIX.length()), '/'); + if (includePrefix) { + s = JmsConstants.TEMP_TOPIC_PREFIX + "//" + s; + } + return s; } else { - exchange.getIn().setHeaders(headers); + return destination; } - return exchange; } /** - * Strategy to allow filtering of headers which are put on the JMS message - * <p/> - * <b>Note</b>: Currently only supports sending java identifiers as keys + * Sets the JMSReplyTo on the message. + * + * @param message the message + * @param replyTo the reply to destination */ - protected static boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String headerName, - Object headerValue, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) { - return headerFilterStrategy == null - || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange); + public static void setJMSReplyTo(Message message, Destination replyTo) { + try { + message.setJMSReplyTo(replyTo); + } catch (Exception e) { + // ignore due OracleAQ does not support accessing JMSReplyTo + } + } + + /** + * Gets the JMSReplyTo from the message. + * + * @param message the message + * @return the reply to, can be <tt>null</tt> + */ + public static Destination getJMSReplyTo(Message message) { + try { + return message.getJMSReplyTo(); + } catch (Exception e) { + // ignore due OracleAQ does not support accessing JMSReplyTo + } + + return null; } /** * Gets the JMSType from the message. * - * @param message the message + * @param message the message * @return the type, can be <tt>null</tt> */ public static String getJMSType(Message message) { @@ -431,6 +295,54 @@ public final class JmsMessageHelper implements JmsConstants { } /** + * Gets the String Properties from the message. + * + * @param message the message + * @return the type, can be <tt>null</tt> + */ + public static String getStringProperty(Message message, String propertyName) { + try { + return message.getStringProperty(propertyName); + } catch (Exception e) { + // ignore due some broker client does not support accessing StringProperty + } + + return null; + } + + /** + * Gets the JMSRedelivered from the message. + * + * @param message the message + * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, <tt>null</tt> if not able to determine + */ + public static Boolean getJMSRedelivered(Message message) { + try { + return message.getJMSRedelivered(); + } catch (Exception e) { + // ignore if JMS broker do not support this + } + + return null; + } + + /** + * Gets the JMSMessageID from the message. + * + * @param message the message + * @return the JMSMessageID, or <tt>null</tt> if not able to get + */ + public static String getJMSMessageID(Message message) { + try { + return message.getJMSMessageID(); + } catch (Exception e) { + // ignore if JMS broker do not support this + } + + return null; + } + + /** * Sets the JMSDeliveryMode on the message. * * @param exchange the exchange @@ -471,219 +383,19 @@ public final class JmsMessageHelper implements JmsConstants { } /** - * Sets the correlation id on the JMS message. - * <p/> - * Will ignore exception thrown - * - * @param message the JMS message - * @param type the correlation id - */ - public static void setMessageType(Message message, String type) { - try { - message.setJMSType(type); - } catch (JMSException e) { - LOG.debug("Error setting the message type: {}", type, e); - } - } - - /** - * Sets the correlation id on the JMS message. - * <p/> - * Will ignore exception thrown - * - * @param message the JMS message - * @param correlationId the correlation id - */ - public static void setCorrelationId(Message message, String correlationId) { - try { - message.setJMSCorrelationID(correlationId); - } catch (JMSException e) { - LOG.debug("Error setting the correlationId: {}", correlationId, e); - } - } - - /** - * Sets the JMSReplyTo on the message. + * Gets the JMSCorrelationIDAsBytes from the message. * * @param message the message - * @param replyTo the reply to destination + * @return the JMSCorrelationIDAsBytes, or <tt>null</tt> if not able to get */ - public static void setJMSReplyTo(Message message, Destination replyTo) { + public static String getJMSCorrelationIDAsBytes(Message message) { try { - message.setJMSReplyTo(replyTo); + return new String(message.getJMSCorrelationIDAsBytes()); } catch (Exception e) { - LOG.debug("Error setting the correlationId: {}", replyTo.toString()); + // ignore if JMS broker do not support this } - } - /** - * Gets the JMSReplyTo from the message. - * - * @param message the message - * @return the reply to, can be <tt>null</tt> - */ - public static Destination getJMSReplyTo(Message message) { - try { - return message.getJMSReplyTo(); - } catch (Exception e) { - // ignore due OracleAQ does not support accessing JMSReplyTo - } return null; } - /** - * Sets the property on the given JMS message. - * - * @param jmsMessage the JMS message - * @param name name of the property to set - * @param value the value - * @throws JMSException can be thrown - */ - public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException { - if (value == null) { - jmsMessage.setObjectProperty(name, null); - } else if (value instanceof Byte) { - jmsMessage.setByteProperty(name, (Byte) value); - } else if (value instanceof Boolean) { - jmsMessage.setBooleanProperty(name, (Boolean) value); - } else if (value instanceof Double) { - jmsMessage.setDoubleProperty(name, (Double) value); - } else if (value instanceof Float) { - jmsMessage.setFloatProperty(name, (Float) value); - } else if (value instanceof Integer) { - jmsMessage.setIntProperty(name, (Integer) value); - } else if (value instanceof Long) { - jmsMessage.setLongProperty(name, (Long) value); - } else if (value instanceof Short) { - jmsMessage.setShortProperty(name, (Short) value); - } else if (value instanceof String) { - jmsMessage.setStringProperty(name, (String) value); - } else { - // fallback to Object - jmsMessage.setObjectProperty(name, value); - } - } - - public static JmsMessageType discoverMessageTypeFromPayload(final Object payload) { - JmsMessageType answer; - // Default is a JMS Message since a body is not required - if (payload == null) { - answer = JmsMessageType.Message; - } else { - // Something was found in the body so determine - // what type of message we need to create - if (byte[].class.isInstance(payload)) { - answer = JmsMessageType.Bytes; - } else if (Map.class.isInstance(payload)) { - answer = JmsMessageType.Map; - } else if (Collection.class.isInstance(payload)) { - answer = JmsMessageType.Stream; - } else if (InputStream.class.isInstance(payload)) { - answer = JmsMessageType.Bytes; - } else if (ByteBuffer.class.isInstance(payload)) { - answer = JmsMessageType.Bytes; - } else if (File.class.isInstance(payload)) { - answer = JmsMessageType.Bytes; - } else if (Reader.class.isInstance(payload)) { - answer = JmsMessageType.Text; - } else if (String.class.isInstance(payload)) { - answer = JmsMessageType.Text; - } else if (CharBuffer.class.isInstance(payload)) { - answer = JmsMessageType.Text; - } else if (char[].class.isInstance(payload)) { - answer = JmsMessageType.Text; - } else if (Character.class.isInstance(payload)) { - answer = JmsMessageType.Text; - } else if (Serializable.class.isInstance(payload)) { - answer = JmsMessageType.Object; - } else { - answer = JmsMessageType.Message; - } - } - return answer; - } - - public static JmsMessageType discoverJmsMessageType(Message message) { - JmsMessageType answer; - if (message != null) { - if (BytesMessage.class.isInstance(message)) { - answer = JmsMessageType.Bytes; - } else if (MapMessage.class.isInstance(message)) { - answer = JmsMessageType.Map; - } else if (TextMessage.class.isInstance(message)) { - answer = JmsMessageType.Text; - } else if (StreamMessage.class.isInstance(message)) { - answer = JmsMessageType.Stream; - } else if (ObjectMessage.class.isInstance(message)) { - answer = JmsMessageType.Object; - } else { - answer = JmsMessageType.Message; - } - } else { - answer = JmsMessageType.Message; - } - return answer; - } - - private static boolean hasIllegalHeaderKey(String key) { - return key == null || key.isEmpty() || key.contains(".") || key.contains("-"); - } - - /** - * Normalizes the destination name. - * <p/> - * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which - * was intended as <tt>queue://foo</tt>. - * - * @param destination the destination - * @return the normalized destination - */ - public static String normalizeDestinationName(String destination) { - // do not include prefix which is the current behavior when using this method. - return normalizeDestinationName(destination, false); - } - - /** - * Normalizes the destination name. - * <p/> - * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which - * was intended as <tt>queue://foo</tt>. - * - * @param destination the destination - * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name - * @return the normalized destination - */ - public static String normalizeDestinationName(String destination, boolean includePrefix) { - if (ObjectHelper.isEmpty(destination)) { - return destination; - } - if (destination.startsWith(QUEUE_PREFIX)) { - String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/'); - if (includePrefix) { - s = QUEUE_PREFIX + "//" + s; - } - return s; - } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) { - String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/'); - if (includePrefix) { - s = TEMP_QUEUE_PREFIX + "//" + s; - } - return s; - } else if (destination.startsWith(TOPIC_PREFIX)) { - String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/'); - if (includePrefix) { - s = TOPIC_PREFIX + "//" + s; - } - return s; - } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) { - String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/'); - if (includePrefix) { - s = TEMP_TOPIC_PREFIX + "//" + s; - } - return s; - } else { - return destination; - } - } - } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java deleted file mode 100644 index 3b7566f..0000000 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sjms.jms; - -/** - * Strategy for applying encoding and decoding of JMS headers so they apply to - * the JMS spec. - */ -public interface KeyFormatStrategy { - - /** - * Encodes the key before its sent as a {@link javax.jms.Message} message. - * - * @param key the original key - * @return the encoded key - */ - String encodeKey(String key); - - /** - * Decodes the key after its received from a {@link javax.jms.Message} - * message. - * - * @param key the encoded key - * @return the decoded key as the original key - */ - String decodeKey(String key); -} http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java new file mode 100644 index 0000000..894ef61 --- /dev/null +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.jms; + +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.camel.Exchange; + +/** + * A strategy that allows custom components to plugin and perform custom logic when Camel creates {@link javax.jms.Message} instance. + * <p/> + * For example to populate the message with custom information that are component specific and not part of the JMS specification. + */ +public interface MessageCreatedStrategy { + + /** + * Callback when the JMS message has <i>just</i> been created, which allows custom modifications afterwards. + * + * @param exchange the current exchange + * @param session the JMS session used to create the message + * @param cause optional exception occurred that should be sent as reply instead of a regular body + */ + void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause); +} http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java index 3fa23b0..93f8648 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java @@ -32,7 +32,6 @@ import org.apache.camel.component.sjms.BatchMessage; import org.apache.camel.component.sjms.MessageProducerResources; import org.apache.camel.component.sjms.SjmsProducer; import org.apache.camel.component.sjms.TransactionCommitStrategy; -import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy; import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; @@ -46,11 +45,6 @@ public class InOnlyProducer extends SjmsProducer { super(endpoint); } - /* - * @see org.apache.camel.component.sjms.SjmsProducer#doCreateProducerModel() - * @return - * @throws Exception - */ @Override public MessageProducerResources doCreateProducerModel() throws Exception { MessageProducerResources answer; @@ -75,13 +69,6 @@ public class InOnlyProducer extends SjmsProducer { return answer; } - /* - * @see - * org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback) - * @param exchange - * @param callback - * @throws Exception - */ @Override public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception { try { @@ -93,19 +80,18 @@ public class InOnlyProducer extends SjmsProducer { Message message; if (BatchMessage.class.isInstance(object)) { BatchMessage<?> batchMessage = (BatchMessage<?>) object; - message = JmsMessageHelper.createMessage(exchange, producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint()); + message = getEndpoint().getBinding().makeJmsMessage(exchange, batchMessage.getPayload(), batchMessage.getHeaders(), producer.getSession(), null); } else { - message = JmsMessageHelper.createMessage(exchange, producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint()); + message = getEndpoint().getBinding().makeJmsMessage(exchange, object, exchange.getIn().getHeaders(), producer.getSession(), null); } messages.add(message); } } else { - Object payload = exchange.getIn().getBody(); - Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint()); + Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession()); messages.add(message); } } else { - Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), null, exchange.getIn().getHeaders(), getEndpoint()); + Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession()); messages.add(message); } @@ -124,4 +110,5 @@ public class InOnlyProducer extends SjmsProducer { callback.done(isSynchronous()); } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index 039daf0..1c535b6 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -36,11 +36,14 @@ import org.apache.camel.Exchange; import org.apache.camel.component.sjms.MessageConsumerResources; import org.apache.camel.component.sjms.MessageProducerResources; import org.apache.camel.component.sjms.SjmsEndpoint; +import org.apache.camel.component.sjms.SjmsMessage; import org.apache.camel.component.sjms.SjmsProducer; import org.apache.camel.component.sjms.jms.JmsConstants; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; +import org.apache.camel.spi.UuidGenerator; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.impl.GenericObjectPool; @@ -52,6 +55,17 @@ public class InOutProducer extends SjmsProducer { private static final Map<String, Exchanger<Object>> EXCHANGERS = new ConcurrentHashMap<String, Exchanger<Object>>(); + private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-"; + private UuidGenerator uuidGenerator; + + public UuidGenerator getUuidGenerator() { + return uuidGenerator; + } + + public void setUuidGenerator(UuidGenerator uuidGenerator) { + this.uuidGenerator = uuidGenerator; + } + /** * A pool of {@link MessageConsumerResources} objects that are the reply * consumers. @@ -134,6 +148,10 @@ public class InOutProducer extends SjmsProducer { } else { log.debug("Using {} as the reply to destination.", getNamedReplyTo()); } + if (uuidGenerator == null) { + // use the generator configured on the camel context + uuidGenerator = getEndpoint().getCamelContext().getUuidGenerator(); + } if (getConsumers() == null) { setConsumers(new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory())); getConsumers().setMaxActive(getConsumerCount()); @@ -185,16 +203,14 @@ public class InOutProducer extends SjmsProducer { exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy())); } - Message request = JmsMessageHelper.createMessage(exchange, producer.getSession(), getEndpoint()); + Message request = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession()); - // TODO just set the correlation id don't get it from the - // message - String correlationId; - if (exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class) == null) { - correlationId = UUID.randomUUID().toString().replace("-", ""); - } else { - correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class); + String correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class); + if (correlationId == null) { + // we append the 'Camel-' prefix to know it was generated by us + correlationId = GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid(); } + Object responseObject = null; Exchanger<Object> messageExchanger = new Exchanger<Object>(); JmsMessageHelper.setCorrelationId(request, correlationId); @@ -229,8 +245,13 @@ public class InOutProducer extends SjmsProducer { if (responseObject instanceof Throwable) { exchange.setException((Throwable) responseObject); } else if (responseObject instanceof Message) { - Message response = (Message) responseObject; - JmsMessageHelper.populateExchange(response, exchange, true, getEndpoint().getJmsKeyFormatStrategy()); + Message message = (Message) responseObject; + + SjmsMessage response = new SjmsMessage(message, consumer.getSession(), getEndpoint().getBinding()); + // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand + // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access + // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218). + exchange.setOut(response); } else { exchange.setException(new CamelException("Unknown response type: " + responseObject)); } http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java deleted file mode 100644 index d389886..0000000 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sjms.typeconversion; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.io.StringReader; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.util.HashMap; -import java.util.Map; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.sjms.support.JmsTestSupport; -import org.junit.Ignore; -import org.junit.Test; - -public class JMSMessageHelperTypeConversionTest extends JmsTestSupport { - - private static final String SJMS_QUEUE_URI = "sjms:queue:start"; - private static final String MOCK_RESULT_URI = "mock:result"; - private Exchange message; - - @Test - public void testJMSMessageHelperString() throws Exception { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); - - template.sendBody(SJMS_QUEUE_URI, "Hello Camel"); - assertMockEndpointsSatisfied(); - assertTrue(String.class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperMap() throws Exception { - getMockEndpoint(MOCK_RESULT_URI).expectedMessageCount(1); - - Map<Object, Object> map = new HashMap<Object, Object>(); - map.put("Hello", "Camel"); - map.put("Int", Integer.MAX_VALUE); - map.put("Boolean", Boolean.TRUE); - map.put(Boolean.TRUE, Long.MAX_VALUE); - - template.sendBody(SJMS_QUEUE_URI, map); - assertMockEndpointsSatisfied(); - assertTrue(Map.class.isInstance(message.getIn().getBody())); - assertEquals("Camel", message.getIn().getBody(Map.class).get("Hello")); - assertEquals(Integer.MAX_VALUE, message.getIn().getBody(Map.class).get("Int")); - assertEquals(Boolean.TRUE, message.getIn().getBody(Map.class).get("Boolean")); - assertEquals(Long.MAX_VALUE, message.getIn().getBody(Map.class).get("true")); - } - - @Ignore - @Test - public void testJMSMessageHelperCollection() throws Exception { - // TODO: Once SJMS can accept a Collection as Body - } - - @Test - public void testJMSMessageHelperByteArray() throws Exception { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); - - byte[] bytes = "Hello Camel".getBytes(); - template.sendBody(SJMS_QUEUE_URI, bytes); - assertMockEndpointsSatisfied(); - assertTrue(byte[].class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperInputStream() throws Exception { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); - String p = "Hello Camel"; - InputStream is = new ByteArrayInputStream(p.getBytes()); - template.sendBody(SJMS_QUEUE_URI, is); - assertMockEndpointsSatisfied(); - assertTrue(byte[].class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperCharBuffer() throws Exception { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); - CharBuffer cb = CharBuffer.wrap("Hello Camel"); - template.sendBody(SJMS_QUEUE_URI, cb); - assertMockEndpointsSatisfied(); - assertTrue(String.class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperByteBuffer() throws Exception { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); - String p = "Hello Camel"; - ByteBuffer bb = ByteBuffer.wrap(p.getBytes()); - template.sendBody(SJMS_QUEUE_URI, bb); - assertMockEndpointsSatisfied(); - assertTrue(byte[].class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperFile() throws InterruptedException, IOException { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes()); - String p = "Hello Camel"; - File f = File.createTempFile("tmp-test", ".txt"); - BufferedWriter bw = new BufferedWriter(new FileWriter(f)); - bw.write(p); - bw.close(); - template.sendBody(SJMS_QUEUE_URI, f); - assertMockEndpointsSatisfied(); - boolean resultDelete = f.delete(); - assertTrue(resultDelete); - assertTrue(byte[].class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperReader() throws InterruptedException, IOException { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); - String p = "Hello Camel"; - File f = File.createTempFile("tmp-test", ".txt"); - BufferedWriter bw = new BufferedWriter(new FileWriter(f)); - bw.write(p); - bw.close(); - Reader test = new BufferedReader(new FileReader(f.getAbsolutePath())); - template.sendBody(SJMS_QUEUE_URI, test); - assertMockEndpointsSatisfied(); - assertTrue(f.delete()); - assertTrue(String.class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperStringReader() throws InterruptedException, FileNotFoundException { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); - String p = "Hello Camel"; - StringReader test = new StringReader(p); - template.sendBody(SJMS_QUEUE_URI, test); - assertMockEndpointsSatisfied(); - assertTrue(String.class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperChar() throws InterruptedException, FileNotFoundException { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("H"); - char p = 'H'; - template.sendBody(SJMS_QUEUE_URI, p); - assertMockEndpointsSatisfied(); - assertTrue(String.class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperCharacter() throws InterruptedException, FileNotFoundException { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("H"); - Character p = 'H'; - template.sendBody(SJMS_QUEUE_URI, p); - assertMockEndpointsSatisfied(); - assertTrue(String.class.isInstance(message.getIn().getBody())); - } - - @Test - public void testJMSMessageHelperCharArray() throws InterruptedException, FileNotFoundException { - getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel"); - char[] p = {'H', 'e', 'l', 'l', 'o', ' ', 'C', 'a', 'm', 'e', 'l'}; - template.sendBody(SJMS_QUEUE_URI, p); - assertMockEndpointsSatisfied(); - assertTrue(String.class.isInstance(message.getIn().getBody())); - } - - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - public void configure() throws Exception { - interceptSendToEndpoint(MOCK_RESULT_URI).process(new Processor() { - public void process(Exchange exchange) throws Exception { - message = exchange; - } - }); - - from(SJMS_QUEUE_URI).to(MOCK_RESULT_URI); - } - }; - } -}
