http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 59c306f..985f4f5 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -18,40 +18,63 @@ package org.apache.activemq.transport.amqp.message; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL; -import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_ENCODING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_TYPE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.DELIVERY_ANNOTATION_PREFIX; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FIRST_ACQUIRER; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FOOTER_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.HEADER; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX_LENGTH; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_FORMAT; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.NATIVE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.ORIGINAL_ENCODING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.PROPERTIES; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.REPLYTO_GROUP_ID; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getMapFromMessageBody; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Date; -import java.util.Enumeration; import java.util.HashMap; import java.util.Map; -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.ObjectMessage; import javax.jms.Queue; -import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.MessageId; import org.apache.activemq.transport.amqp.AmqpProtocolException; +import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.activemq.util.TypeConversionSupport; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; @@ -66,12 +89,12 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.CompositeWritableBuffer; -import org.apache.qpid.proton.codec.DroppingWritableBuffer; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.fusesource.hawtbuf.UTF8Buffer; -public class JMSMappingOutboundTransformer extends OutboundTransformer { +public class JMSMappingOutboundTransformer implements OutboundTransformer { public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); @@ -81,225 +104,276 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_TOPIC_TYPE = 0x03; - // Deprecated legacy values used by old QPid AMQP 1.0 JMS client. - - public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type"); - public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type"); - - public static final String LEGACY_QUEUE_TYPE = "queue"; - public static final String LEGACY_TOPIC_TYPE = "topic"; - public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue"; - public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic"; - - public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) { - super(vendor); + // For now Proton requires that we create a decoder to create an encoder + private final DecoderImpl decoder = new DecoderImpl(); + private final EncoderImpl encoder = new EncoderImpl(decoder); + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); } @Override - public EncodedMessage transform(Message msg) throws Exception { - if (msg == null) { - return null; - } - - try { - if (msg.getBooleanProperty(prefixVendor + "NATIVE")) { - return null; - } - } catch (MessageFormatException e) { + public EncodedMessage transform(ActiveMQMessage message) throws Exception { + if (message == null) { return null; } - ProtonJMessage amqp = convert(msg); - - long messageFormat; - try { - messageFormat = msg.getLongProperty(this.messageFormatKey); - } catch (MessageFormatException e) { - return null; - } - - ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]); - final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); - int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); - if (overflow.position() > 0) { - buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); - c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); - } - - return new EncodedMessage(messageFormat, buffer.array(), 0, c); - } - - /** - * Perform the conversion between JMS Message and Proton Message without - * re-encoding it to array. This is needed because some frameworks may elect - * to do this on their own way. - * - * @param message - * The message to transform into an AMQP version for dispatch. - * - * @return an AMQP Message that represents the given JMS Message. - * - * @throws Exception if an error occurs during the conversion. - */ - public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException { - Header header = new Header(); - Properties props = new Properties(); + long messageFormat = 0; + Header header = null; + Properties properties = null; Map<Symbol, Object> daMap = null; Map<Symbol, Object> maMap = null; Map<String,Object> apMap = null; Map<Object, Object> footerMap = null; - Section body = null; - body = convertBody(message); + Section body = convertBody(message); - header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); - header.setPriority(new UnsignedByte((byte) message.getJMSPriority())); - if (message.getJMSType() != null) { - props.setSubject(message.getJMSType()); + if (message.isPersistent()) { + if (header == null) { + header = new Header(); + } + header.setDurable(true); } - if (message.getJMSMessageID() != null) { - props.setMessageId(vendor.getOriginalMessageId(message)); + byte priority = message.getPriority(); + if (priority != Message.DEFAULT_PRIORITY) { + if (header == null) { + header = new Header(); + } + header.setPriority(new UnsignedByte(priority)); + } + String type = message.getType(); + if (type != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setSubject(type); + } + MessageId messageId = message.getMessageId(); + if (messageId != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setMessageId(getOriginalMessageId(message)); } - if (message.getJMSDestination() != null) { - props.setTo(vendor.toAddress(message.getJMSDestination())); + ActiveMQDestination destination = message.getDestination(); + if (destination != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setTo(destination.getQualifiedName()); if (maMap == null) { maMap = new HashMap<Symbol, Object>(); } - maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination())); - - // Deprecated: used by legacy QPid AMQP 1.0 JMS client - maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination())); + maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); } - if (message.getJMSReplyTo() != null) { - props.setReplyTo(vendor.toAddress(message.getJMSReplyTo())); + ActiveMQDestination replyTo = message.getReplyTo(); + if (replyTo != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyTo(replyTo.getQualifiedName()); if (maMap == null) { maMap = new HashMap<Symbol, Object>(); } - maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo())); - - // Deprecated: used by legacy QPid AMQP 1.0 JMS client - maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo())); + maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); } - if (message.getJMSCorrelationID() != null) { - String correlationId = message.getJMSCorrelationID(); + String correlationId = message.getCorrelationId(); + if (correlationId != null) { + if (properties == null) { + properties = new Properties(); + } try { - props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); + properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); } catch (AmqpProtocolException e) { - props.setCorrelationId(correlationId); + properties.setCorrelationId(correlationId); } } - if (message.getJMSExpiration() != 0) { - long ttl = message.getJMSExpiration() - System.currentTimeMillis(); + long expiration = message.getExpiration(); + if (expiration != 0) { + long ttl = expiration - System.currentTimeMillis(); if (ttl < 0) { ttl = 1; } + + if (header == null) { + header = new Header(); + } header.setTtl(new UnsignedInteger((int) ttl)); - props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration())); + if (properties == null) { + properties = new Properties(); + } + properties.setAbsoluteExpiryTime(new Date(expiration)); } - if (message.getJMSTimestamp() != 0) { - props.setCreationTime(new Date(message.getJMSTimestamp())); + long timeStamp = message.getTimestamp(); + if (timeStamp != 0) { + if (properties == null) { + properties = new Properties(); + } + properties.setCreationTime(new Date(timeStamp)); } - @SuppressWarnings("unchecked") - final Enumeration<String> keys = message.getPropertyNames(); - - while (keys.hasMoreElements()) { - String key = keys.nextElement(); - if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) { - // skip transformer appended properties - } else if (key.equals(firstAcquirerKey)) { - header.setFirstAcquirer(message.getBooleanProperty(key)); - } else if (key.startsWith("JMSXDeliveryCount")) { - // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. - int amqpDeliveryCount = message.getIntProperty(key) - 1; - if (amqpDeliveryCount > 0) { - header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); - } - } else if (key.startsWith("JMSXUserID")) { - String value = message.getStringProperty(key); - props.setUserId(new Binary(value.getBytes("UTF-8"))); - } else if (key.startsWith("JMSXGroupID")) { - String value = message.getStringProperty(key); - props.setGroupId(value); - if (apMap == null) { - apMap = new HashMap<String, Object>(); - } - apMap.put(key, value); - } else if (key.startsWith("JMSXGroupSeq")) { - UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key)); - props.setGroupSequence(value); - if (apMap == null) { - apMap = new HashMap<String, Object>(); - } - apMap.put(key, value); - } else if (key.startsWith(prefixDeliveryAnnotationsKey)) { - if (daMap == null) { - daMap = new HashMap<Symbol, Object>(); - } - String name = key.substring(prefixDeliveryAnnotationsKey.length()); - daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - } else if (key.startsWith(prefixMessageAnnotationsKey)) { - if (maMap == null) { - maMap = new HashMap<Symbol, Object>(); - } - String name = key.substring(prefixMessageAnnotationsKey.length()); - maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - } else if (key.equals(contentTypeKey)) { - props.setContentType(Symbol.getSymbol(message.getStringProperty(key))); - } else if (key.equals(contentEncodingKey)) { - props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key))); - } else if (key.equals(replyToGroupIDKey)) { - props.setReplyToGroupId(message.getStringProperty(key)); - } else if (key.startsWith(prefixFooterKey)) { - if (footerMap == null) { - footerMap = new HashMap<Object, Object>(); - } - String name = key.substring(prefixFooterKey.length()); - footerMap.put(name, message.getObjectProperty(key)); - } else { - if (apMap == null) { - apMap = new HashMap<String, Object>(); + // JMSX Message Properties + int deliveryCount = message.getRedeliveryCounter(); + if (deliveryCount > 0) { + if (header == null) { + header = new Header(); + } + header.setDeliveryCount(new UnsignedInteger(deliveryCount)); + } + String userId = message.getUserID(); + if (userId != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8))); + } + String groupId = message.getGroupID(); + if (groupId != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setGroupId(groupId); + } + int groupSequence = message.getGroupSequence(); + if (groupSequence > 0) { + UnsignedInteger value = new UnsignedInteger(groupSequence); + if (properties == null) { + properties = new Properties(); + } + properties.setGroupSequence(value); + } + + final Map<String, Object> entries; + try { + entries = message.getProperties(); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + + for (Map.Entry<String, Object> entry : entries.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (value instanceof UTF8Buffer) { + value = value.toString(); + } + + if (key.startsWith(JMS_AMQP_PREFIX)) { + if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)) { + // skip transformer appended properties + continue; + } else if (key.startsWith(ORIGINAL_ENCODING, JMS_AMQP_PREFIX_LENGTH)) { + // skip transformer appended properties + continue; + } else if (key.startsWith(MESSAGE_FORMAT, JMS_AMQP_PREFIX_LENGTH)) { + messageFormat = (long) TypeConversionSupport.convert(entry.getValue(), Long.class); + continue; + } else if (key.startsWith(HEADER, JMS_AMQP_PREFIX_LENGTH)) { + if (header == null) { + header = new Header(); + } + continue; + } else if (key.startsWith(PROPERTIES, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + continue; + } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { + if (maMap == null) { + maMap = new HashMap<Symbol, Object>(); + } + String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); + maMap.put(Symbol.valueOf(name), value); + continue; + } else if (key.startsWith(FIRST_ACQUIRER, JMS_AMQP_PREFIX_LENGTH)) { + if (header == null) { + header = new Header(); + } + header.setFirstAcquirer((boolean) TypeConversionSupport.convert(value, Boolean.class)); + continue; + } else if (key.startsWith(CONTENT_TYPE, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentType(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class))); + continue; + } else if (key.startsWith(CONTENT_ENCODING, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentEncoding(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class))); + continue; + } else if (key.startsWith(REPLYTO_GROUP_ID, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyToGroupId((String) TypeConversionSupport.convert(value, String.class)); + continue; + } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { + if (daMap == null) { + daMap = new HashMap<Symbol, Object>(); + } + String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); + daMap.put(Symbol.valueOf(name), value); + continue; + } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { + if (footerMap == null) { + footerMap = new HashMap<Object, Object>(); + } + String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); + footerMap.put(name, value); + continue; } - apMap.put(key, message.getObjectProperty(key)); } + + // The property didn't map into any other slot so we store it in the + // Application Properties section of the message. + if (apMap == null) { + apMap = new HashMap<String, Object>(); + } + apMap.put(key, value); } - MessageAnnotations ma = null; - if (maMap != null) { - ma = new MessageAnnotations(maMap); + final AmqpWritableBuffer buffer = new AmqpWritableBuffer(); + encoder.setByteBuffer(buffer); + + if (header != null) { + encoder.writeObject(header); } - DeliveryAnnotations da = null; if (daMap != null) { - da = new DeliveryAnnotations(daMap); + encoder.writeObject(new DeliveryAnnotations(daMap)); + } + if (maMap != null) { + encoder.writeObject(new MessageAnnotations(maMap)); + } + if (properties != null) { + encoder.writeObject(properties); } - ApplicationProperties ap = null; if (apMap != null) { - ap = new ApplicationProperties(apMap); + encoder.writeObject(new ApplicationProperties(apMap)); + } + if (body != null) { + encoder.writeObject(body); } - Footer footer = null; if (footerMap != null) { - footer = new Footer(footerMap); + encoder.writeObject(new Footer(footerMap)); } - return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer); + return new EncodedMessage(messageFormat, buffer.getArray(), 0, buffer.getArrayLength()); } - private Section convertBody(Message message) throws JMSException { + private Section convertBody(ActiveMQMessage message) throws JMSException { Section body = null; short orignalEncoding = AMQP_UNKNOWN; - if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) { - try { - orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY); - } catch (Exception ex) { - } + try { + orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING); + } catch (Exception ex) { + // Ignore and stick with UNKNOWN } - if (message instanceof BytesMessage) { - Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message); + if (message instanceof ActiveMQBytesMessage) { + Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message); if (payload == null) { payload = EMPTY_BINARY; @@ -317,12 +391,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { body = new Data(payload); break; } - } else if (message instanceof TextMessage) { + } else if (message instanceof ActiveMQTextMessage) { switch (orignalEncoding) { case AMQP_NULL: break; case AMQP_DATA: - body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message)); + body = new Data(getBinaryFromMessageBody((ActiveMQTextMessage) message)); break; case AMQP_VALUE_STRING: case AMQP_UNKNOWN: @@ -330,11 +404,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { body = new AmqpValue(((TextMessage) message).getText()); break; } - } else if (message instanceof MapMessage) { - body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message)); - } else if (message instanceof StreamMessage) { + } else if (message instanceof ActiveMQMapMessage) { + body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message)); + } else if (message instanceof ActiveMQStreamMessage) { ArrayList<Object> list = new ArrayList<Object>(); - final StreamMessage m = (StreamMessage) message; + final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message; try { while (true) { list.add(m.readObject()); @@ -352,8 +426,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { body = new AmqpValue(list); break; } - } else if (message instanceof ObjectMessage) { - Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message); + } else if (message instanceof ActiveMQObjectMessage) { + Binary payload = getBinaryFromMessageBody((ActiveMQObjectMessage) message); if (payload == null) { payload = EMPTY_BINARY; @@ -373,8 +447,10 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { // For a non-AMQP message we tag the outbound content type as containing // a serialized Java object so that an AMQP client has a hint as to what // we are sending it. - if (!message.propertyExists(contentTypeKey)) { - vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) { + message.setReadOnlyProperties(false); + message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + message.setReadOnlyProperties(true); } } @@ -399,23 +475,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); } - // Used by legacy QPid AMQP 1.0 JMS client. - @Deprecated - private static String destinationAttributes(Destination destination) { - if (destination instanceof Queue) { - if (destination instanceof TemporaryQueue) { - return LEGACY_TEMP_QUEUE_TYPE; - } else { - return LEGACY_QUEUE_TYPE; - } - } else if (destination instanceof Topic) { - if (destination instanceof TemporaryTopic) { - return LEGACY_TEMP_TOPIC_TYPE; - } else { - return LEGACY_TOPIC_TYPE; + private static Object getOriginalMessageId(ActiveMQMessage message) { + Object result; + MessageId messageId = message.getMessageId(); + if (messageId.getTextView() != null) { + try { + result = AMQPMessageIdHelper.INSTANCE.toIdObject(messageId.getTextView()); + } catch (AmqpProtocolException e) { + result = messageId.getTextView(); } + } else { + result = messageId.toString(); } - throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); + return result; } }
http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java index 2eefa50..6ca9ced 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java @@ -16,54 +16,10 @@ */ package org.apache.activemq.transport.amqp.message; -import javax.jms.Message; +import org.apache.activemq.command.ActiveMQMessage; -public abstract class OutboundTransformer { +public interface OutboundTransformer { - protected final ActiveMQJMSVendor vendor; + public abstract EncodedMessage transform(ActiveMQMessage message) throws Exception; - protected String prefixVendor; - - protected String prefixDeliveryAnnotations = "DA_"; - protected String prefixMessageAnnotations= "MA_"; - protected String prefixFooter = "FT_"; - - protected String messageFormatKey; - protected String nativeKey; - protected String firstAcquirerKey; - protected String prefixDeliveryAnnotationsKey; - protected String prefixMessageAnnotationsKey; - protected String contentTypeKey; - protected String contentEncodingKey; - protected String replyToGroupIDKey; - protected String prefixFooterKey; - - public OutboundTransformer(ActiveMQJMSVendor vendor) { - this.vendor = vendor; - this.setPrefixVendor("JMS_AMQP_"); - } - - public abstract EncodedMessage transform(Message jms) throws Exception; - - public String getPrefixVendor() { - return prefixVendor; - } - - public void setPrefixVendor(String prefixVendor) { - this.prefixVendor = prefixVendor; - - messageFormatKey = prefixVendor + "MESSAGE_FORMAT"; - nativeKey = prefixVendor + "NATIVE"; - firstAcquirerKey = prefixVendor + "FirstAcquirer"; - prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; - prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations; - contentTypeKey = prefixVendor + "ContentType"; - contentEncodingKey = prefixVendor + "ContentEncoding"; - replyToGroupIDKey = prefixVendor + "ReplyToGroupID"; - prefixFooterKey = prefixVendor + prefixFooter; - } - - public ActiveMQJMSVendor getVendor() { - return vendor; - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 503a05e..33c319e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -37,7 +37,6 @@ import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer; import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer; -import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.activemq.transport.amqp.message.InboundTransformer; import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; @@ -138,14 +137,14 @@ public class AmqpReceiver extends AmqpAbstractReceiver { if (inboundTransformer == null) { String transformer = session.getConnection().getConfiguredTransformer(); if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) { - inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new JMSMappingInboundTransformer(); } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) { - inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new AMQPNativeInboundTransformer(); } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) { - inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new AMQPRawInboundTransformer(); } else { LOG.warn("Unknown transformer type {} using native one instead", transformer); - inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new AMQPNativeInboundTransformer(); } } return inboundTransformer; @@ -157,7 +156,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver { EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length); InboundTransformer transformer = getTransformer(); - ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em); + ActiveMQMessage message = transformer.transform(em); current = null; http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 455e0b0..2531c1a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.protocol; import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT; import java.io.IOException; import java.util.LinkedList; @@ -39,7 +40,6 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; -import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer; import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.activemq.transport.amqp.message.OutboundTransformer; @@ -75,11 +75,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; - private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE); + private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(); private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>(); private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>(); - private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; private final ConsumerInfo consumerInfo; private AbstractSubscription subscription; @@ -437,8 +436,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { temp = (ActiveMQMessage) md.getMessage(); } - if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { - temp.setProperty(MESSAGE_FORMAT_KEY, 0); + if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) { + temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0); } } @@ -477,6 +476,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { currentDelivery = getEndpoint().delivery(tag, 0, tag.length); } currentDelivery.setContext(md); + currentDelivery.setMessageFormat((int) amqp.getMessageFormat()); } else { // TODO: message could not be generated what now? } http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java index b513c1a..201cee2 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java @@ -87,8 +87,6 @@ public class AmqpTransformerTest { assertTrue(message instanceof BytesMessage); Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); - Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); - assertEquals(0L, messageFormat.longValue()); assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); assertEquals(7, message.getJMSPriority()); @@ -136,8 +134,6 @@ public class AmqpTransformerTest { LOG.info("Recieved message: {}", message); assertTrue(message instanceof BytesMessage); Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); - Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); - assertEquals(0L, messageFormat.longValue()); assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); @@ -184,8 +180,6 @@ public class AmqpTransformerTest { assertTrue(message instanceof TextMessage); Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); - Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); - assertEquals(0L, messageFormat.longValue()); assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed); assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java index 84d5864..fa61e14 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -468,4 +469,46 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { amqp.close(); openwire.close(); } + + //----- Test Qpid JMS to Qpid JMS interop with transformers --------------// + + @Test + public void testQpidJMSToQpidJMSMessageSendReceive() throws Exception { + final int SIZE = 1024; + final int NUM_MESSAGES = 100; + + Connection amqpSend = createConnection("client-1"); + Connection amqpReceive = createConnection("client-2"); + + amqpReceive.start(); + + Session senderSession = amqpSend.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session receiverSession = amqpReceive.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination queue = senderSession.createQueue(getDestinationName()); + + MessageProducer amqpProducer = senderSession.createProducer(queue); + MessageConsumer amqpConsumer = receiverSession.createConsumer(queue); + + byte[] payload = new byte[SIZE]; + + for (int i = 0; i < NUM_MESSAGES; ++i) { + BytesMessage outgoing = senderSession.createBytesMessage(); + outgoing.setLongProperty("SendTime", System.currentTimeMillis()); + outgoing.writeBytes(payload); + amqpProducer.send(outgoing); + } + + // Now consumer the message + for (int i = 0; i < NUM_MESSAGES; ++i) { + Message received = amqpConsumer.receive(2000); + assertNotNull(received); + assertTrue("Expected BytesMessage but got " + received, received instanceof BytesMessage); + BytesMessage incoming = (BytesMessage) received; + assertEquals(SIZE, incoming.getBodyLength()); + } + + amqpReceive.close(); + amqpSend.close(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 2b1b874..e5e1bbd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -257,7 +257,7 @@ public class AmqpMessage { * @return the set message ID in String form or null if not set. */ public String getMessageId() { - if (message.getProperties() == null) { + if (message.getProperties() == null || message.getProperties().getMessageId() == null) { return null; } @@ -309,7 +309,7 @@ public class AmqpMessage { * @return the set correlation ID in String form or null if not set. */ public String getCorrelationId() { - if (message.getProperties() == null) { + if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) { return null; } @@ -387,7 +387,7 @@ public class AmqpMessage { * @return true if the message is marked as being durable. */ public boolean isDurable() { - if (message.getHeader() == null) { + if (message.getHeader() == null || message.getHeader().getDurable() == null) { return false; } http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java index ba0f014..1427b5a 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java @@ -51,7 +51,6 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.message.Message; import org.junit.Test; -import org.mockito.Mockito; public class JMSMappingInboundTransformerTest { @@ -65,8 +64,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE); @@ -86,8 +84,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); @@ -107,8 +104,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); @@ -122,8 +118,7 @@ public class JMSMappingInboundTransformerTest { @Test public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType("text/plain"); @@ -143,8 +138,7 @@ public class JMSMappingInboundTransformerTest { * @throws Exception if an error occurs during the test. */ public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType("unknown-content-type"); @@ -174,8 +168,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -197,8 +190,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -222,8 +214,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -246,8 +237,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -350,8 +340,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -377,8 +366,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -398,8 +386,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -415,8 +402,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setBody(new AmqpValue(new Binary(new byte[0]))); @@ -443,8 +429,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -465,8 +450,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -487,8 +471,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -509,8 +492,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -531,8 +513,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); @@ -548,8 +529,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); @@ -589,9 +569,7 @@ public class JMSMappingInboundTransformerTest { } private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception { - ActiveMQTextMessage mockTextMessage = createMockTextMessage(); - ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); String toAddress = "toAddress"; Message amqp = Message.Factory.create(); @@ -608,11 +586,6 @@ public class JMSMappingInboundTransformerTest { javax.jms.Message jmsMessage = transformer.transform(em); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); - - // Verify that createDestination was called with the provided 'to' - // address and 'Destination' class - // TODO - No need to really test this bit ? - // Mockito.verify(mockVendor).createDestination(toAddress, expectedClass); } //----- ReplyTo Conversions ----------------------------------------------// @@ -643,9 +616,7 @@ public class JMSMappingInboundTransformerTest { } private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception { - ActiveMQTextMessage mockTextMessage = createMockTextMessage(); - ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); String replyToAddress = "replyToAddress"; Message amqp = Message.Factory.create(); @@ -662,31 +633,10 @@ public class JMSMappingInboundTransformerTest { javax.jms.Message jmsMessage = transformer.transform(em); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); - - // Verify that createDestination was called with the provided 'replyTo' - // address and 'Destination' class - // TODO - No need to really test this bit ? - // Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass); } //----- Utility Methods --------------------------------------------------// - private ActiveMQTextMessage createMockTextMessage() { - return Mockito.mock(ActiveMQTextMessage.class); - } - - private ActiveMQJMSVendor createMockVendor(ActiveMQTextMessage mockTextMessage) { - ActiveMQJMSVendor mockVendor = Mockito.mock(ActiveMQJMSVendor.class); - Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage); - Mockito.when(mockVendor.createTextMessage(Mockito.any(String.class))).thenReturn(mockTextMessage); - - return mockVendor; - } - - private ActiveMQJMSVendor createVendor() { - return ActiveMQJMSVendor.INSTANCE; - } - private EncodedMessage encodeMessage(Message message) { byte[] encodeBuffer = new byte[1024 * 8]; int encodedSize;
