http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java index 9dd29ab..629c499 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,27 +16,50 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import javax.jms.BytesMessage; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import java.io.Serializable; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; -import java.util.Set; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; +import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; public class JMSMappingInboundTransformer extends InboundTransformer { - public JMSMappingInboundTransformer(JMSVendor vendor) { - super(vendor); + public JMSMappingInboundTransformer(IDGenerator idGenerator) { + super(idGenerator); } @Override @@ -46,75 +69,128 @@ public class JMSMappingInboundTransformer extends InboundTransformer { @Override public InboundTransformer getFallbackTransformer() { - return new AMQPNativeInboundTransformer(getVendor()); + return new AMQPNativeInboundTransformer(idGenerator); } - @SuppressWarnings({"unchecked"}) @Override - public Message transform(EncodedMessage amqpMessage) throws Exception { - org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); + public ServerJMSMessage transform(EncodedMessage encodedMessage) throws Exception { + ServerJMSMessage transformedMessage = null; + + try { + Message amqpMessage = encodedMessage.decode(); + transformedMessage = createServerMessage(amqpMessage); + populateMessage(transformedMessage, amqpMessage); + } catch (Exception ex) { + InboundTransformer transformer = this.getFallbackTransformer(); + + while (transformer != null) { + try { + transformedMessage = transformer.transform(encodedMessage); + break; + } catch (Exception e) { + transformer = transformer.getFallbackTransformer(); + } + } + } + + // Regardless of the transformer that finally decoded the message we need to ensure that + // the AMQP Message Format value is preserved for application on retransmit. + if (transformedMessage != null && encodedMessage.getMessageFormat() != 0) { + transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, encodedMessage.getMessageFormat()); + } + + return transformedMessage; + } + + @SuppressWarnings("unchecked") + private ServerJMSMessage createServerMessage(Message message) throws Exception { + + Section body = message.getBody(); + ServerJMSMessage result; - Message rc; - final Section body = amqp.getBody(); if (body == null) { - rc = vendor.createMessage(); + if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { + result = createObjectMessage(idGenerator); + } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) { + result = createBytesMessage(idGenerator); + } else { + Charset charset = getCharsetForTextualContent(message.getContentType()); + if (charset != null) { + result = createTextMessage(idGenerator); + } else { + result = createMessage(idGenerator); + } + } + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); } else if (body instanceof Data) { - Binary d = ((Data) body).getValue(); - BytesMessage m = vendor.createBytesMessage(); - m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); - rc = m; + Binary payload = ((Data) body).getValue(); + + if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { + result = createObjectMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) { + result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } else { + Charset charset = getCharsetForTextualContent(message.getContentType()); + if (StandardCharsets.UTF_8.equals(charset)) { + ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + + try { + CharBuffer chars = charset.newDecoder().decode(buf); + result = createTextMessage(idGenerator, String.valueOf(chars)); + } catch (CharacterCodingException e) { + result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } + } else { + result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } + } + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); } else if (body instanceof AmqpSequence) { AmqpSequence sequence = (AmqpSequence) body; - StreamMessage m = vendor.createStreamMessage(); + ServerJMSStreamMessage m = createStreamMessage(idGenerator); for (Object item : sequence.getValue()) { m.writeObject(item); } - rc = m; - m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_SEQUENCE); + + result = m; + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); } else if (body instanceof AmqpValue) { Object value = ((AmqpValue) body).getValue(); - if (value == null) { - rc = vendor.createObjectMessage(); - } - if (value instanceof String) { - TextMessage m = vendor.createTextMessage(); - m.setText((String) value); - rc = m; + if (value == null || value instanceof String) { + result = createTextMessage(idGenerator, (String) value); + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING); } else if (value instanceof Binary) { - Binary d = (Binary) value; - BytesMessage m = vendor.createBytesMessage(); - m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); - rc = m; + Binary payload = (Binary) value; + + if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { + result = createObjectMessage(idGenerator, payload); + } else { + result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); } else if (value instanceof List) { - StreamMessage m = vendor.createStreamMessage(); + ServerJMSStreamMessage m = createStreamMessage(idGenerator); for (Object item : (List<Object>) value) { m.writeObject(item); } - rc = m; - m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_LIST); + result = m; + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST); } else if (value instanceof Map) { - MapMessage m = vendor.createMapMessage(); - final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet(); - for (Map.Entry<String, Object> entry : set) { - m.setObject(entry.getKey(), entry.getValue()); - } - rc = m; + result = createMapMessage(idGenerator, (Map<String, Object>) value); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP); } else { - ObjectMessage m = vendor.createObjectMessage(); - m.setObject((Serializable) value); - rc = m; + // Trigger fall-back to native encoder which generates BytesMessage with the + // original message stored in the message body. + throw new ActiveMQAMQPInternalErrorException("Unable to encode to ActiveMQ JMS Message"); } } else { throw new RuntimeException("Unexpected body type: " + body.getClass()); } - rc.setJMSDeliveryMode(defaultDeliveryMode); - rc.setJMSPriority(defaultPriority); - rc.setJMSExpiration(defaultTtl); - - populateMessage(rc, amqp); - rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); - rc.setBooleanProperty(prefixVendor + "NATIVE", false); - return rc; + return result; } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 9f28a6b..9ee0344 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,30 +16,61 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageEOFException; -import javax.jms.ObjectMessage; import javax.jms.Queue; -import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Date; -import java.util.Enumeration; -import java.util.HashMap; import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; @@ -54,12 +85,16 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.WritableBuffer; import org.jboss.logging.Logger; public class JMSMappingOutboundTransformer extends OutboundTransformer { private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class); + public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); @@ -68,227 +103,458 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_TOPIC_TYPE = 0x03; - public JMSMappingOutboundTransformer(JMSVendor vendor) { - super(vendor); + // For now Proton requires that we create a decoder to create an encoder + private static class EncoderDecoderPair { + DecoderImpl decoder = new DecoderImpl(); + EncoderImpl encoder = new EncoderImpl(decoder); + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); + } } - /** - * Perform the conversion between JMS Message and Proton Message without - * re-encoding it to array. This is needed because some frameworks may elect - * to do this on their own way (Netty for instance using Nettybuffers) - * - * @param msg - * @return - * @throws Exception - */ - public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException { - Header header = new Header(); - Properties props = new Properties(); - HashMap<Symbol, Object> daMap = null; - HashMap<Symbol, Object> maMap = null; - HashMap apMap = null; - Section body = null; - HashMap footerMap = null; - if (msg instanceof BytesMessage) { - BytesMessage m = (BytesMessage) msg; - byte[] data = new byte[(int) m.getBodyLength()]; - m.readBytes(data); - m.reset(); // Need to reset after readBytes or future readBytes - // calls (ex: redeliveries) will fail and return -1 - body = new Data(new Binary(data)); - } - if (msg instanceof TextMessage) { - body = new AmqpValue(((TextMessage) msg).getText()); - } - if (msg instanceof MapMessage) { - final HashMap<String, Object> map = new HashMap<>(); - final MapMessage m = (MapMessage) msg; - final Enumeration<String> names = m.getMapNames(); - while (names.hasMoreElements()) { - String key = names.nextElement(); - map.put(key, m.getObject(key)); - } - body = new AmqpValue(map); + private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() { + @Override + protected EncoderDecoderPair initialValue() { + return new EncoderDecoderPair(); } - if (msg instanceof StreamMessage) { - ArrayList<Object> list = new ArrayList<>(); - final StreamMessage m = (StreamMessage) msg; - try { - while (true) { - list.add(m.readObject()); - } - } catch (MessageEOFException e) { - } + }; - String amqpType = msg.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY); - if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) { - body = new AmqpValue(list); - } else { - body = new AmqpSequence(list); - } - } - if (msg instanceof ObjectMessage) { - body = new AmqpValue(((ObjectMessage) msg).getObject()); + public JMSMappingOutboundTransformer(IDGenerator idGenerator) { + super(idGenerator); + } + + @Override + public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException { + if (message == null) { + return 0; } - if (body == null && msg instanceof ServerJMSMessage) { + long messageFormat = 0; + Header header = null; + Properties properties = null; + Map<Symbol, Object> daMap = null; + Map<Symbol, Object> maMap = null; + Map<String, Object> apMap = null; + Map<Object, Object> footerMap = null; - MessageInternal internalMessage = ((ServerJMSMessage) msg).getInnerMessage(); - if (!internalMessage.containsProperty("AMQP_MESSAGE_FORMAT")) { - int readerIndex = internalMessage.getBodyBuffer().readerIndex(); - try { - Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); - if (s != null) { - body = new AmqpValue(s.toString()); - } - } catch (Throwable ignored) { - logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored); - } finally { - internalMessage.getBodyBuffer().readerIndex(readerIndex); - } + Section body = convertBody(message); + + if (message.getInnerMessage().isDurable()) { + if (header == null) { + header = new Header(); } + header.setDurable(true); } - - header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); - header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); - if (msg.getJMSType() != null) { - props.setSubject(msg.getJMSType()); + byte priority = (byte) message.getJMSPriority(); + if (priority != Message.DEFAULT_PRIORITY) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); } - if (msg.getJMSMessageID() != null) { - - String msgId = msg.getJMSMessageID(); - + String type = message.getJMSType(); + if (type != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setSubject(type); + } + String messageId = message.getJMSMessageID(); + if (messageId != null) { + if (properties == null) { + properties = new Properties(); + } try { - props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId)); + properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId)); } catch (ActiveMQAMQPIllegalStateException e) { - props.setMessageId(msgId); + properties.setMessageId(messageId); } } - if (msg.getJMSDestination() != null) { - props.setTo(vendor.toAddress(msg.getJMSDestination())); + Destination destination = message.getJMSDestination(); + if (destination != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setTo(toAddress(destination)); if (maMap == null) { - maMap = new HashMap<>(); + maMap = new HashMap<Symbol, Object>(); } - maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination())); + maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); } - if (msg.getJMSReplyTo() != null) { - props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo())); + Destination replyTo = message.getJMSReplyTo(); + if (replyTo != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyTo(toAddress(replyTo)); if (maMap == null) { - maMap = new HashMap<>(); + maMap = new HashMap<Symbol, Object>(); } - maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo())); + maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); } - if (msg.getJMSCorrelationID() != null) { - String correlationId = msg.getJMSCorrelationID(); - + String correlationId = message.getJMSCorrelationID(); + if (correlationId != null) { + if (properties == null) { + properties = new Properties(); + } try { - props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); + properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); } catch (ActiveMQAMQPIllegalStateException e) { - props.setCorrelationId(correlationId); + properties.setCorrelationId(correlationId); } } - if (msg.getJMSExpiration() != 0) { - long ttl = msg.getJMSExpiration() - System.currentTimeMillis(); + long expiration = message.getJMSExpiration(); + if (expiration != 0) { + long ttl = expiration - System.currentTimeMillis(); if (ttl < 0) { ttl = 1; } + + if (header == null) { + header = new Header(); + } header.setTtl(new UnsignedInteger((int) ttl)); - props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration())); + if (properties == null) { + properties = new Properties(); + } + properties.setAbsoluteExpiryTime(new Date(expiration)); } - if (msg.getJMSTimestamp() != 0) { - props.setCreationTime(new Date(msg.getJMSTimestamp())); + long timeStamp = message.getJMSTimestamp(); + if (timeStamp != 0) { + if (properties == null) { + properties = new Properties(); + } + properties.setCreationTime(new Date(timeStamp)); } - final Enumeration<String> keys = msg.getPropertyNames(); - while (keys.hasMoreElements()) { - String key = keys.nextElement(); - if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) { - // skip.. - } else if (key.equals(firstAcquirerKey)) { - header.setFirstAcquirer(msg.getBooleanProperty(key)); - } else if (key.startsWith("JMSXDeliveryCount")) { - // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. - int amqpDeliveryCount = msg.getIntProperty(key) - 1; - if (amqpDeliveryCount > 0) { - header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); - } - } else if (key.startsWith("JMSXUserID")) { - String value = msg.getStringProperty(key); - props.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8))); - } else if (key.startsWith("JMSXGroupID") || key.startsWith("_AMQ_GROUP_ID")) { - String value = msg.getStringProperty(key); - props.setGroupId(value); - if (apMap == null) { - apMap = new HashMap(); - } - apMap.put(key, value); - } else if (key.startsWith("JMSXGroupSeq")) { - UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key)); - props.setGroupSequence(value); - if (apMap == null) { - apMap = new HashMap(); - } - apMap.put(key, value); - } else if (key.startsWith(prefixDeliveryAnnotationsKey)) { - if (daMap == null) { - daMap = new HashMap<>(); + final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage()); + for (String key : keySet) { + if (key.startsWith("JMSX")) { + if (key.equals("JMSXDeliveryCount")) { + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. + int amqpDeliveryCount = message.getDeliveryCount() - 1; + if (amqpDeliveryCount > 0) { + if (header == null) { + header = new Header(); + } + header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); + } + continue; + } else if (key.equals("JMSXUserID")) { + String value = message.getStringProperty(key); + if (properties == null) { + properties = new Properties(); + } + properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8))); + continue; + } else if (key.equals("JMSXGroupID")) { + String value = message.getStringProperty(key); + if (properties == null) { + properties = new Properties(); + } + properties.setGroupId(value); + continue; + } else if (key.equals("JMSXGroupSeq")) { + UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key)); + if (properties == null) { + properties = new Properties(); + } + properties.setGroupSequence(value); + continue; } - String name = key.substring(prefixDeliveryAnnotationsKey.length()); - daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); - } else if (key.startsWith(prefixMessageAnnotationsKey)) { - if (maMap == null) { - maMap = new HashMap<>(); + } else if (key.startsWith(JMS_AMQP_PREFIX)) { + // AMQP Message Information stored from a conversion to the Core Message + if (key.equals(JMS_AMQP_MESSAGE_FORMAT)) { + messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT); + continue; + } else if (key.equals(JMS_AMQP_NATIVE)) { + // skip..internal use only + continue; + } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) { + // skip..internal use only + continue; + } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { + if (header == null) { + header = new Header(); + } + header.setFirstAcquirer(message.getBooleanProperty(key)); + continue; + } else if (key.equals(JMS_AMQP_HEADER)) { + if (header == null) { + header = new Header(); + } + continue; + } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { + if (properties == null) { + properties = new Properties(); + } + continue; + } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) { + if (daMap == null) { + daMap = new HashMap<Symbol, Object>(); + } + String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); + daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); + continue; + } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) { + if (maMap == null) { + maMap = new HashMap<Symbol, Object>(); + } + String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); + maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); + continue; + } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentType(Symbol.getSymbol(message.getStringProperty(key))); + continue; + } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key))); + continue; + } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyToGroupId(message.getStringProperty(key)); + continue; + } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) { + if (footerMap == null) { + footerMap = new HashMap<Object, Object>(); + } + String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); + footerMap.put(name, message.getObjectProperty(key)); + continue; } - String name = key.substring(prefixMessageAnnotationsKey.length()); - maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); - } else if (key.equals(contentTypeKey)) { - props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); - } else if (key.equals(contentEncodingKey)) { - props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key))); - } else if (key.equals(replyToGroupIDKey)) { - props.setReplyToGroupId(msg.getStringProperty(key)); - } else if (key.startsWith(prefixFooterKey)) { - if (footerMap == null) { - footerMap = new HashMap(); + } else if (key.equals("_AMQ_GROUP_ID")) { + String value = message.getStringProperty(key); + if (properties == null) { + properties = new Properties(); } - String name = key.substring(prefixFooterKey.length()); - footerMap.put(name, msg.getObjectProperty(key)); + properties.setGroupId(value); + continue; + } else if (key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) { + // skip..internal use only + continue; + } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) { + // skip..remove annotation from previous inbound transformation + continue; } else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) { - // skip - } else { - if (apMap == null) { - apMap = new HashMap(); - } - Object objectProperty = msg.getObjectProperty(key); - if (objectProperty instanceof byte[]) { - Binary binary = new Binary((byte[]) objectProperty); - apMap.put(key, binary); - } else { - apMap.put(key, objectProperty); - } + // skip..internal use only - TODO - Remove this deprecated value in future release. + continue; + } + + if (apMap == null) { + apMap = new HashMap<String, Object>(); } + + Object objectProperty = message.getObjectProperty(key); + if (objectProperty instanceof byte[]) { + objectProperty = new Binary((byte[]) objectProperty); + } + + apMap.put(key, objectProperty); } - MessageAnnotations ma = null; - if (maMap != null) { - ma = new MessageAnnotations(maMap); + EncoderImpl encoder = tlsCodec.get().encoder; + encoder.setByteBuffer(buffer); + + if (header != null) { + encoder.writeObject(header); } - DeliveryAnnotations da = null; if (daMap != null) { - da = new DeliveryAnnotations(daMap); + encoder.writeObject(new DeliveryAnnotations(daMap)); + } + if (maMap != null) { + encoder.writeObject(new MessageAnnotations(maMap)); + } + if (properties != null) { + encoder.writeObject(properties); } - ApplicationProperties ap = null; if (apMap != null) { - ap = new ApplicationProperties(apMap); + encoder.writeObject(new ApplicationProperties(apMap)); + } + if (body != null) { + encoder.writeObject(body); } - Footer footer = null; if (footerMap != null) { - footer = new Footer(footerMap); + encoder.writeObject(new Footer(footerMap)); + } + + return messageFormat; + } + + private Section convertBody(ServerJMSMessage message) throws JMSException { + + Section body = null; + short orignalEncoding = AMQP_UNKNOWN; + + try { + orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING); + } catch (Exception ex) { + // Ignore and stick with UNKNOWN + } + + if (message instanceof ServerJMSBytesMessage) { + Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message); + + if (payload == null) { + payload = EMPTY_BINARY; + } + + switch (orignalEncoding) { + case AMQP_NULL: + break; + case AMQP_VALUE_BINARY: + body = new AmqpValue(payload); + break; + case AMQP_DATA: + case AMQP_UNKNOWN: + default: + body = new Data(payload); + break; + } + } else if (message instanceof ServerJMSTextMessage) { + switch (orignalEncoding) { + case AMQP_NULL: + break; + case AMQP_DATA: + body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message)); + break; + case AMQP_VALUE_STRING: + case AMQP_UNKNOWN: + default: + body = new AmqpValue(((TextMessage) message).getText()); + break; + } + } else if (message instanceof ServerJMSMapMessage) { + body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message)); + } else if (message instanceof ServerJMSStreamMessage) { + ArrayList<Object> list = new ArrayList<>(); + final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message; + try { + while (true) { + list.add(m.readObject()); + } + } catch (MessageEOFException e) { + } + + // Deprecated encoding markers - TODO - Remove on future release + if (orignalEncoding == AMQP_UNKNOWN) { + String amqpType = message.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY); + if (amqpType != null) { + if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) { + orignalEncoding = AMQP_VALUE_LIST; + } else { + orignalEncoding = AMQP_SEQUENCE; + } + } + } + + switch (orignalEncoding) { + case AMQP_SEQUENCE: + body = new AmqpSequence(list); + break; + case AMQP_VALUE_LIST: + case AMQP_UNKNOWN: + default: + body = new AmqpValue(list); + break; + } + } else if (message instanceof ServerJMSObjectMessage) { + Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message); + + if (payload == null) { + payload = EMPTY_BINARY; + } + + switch (orignalEncoding) { + case AMQP_VALUE_BINARY: + body = new AmqpValue(payload); + break; + case AMQP_DATA: + case AMQP_UNKNOWN: + default: + body = new Data(payload); + break; + } + + // For a non-AMQP message we tag the outbound content type as containing + // a serialized Java object so that an AMQP client has a hint as to what + // we are sending it. + if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) { + message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + } + } else if (message instanceof ServerJMSMessage) { + // If this is not an AMQP message that was converted then the original encoding + // will be unknown so we check for special cases of messages with special data + // encoded into the server message body. + if (orignalEncoding == AMQP_UNKNOWN) { + MessageInternal internalMessage = message.getInnerMessage(); + int readerIndex = internalMessage.getBodyBuffer().readerIndex(); + try { + Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); + if (s != null) { + body = new AmqpValue(s.toString()); + } + } catch (Throwable ignored) { + logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored); + } finally { + internalMessage.getBodyBuffer().readerIndex(readerIndex); + } + } + } + + return body; + } + + private Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException { + byte[] data = new byte[(int) message.getBodyLength()]; + message.readBytes(data); + message.reset(); // Need to reset after readBytes or future readBytes + + return new Binary(data); + } + + private Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException { + Binary result = null; + String text = message.getText(); + if (text != null) { + result = new Binary(text.getBytes(StandardCharsets.UTF_8)); + } + + return result; + } + + private Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { + message.getInnerMessage().getBodyBuffer().resetReaderIndex(); + int size = message.getInnerMessage().getBodyBuffer().readInt(); + byte[] bytes = new byte[size]; + message.getInnerMessage().getBodyBuffer().readBytes(bytes); + + return new Binary(bytes); + } + + private Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException { + final HashMap<String, Object> map = new LinkedHashMap<>(); + + @SuppressWarnings("unchecked") + final Enumeration<String> names = message.getMapNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + Object value = message.getObject(key); + if (value instanceof byte[]) { + value = new Binary((byte[]) value); + } + map.put(key, value); } - return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer); + return map; } private static byte destinationType(Destination destination) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java deleted file mode 100644 index 9a0ed63..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -public interface JMSVendor { - - BytesMessage createBytesMessage(); - - StreamMessage createStreamMessage(); - - Message createMessage(); - - TextMessage createTextMessage(); - - ObjectMessage createObjectMessage(); - - MapMessage createMapMessage(); - - void setJMSXUserID(Message message, String value); - - Destination createDestination(String name); - - void setJMSXGroupID(Message message, String groupId); - - void setJMSXGroupSequence(Message message, int value); - - void setJMSXDeliveryCount(Message message, long value); - - String toAddress(Destination destination); - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java index 310d4ba..f15490f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,54 +16,36 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -public abstract class OutboundTransformer { - - JMSVendor vendor; - String prefixVendor; +import java.io.UnsupportedEncodingException; - String prefixDeliveryAnnotations = "DA_"; - String prefixMessageAnnotations = "MA_"; - String prefixFooter = "FT_"; +import javax.jms.JMSException; - String messageFormatKey; - String nativeKey; - String firstAcquirerKey; - String prefixDeliveryAnnotationsKey; - String prefixMessageAnnotationsKey; - String contentTypeKey; - String contentEncodingKey; - String replyToGroupIDKey; - String prefixFooterKey; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.qpid.proton.codec.WritableBuffer; - public OutboundTransformer(JMSVendor vendor) { - this.vendor = vendor; - this.setPrefixVendor("JMS_AMQP_"); - } - - public String getPrefixVendor() { - return prefixVendor; - } - - public void setPrefixVendor(String prefixVendor) { - this.prefixVendor = prefixVendor; +public abstract class OutboundTransformer { - messageFormatKey = prefixVendor + "MESSAGE_FORMAT"; - nativeKey = prefixVendor + "NATIVE"; - firstAcquirerKey = prefixVendor + "FirstAcquirer"; - prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; - prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations; - contentTypeKey = prefixVendor + "ContentType"; - contentEncodingKey = prefixVendor + "ContentEncoding"; - replyToGroupIDKey = prefixVendor + "ReplyToGroupID"; - prefixFooterKey = prefixVendor + prefixFooter; + protected IDGenerator idGenerator; + public OutboundTransformer(IDGenerator idGenerator) { + this.idGenerator = idGenerator; } - public JMSVendor getVendor() { - return vendor; - } + /** + * Given an JMS Message perform a conversion to an AMQP Message and encode into a form that + * is ready for transmission. + * + * @param message + * the message to transform + * @param buffer + * the buffer where encoding should write to + * + * @return the message format key of the encoded message. + * + * @throws Exception + * if an error occurs during message transformation + */ + public abstract long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException; - public void setVendor(JMSVendor vendor) { - this.vendor = vendor; - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java new file mode 100644 index 0000000..4def92c --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidContentTypeException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.exceptions; + +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.qpid.proton.amqp.transport.AmqpError; + +public class ActiveMQAMQPInvalidContentTypeException extends ActiveMQAMQPException { + + public ActiveMQAMQPInvalidContentTypeException(String message) { + super(AmqpError.INTERNAL_ERROR, message, ActiveMQExceptionType.INTERNAL_ERROR); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 7d401fa..94e6a47 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -19,8 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.util.Map; import java.util.Objects; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -52,9 +50,11 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.message.ProtonJMessage; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class); @@ -407,33 +407,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return 0; } - //encode the message - ProtonJMessage serverMessage; - try { - // This can be done a lot better here - serverMessage = sessionSPI.encodeMessage(message, deliveryCount); - } catch (Throwable e) { - log.warn(e.getMessage(), e); - throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); - } - - return performSend(serverMessage, message); - } - - private static boolean hasCapabilities(Symbol symbol, Source source) { - if (source != null) { - if (source.getCapabilities() != null) { - for (Symbol cap : source.getCapabilities()) { - if (symbol.equals(cap)) { - return true; - } - } - } - } - return false; - } - - protected int performSend(ProtonJMessage serverMessage, Object context) { if (!creditsSemaphore.tryAcquire()) { try { creditsSemaphore.acquire(); @@ -444,22 +417,32 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers + // presettle means we can settle the message on the dealer side before we send it, i.e. for browsers boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; - //we only need a tag if we are going to ack later + // we only need a tag if we are going to settle later byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); try { - serverMessage.encode(new NettyWritable(nettyBuffer)); + long messageFormat = 0; + + // Encode the Server Message into the given Netty Buffer as an AMQP + // Message transformed from the internal message model. + try { + messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer)); + } catch (Throwable e) { + log.warn(e.getMessage(), e); + throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); + } int size = nettyBuffer.writerIndex(); synchronized (connection.getLock()) { final Delivery delivery; delivery = sender.delivery(tag, 0, tag.length); - delivery.setContext(context); + delivery.setMessageFormat((int) messageFormat); + delivery.setContext(message); // this will avoid a copy.. patch provided by Norman using buffer.array() sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); @@ -479,6 +462,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } + private static boolean hasCapabilities(Symbol symbol, Source source) { + if (source != null) { + if (source.getCapabilities() != null) { + for (Symbol cap : source.getCapabilities()) { + if (symbol.equals(cap)) { + return true; + } + } + } + } + return false; + } + private static String createQueueName(String clientId, String pubId) { return clientId + "." + pubId; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 148482e..96ce90e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -16,32 +16,31 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter; -import java.io.ByteArrayOutputStream; +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage; + import java.io.IOException; -import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.utils.SimpleIDGenerator; -import org.apache.blacklist.ABadClass; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; @@ -53,10 +52,13 @@ import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Test; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + public class TestConversions extends Assert { @Test - public void testObjectMessageWhiteList() throws Exception { + public void testAmqpValueOfBooleanIsPassedThrough() throws Exception { Map<String, Object> mapprop = createPropertiesMap(); ApplicationProperties properties = new ApplicationProperties(mapprop); MessageImpl message = (MessageImpl) Message.Factory.create(); @@ -73,39 +75,15 @@ public class TestConversions extends Assert { EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSObjectMessage serverMessage = (ServerJMSObjectMessage) converter.inboundJMSType(encodedMessage); + ServerMessage serverMessage = converter.inbound(encodedMessage); - verifyProperties(serverMessage); + verifyProperties(new ServerJMSMessage(serverMessage, 0)); - assertEquals(true, serverMessage.getObject()); + EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); + Message amqpMessage = encoded.decode(); - Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); - - AmqpValue value = (AmqpValue) ((Message) obj).getBody(); + AmqpValue value = (AmqpValue) amqpMessage.getBody(); assertEquals(value.getValue(), true); - - } - - @Test - public void testObjectMessageNotOnWhiteList() throws Exception { - - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessageImpl message = new ServerMessageImpl(1, 1024); - message.setType((byte) 2); - ServerJMSObjectMessage serverMessage = new ServerJMSObjectMessage(message, 1024); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream ois = new ObjectOutputStream(out); - ois.writeObject(new ABadClass()); - byte[] src = out.toByteArray(); - serverMessage.getInnerMessage().getBodyBuffer().writeInt(src.length); - serverMessage.getInnerMessage().getBodyBuffer().writeBytes(src); - - try { - converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); - fail("should throw ClassNotFoundException"); - } catch (ClassNotFoundException e) { - //ignore - } } @Test @@ -126,22 +104,23 @@ public class TestConversions extends Assert { EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSBytesMessage serverMessage = (ServerJMSBytesMessage) converter.inboundJMSType(encodedMessage); + ServerMessage serverMessage = converter.inbound(encodedMessage); - verifyProperties(serverMessage); + ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0); - assertEquals(bodyBytes.length, serverMessage.getBodyLength()); + verifyProperties(bytesMessage); + + assertEquals(bodyBytes.length, bytesMessage.getBodyLength()); byte[] newBodyBytes = new byte[4]; - serverMessage.readBytes(newBodyBytes); + bytesMessage.readBytes(newBodyBytes); Assert.assertArrayEquals(bodyBytes, newBodyBytes); - Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); + Object obj = converter.outbound(serverMessage, 0); System.out.println("output = " + obj); - } private void verifyProperties(javax.jms.Message message) throws Exception { @@ -175,25 +154,25 @@ public class TestConversions extends Assert { EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSMapMessage serverMessage = (ServerJMSMapMessage) converter.inboundJMSType(encodedMessage); + ServerMessage serverMessage = converter.inbound(encodedMessage); - verifyProperties(serverMessage); + ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0); + mapMessage.decode(); - Assert.assertEquals(1, serverMessage.getInt("someint")); - Assert.assertEquals("value", serverMessage.getString("somestr")); + verifyProperties(mapMessage); - Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); + Assert.assertEquals(1, mapMessage.getInt("someint")); + Assert.assertEquals("value", mapMessage.getString("somestr")); - reEncodeMsg(obj); + EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); + Message amqpMessage = encoded.decode(); - MessageImpl outMessage = (MessageImpl) obj; - AmqpValue value = (AmqpValue) outMessage.getBody(); - Map mapoutput = (Map) value.getValue(); + AmqpValue value = (AmqpValue) amqpMessage.getBody(); + Map<?, ?> mapoutput = (Map<?, ?>) value.getValue(); assertEquals(Integer.valueOf(1), mapoutput.get("someint")); - System.out.println("output = " + obj); - + System.out.println("output = " + amqpMessage); } @Test @@ -212,26 +191,25 @@ public class TestConversions extends Assert { EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSStreamMessage serverMessage = (ServerJMSStreamMessage) converter.inboundJMSType(encodedMessage); + ServerMessage serverMessage = converter.inbound(encodedMessage); simulatePersistence(serverMessage); - verifyProperties(serverMessage); + ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0); - serverMessage.reset(); + verifyProperties(streamMessage); - assertEquals(10, serverMessage.readInt()); - assertEquals("10", serverMessage.readString()); + streamMessage.reset(); - Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); + assertEquals(10, streamMessage.readInt()); + assertEquals("10", streamMessage.readString()); - reEncodeMsg(obj); + EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); + Message amqpMessage = encoded.decode(); - MessageImpl outMessage = (MessageImpl) obj; - List list = ((AmqpSequence) outMessage.getBody()).getValue(); + List<?> list = ((AmqpSequence) amqpMessage.getBody()).getValue(); Assert.assertEquals(Integer.valueOf(10), list.get(0)); Assert.assertEquals("10", list.get(1)); - } @Test @@ -247,33 +225,33 @@ public class TestConversions extends Assert { EncodedMessage encodedMessage = encodeMessage(message); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerJMSTextMessage serverMessage = (ServerJMSTextMessage) converter.inboundJMSType(encodedMessage); + ServerMessage serverMessage = converter.inbound(encodedMessage); simulatePersistence(serverMessage); - verifyProperties(serverMessage); + ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0); + textMessage.decode(); - Assert.assertEquals(text, serverMessage.getText()); + verifyProperties(textMessage); - Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); + Assert.assertEquals(text, textMessage.getText()); - reEncodeMsg(obj); + EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); + Message amqpMessage = encoded.decode(); - MessageImpl outMessage = (MessageImpl) obj; - AmqpValue value = (AmqpValue) outMessage.getBody(); + AmqpValue value = (AmqpValue) amqpMessage.getBody(); String textValue = (String) value.getValue(); Assert.assertEquals(text, textValue); - System.out.println("output = " + obj); - + System.out.println("output = " + amqpMessage); } - private void simulatePersistence(ServerJMSMessage serverMessage) { - serverMessage.getInnerMessage().setAddress(new SimpleString("jms.queue.SomeAddress")); + private void simulatePersistence(ServerMessage serverMessage) { + serverMessage.setAddress(new SimpleString("jms.queue.SomeAddress")); // This is just to simulate what would happen during the persistence of the message // We need to still be able to recover the message when we read it back - ((EncodingSupport) serverMessage.getInnerMessage()).encode(new EmptyBuffer()); + ((EncodingSupport) serverMessage).encode(new EmptyBuffer()); } private ProtonJMessage reEncodeMsg(Object obj) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java new file mode 100644 index 0000000..4caead7 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter.message; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; +import org.junit.Test; + +public class AMQPContentTypeSupportTest { + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeWithOnlyType() throws Exception { + doParseContentTypeTestImpl("type", null); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeEndsWithSlash() throws Exception { + doParseContentTypeTestImpl("type/", null); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeMissingSubtype() throws Exception { + doParseContentTypeTestImpl("type/;", null); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeEmptyString() throws Exception { + doParseContentTypeTestImpl("", null); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeNullString() throws Exception { + doParseContentTypeTestImpl(null, null); + } + + @Test + public void testParseContentTypeNoParamsAfterSeparatorNonTextual() throws Exception { + // Expect null as this is not a textual type + doParseContentTypeTestImpl("type/subtype;", null); + } + + @Test + public void testParseContentTypeNoParamsAfterSeparatorTextualType() throws Exception { + doParseContentTypeTestImpl("text/plain;", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeEmptyParamsAfterSeparator() throws Exception { + doParseContentTypeTestImpl("text/plain;;", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeNoParams() throws Exception { + doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithCharsetUtf8() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithCharsetAscii() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII); + } + + @Test + public void testParseContentTypeWithMultipleParams() throws Exception { + doParseContentTypeTestImpl("text/plain; param=value; charset=us-ascii", StandardCharsets.US_ASCII); + } + + @Test + public void testParseContentTypeWithCharsetQuoted() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=\"us-ascii\"", StandardCharsets.US_ASCII); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeWithCharsetQuotedEmpty() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=\"\"", null); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeWithCharsetQuoteNotClosed() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=\"unclosed", null); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeWithCharsetQuoteNotClosedEmpty() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=\"", null); + } + + @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) + public void testParseContentTypeWithNoCharsetValue() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=", null); + } + + @Test + public void testParseContentTypeWithTextPlain() throws Exception { + doParseContentTypeTestImpl("text/plain;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithTextJson() throws Exception { + doParseContentTypeTestImpl("text/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("text/json;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("text/json;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("text/json", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithTextHtml() throws Exception { + doParseContentTypeTestImpl("text/html;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("text/html;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("text/html;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("text/html", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithTextFoo() throws Exception { + doParseContentTypeTestImpl("text/foo;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("text/foo;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("text/foo;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("text/foo", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationJson() throws Exception { + doParseContentTypeTestImpl("application/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("application/json;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("application/json;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("application/json", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationJsonVariant() throws Exception { + doParseContentTypeTestImpl("application/something+json;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("application/something+json;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("application/something+json;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("application/something+json", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationJavascript() throws Exception { + doParseContentTypeTestImpl("application/javascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("application/javascript;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("application/javascript;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("application/javascript", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationEcmascript() throws Exception { + doParseContentTypeTestImpl("application/ecmascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("application/ecmascript;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("application/ecmascript;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("application/ecmascript", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationXml() throws Exception { + doParseContentTypeTestImpl("application/xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("application/xml;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("application/xml;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("application/xml", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationXmlVariant() throws Exception { + doParseContentTypeTestImpl("application/something+xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("application/something+xml;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("application/something+xml;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("application/something+xml", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationXmlDtd() throws Exception { + doParseContentTypeTestImpl("application/xml-dtd;charset=iso-8859-1", StandardCharsets.ISO_8859_1); + doParseContentTypeTestImpl("application/xml-dtd;charset=us-ascii", StandardCharsets.US_ASCII); + doParseContentTypeTestImpl("application/xml-dtd;charset=utf-8", StandardCharsets.UTF_8); + doParseContentTypeTestImpl("application/xml-dtd", StandardCharsets.UTF_8); + } + + @Test + public void testParseContentTypeWithApplicationOtherNotTextual() throws Exception { + // Expect null as this is not a textual type + doParseContentTypeTestImpl("application/other", null); + } + + @Test + public void testParseContentTypeWithApplicationOctetStream() throws Exception { + // Expect null as this is not a textual type + doParseContentTypeTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, null); + } + + @Test + public void testParseContentTypeWithApplicationJavaSerialized() throws Exception { + // Expect null as this is not a textual type + doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null); + } + + private void doParseContentTypeTestImpl(String contentType, Charset expected) throws ActiveMQAMQPInvalidContentTypeException { + Charset charset = AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType); + if (expected == null) { + assertNull("Expected no charset, but got:" + charset, charset); + } else { + assertEquals("Charset not as expected", expected, charset); + } + } +}
