reorg on converter
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/19608cb4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/19608cb4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/19608cb4 Branch: refs/heads/artemis-1009 Commit: 19608cb4c3889e7741efbbdaf5774cd59da679c9 Parents: e9b731b Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Thu Mar 2 20:04:14 2017 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Mar 2 20:04:30 2017 -0500 ---------------------------------------------------------------------- .../amqp/converter/AMQPContentTypeSupport.java | 146 ++++++++++ .../amqp/converter/AMQPMessageIdHelper.java | 252 ++++++++++++++++++ .../amqp/converter/AMQPMessageSupport.java | 264 +++++++++++++++++++ .../amqp/converter/AmqpCoreConverter.java | 57 ++-- .../amqp/converter/CoreAmqpConverter.java | 61 +++-- .../message/AMQPContentTypeSupport.java | 146 ---------- .../converter/message/AMQPMessageIdHelper.java | 252 ------------------ .../converter/message/AMQPMessageSupport.java | 264 ------------------- .../message/AMQPContentTypeSupportTest.java | 8 +- .../message/AMQPMessageIdHelperTest.java | 11 +- .../message/AMQPMessageSupportTest.java | 11 +- .../JMSMappingInboundTransformerTest.java | 1 + .../JMSMappingOutboundTransformerTest.java | 13 +- 13 files changed, 745 insertions(+), 741 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java new file mode 100644 index 0000000..e040138 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.protocol.amqp.converter; + +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.StandardCharsets; +import java.nio.charset.UnsupportedCharsetException; +import java.util.StringTokenizer; + +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; + +public final class AMQPContentTypeSupport { + + private static final String UTF_8 = "UTF-8"; + private static final String CHARSET = "charset"; + private static final String TEXT = "text"; + private static final String APPLICATION = "application"; + private static final String JAVASCRIPT = "javascript"; + private static final String XML = "xml"; + private static final String XML_VARIANT = "+xml"; + private static final String JSON = "json"; + private static final String JSON_VARIANT = "+json"; + private static final String XML_DTD = "xml-dtd"; + private static final String ECMASCRIPT = "ecmascript"; + + /** + * @param contentType + * the contentType of the received message + * @return the character set to use, or null if not to treat the message as text + * @throws ActiveMQAMQPInvalidContentTypeException + * if the content-type is invalid in some way. 
+ */ + public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException { + if (contentType == null || contentType.trim().isEmpty()) { + throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty"); + } + + int subTypeSeparator = contentType.indexOf("/"); + if (subTypeSeparator == -1) { + throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType); + } + + final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim(); + + String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); + + String parameterPart = null; + int parameterSeparator = subTypePart.indexOf(";"); + if (parameterSeparator != -1) { + if (parameterSeparator < subTypePart.length() - 1) { + parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); + } + subTypePart = subTypePart.substring(0, parameterSeparator).trim(); + } + + if (subTypePart.isEmpty()) { + throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/'" + contentType); + } + + final String subType = subTypePart; + + if (isTextual(type, subType)) { + String charset = findCharset(parameterPart); + if (charset == null) { + charset = UTF_8; + } + + if (UTF_8.equals(charset)) { + return StandardCharsets.UTF_8; + } else { + try { + return Charset.forName(charset); + } catch (IllegalCharsetNameException icne) { + throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset); + } catch (UnsupportedCharsetException uce) { + throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset); + } + } + } + + return null; + } + + // ----- Internal Content Type utilities ----------------------------------// + + private static boolean isTextual(String type, String subType) { + if (TEXT.equals(type)) { + return true; + } + + if (APPLICATION.equals(type)) { + if (XML.equals(subType) 
|| JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT) + || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) { + return true; + } + } + + return false; + } + + private static String findCharset(String paramaterPart) { + String charset = null; + + if (paramaterPart != null) { + StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";"); + while (tokenizer.hasMoreTokens()) { + String parameter = tokenizer.nextToken().trim(); + int eqIndex = parameter.indexOf('='); + if (eqIndex != -1) { + String name = parameter.substring(0, eqIndex); + if (CHARSET.equalsIgnoreCase(name.trim())) { + String value = unquote(parameter.substring(eqIndex + 1)); + + charset = value.toUpperCase(); + break; + } + } + } + } + + return charset; + } + + private static String unquote(String s) { + if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) { + return s.substring(1, s.length() - 1); + } else { + return s; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java new file mode 100644 index 0000000..00282e0 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java @@ -0,0 +1,252 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.protocol.amqp.converter; + +import java.nio.ByteBuffer; +import java.util.UUID; + +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.UnsignedLong; + +/** + * Helper class for identifying and converting message-id and correlation-id values between the + * AMQP types and the Strings values used by JMS. + * <p> + * AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, + * message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a + * string representation of these for interoperability with other AMQP clients, the following + * encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID + * value:<br> + * <p> + * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br> + * {@literal "AMQP_UUID:<string representation of uuid>"}<br> + * {@literal "AMQP_ULONG:<string representation of ulong>"}<br> + * {@literal "AMQP_STRING:<string>"}<br> + * <p> + * The AMQP_STRING encoding exists only for escaping message-id-string values that happen to + * begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used + * otherwise. 
+ * <p> + * When provided a string for conversion which attempts to identify itself as an encoded binary, + * uuid, or ulong but can't be converted into the indicated format, an exception will be thrown. + */ +public class AMQPMessageIdHelper { + + public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper(); + + public static final String AMQP_STRING_PREFIX = "AMQP_STRING:"; + public static final String AMQP_UUID_PREFIX = "AMQP_UUID:"; + public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:"; + public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:"; + + private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length(); + private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length(); + private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length(); + private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length(); + private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray(); + + /** + * Takes the provided AMQP messageId style object, and convert it to a base string. Encodes + * type information as a prefix where necessary to convey or escape the type of the provided + * object. + * + * @param messageId + * the raw messageId object to process + * @return the base string to be used in creating the actual id. 
+ */ + public String toBaseMessageIdString(Object messageId) { + if (messageId == null) { + return null; + } else if (messageId instanceof String) { + String stringId = (String) messageId; + + // If the given string has a type encoding prefix, + // we need to escape it as an encoded string (even if + // the existing encoding prefix was also for string) + if (hasTypeEncodingPrefix(stringId)) { + return AMQP_STRING_PREFIX + stringId; + } else { + return stringId; + } + } else if (messageId instanceof UUID) { + return AMQP_UUID_PREFIX + messageId.toString(); + } else if (messageId instanceof UnsignedLong) { + return AMQP_ULONG_PREFIX + messageId.toString(); + } else if (messageId instanceof Binary) { + ByteBuffer dup = ((Binary) messageId).asByteBuffer(); + + byte[] bytes = new byte[dup.remaining()]; + dup.get(bytes); + + String hex = convertBinaryToHexString(bytes); + + return AMQP_BINARY_PREFIX + hex; + } else { + throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass()); + } + } + + /** + * Takes the provided base id string and return the appropriate amqp messageId style object. + * Converts the type based on any relevant encoding information found as a prefix. + * + * @param baseId + * the object to be converted to an AMQP MessageId value. + * @return the AMQP messageId style object + * @throws ActiveMQAMQPIllegalStateException + * if the provided baseId String indicates an encoded type but can't be converted to + * that type. 
+ */ + public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException { + if (baseId == null) { + return null; + } + + try { + if (hasAmqpUuidPrefix(baseId)) { + String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH); + return UUID.fromString(uuidString); + } else if (hasAmqpUlongPrefix(baseId)) { + String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH); + return UnsignedLong.valueOf(longString); + } else if (hasAmqpStringPrefix(baseId)) { + return strip(baseId, AMQP_STRING_PREFIX_LENGTH); + } else if (hasAmqpBinaryPrefix(baseId)) { + String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH); + byte[] bytes = convertHexStringToBinary(hexString); + return new Binary(bytes); + } else { + // We have a string without any type prefix, transmit it as-is. + return baseId; + } + } catch (IllegalArgumentException e) { + throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value"); + } + } + + /** + * Convert the provided hex-string into a binary representation where each byte represents + * two characters of the hex string. + * <p> + * The hex characters may be upper or lower case. + * + * @param hexString + * string to convert to a binary value. + * @return a byte array containing the binary representation + * @throws IllegalArgumentException + * if the provided String is a non-even length or contains non-hex characters + */ + public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException { + int length = hexString.length(); + + // As each byte needs two characters in the hex encoding, the string must be an even + // length. 
+ if (length % 2 != 0) { + throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString); + } + + byte[] binary = new byte[length / 2]; + + for (int i = 0; i < length; i += 2) { + char highBitsChar = hexString.charAt(i); + char lowBitsChar = hexString.charAt(i + 1); + + int highBits = hexCharToInt(highBitsChar, hexString) << 4; + int lowBits = hexCharToInt(lowBitsChar, hexString); + + binary[i / 2] = (byte) (highBits + lowBits); + } + + return binary; + } + + /** + * Convert the provided binary into a hex-string representation where each character + * represents 4 bits of the provided binary, i.e each byte requires two characters. + * <p> + * The returned hex characters are upper-case. + * + * @param bytes + * the binary value to convert to a hex String instance. + * @return a String containing a hex representation of the bytes + */ + public String convertBinaryToHexString(byte[] bytes) { + // Each byte is represented as 2 chars + StringBuilder builder = new StringBuilder(bytes.length * 2); + + for (byte b : bytes) { + // The byte will be expanded to int before shifting, replicating the + // sign bit, so mask everything beyond the first 4 bits afterwards + int highBitsInt = (b >> 4) & 0xF; + // We only want the first 4 bits + int lowBitsInt = b & 0xF; + + builder.append(HEX_CHARS[highBitsInt]); + builder.append(HEX_CHARS[lowBitsInt]); + } + + return builder.toString(); + } + + // ----- Internal implementation ------------------------------------------// + + private boolean hasTypeEncodingPrefix(String stringId) { + return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId); + } + + private boolean hasAmqpStringPrefix(String stringId) { + return stringId.startsWith(AMQP_STRING_PREFIX); + } + + private boolean hasAmqpUlongPrefix(String stringId) { + return stringId.startsWith(AMQP_ULONG_PREFIX); + } + + private boolean 
hasAmqpUuidPrefix(String stringId) { + return stringId.startsWith(AMQP_UUID_PREFIX); + } + + private boolean hasAmqpBinaryPrefix(String stringId) { + return stringId.startsWith(AMQP_BINARY_PREFIX); + } + + private String strip(String id, int numChars) { + return id.substring(numChars); + } + + private int hexCharToInt(char ch, String orig) throws IllegalArgumentException { + if (ch >= '0' && ch <= '9') { + // subtract '0' to get difference in position as an int + return ch - '0'; + } else if (ch >= 'A' && ch <= 'F') { + // subtract 'A' to get difference in position as an int + // and then add 10 for the offset of 'A' + return ch - 'A' + 10; + } else if (ch >= 'a' && ch <= 'f') { + // subtract 'a' to get difference in position as an int + // and then add 10 for the offset of 'a' + return ch - 'a' + 10; + } + + throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java new file mode 100644 index 0000000..194fabe --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter; + +import javax.jms.Destination; +import javax.jms.JMSException; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.Message; + +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static 
org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; + +/** + * Support class containing constant values and static methods that are used to map to / from + * AMQP Message types being sent or received. + */ +public final class AMQPMessageSupport { + + // Message Properties used to map AMQP to JMS and back + + public static final String JMS_AMQP_PREFIX = "JMS_AMQP_"; + public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length(); + + public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT"; + public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING"; + public static final String NATIVE = "NATIVE"; + public static final String HEADER = "HEADER"; + public static final String PROPERTIES = "PROPERTIES"; + + public static final String FIRST_ACQUIRER = "FirstAcquirer"; + public static final String CONTENT_TYPE = "ContentType"; + public static final String CONTENT_ENCODING = "ContentEncoding"; + public static final String REPLYTO_GROUP_ID = "ReplyToGroupID"; + public static final String DURABLE = "DURABLE"; + public static final String PRIORITY = "PRIORITY"; + + public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; + public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; + public static final String FOOTER_PREFIX = "FT_"; + + public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; + public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE; + public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY; + public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; + public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + 
ORIGINAL_ENCODING; + public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE; + public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER; + public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE; + public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING; + public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID; + public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX; + public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX; + public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX; + + // Message body type definitions + public static final Binary EMPTY_BINARY = new Binary(new byte[0]); + public static final Data EMPTY_BODY = new Data(EMPTY_BINARY); + + public static final short AMQP_UNKNOWN = 0; + public static final short AMQP_NULL = 1; + public static final short AMQP_DATA = 2; + public static final short AMQP_SEQUENCE = 3; + public static final short AMQP_VALUE_NULL = 4; + public static final short AMQP_VALUE_STRING = 5; + public static final short AMQP_VALUE_BINARY = 6; + public static final short AMQP_VALUE_MAP = 7; + public static final short AMQP_VALUE_LIST = 8; + + public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest"); + public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to"); + + public static final byte QUEUE_TYPE = 0x00; + public static final byte TOPIC_TYPE = 0x01; + public static final byte TEMP_QUEUE_TYPE = 0x02; + public static final byte TEMP_TOPIC_TYPE = 0x03; + + + /** + * Content type used to mark Data sections as containing a serialized java object. 
+ */ + public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object"; + + /** + * Content type used to mark Data sections as containing arbitrary bytes. + */ + public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; + + /** + * Lookup and return the correct Proton Symbol instance based on the given key. + * + * @param key + * the String value name of the Symbol to locate. + * + * @return the Symbol value that matches the given key. + */ + public static Symbol getSymbol(String key) { + return Symbol.valueOf(key); + } + + /** + * Safe way to access message annotations which will check internal structure and either + * return the annotation if it exists or null if the annotation or any annotations are not + * present. + * + * @param key + * the String key to use to lookup an annotation. + * @param message + * the AMQP message object that is being examined. + * + * @return the given annotation value or null if not present in the message. + */ + public static Object getMessageAnnotation(String key, Message message) { + if (message != null && message.getMessageAnnotations() != null) { + Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue(); + return annotations.get(AMQPMessageSupport.getSymbol(key)); + } + + return null; + } + + /** + * Check whether the content-type field of the properties section (if present) in the given + * message matches the provided string (where null matches if there is no content type + * present). + * + * @param contentType + * content type string to compare against, or null if none + * @param message + * the AMQP message object that is being examined. 
+ * + * @return true if content type matches + */ + public static boolean isContentType(String contentType, Message message) { + if (contentType == null) { + return message.getContentType() == null; + } else { + return contentType.equals(message.getContentType()); + } + } + + /** + * @param contentType + * the contentType of the received message + * @return the character set to use, or null if not to treat the message as text + */ + public static Charset getCharsetForTextualContent(String contentType) { + try { + return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType); + } catch (ActiveMQAMQPInvalidContentTypeException e) { + return null; + } + } + + public static String toAddress(Destination destination) { + if (destination instanceof ActiveMQDestination) { + return ((ActiveMQDestination) destination).getAddress(); + } + return null; + } + + public static ServerJMSBytesMessage createBytesMessage(long id) { + return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE)); + } + + public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException { + ServerJMSBytesMessage message = createBytesMessage(id); + message.writeBytes(array, arrayOffset, length); + return message; + } + + public static ServerJMSStreamMessage createStreamMessage(long id) { + return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE)); + } + + public static ServerJMSMessage createMessage(long id) { + return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE)); + } + + public static ServerJMSTextMessage createTextMessage(long id) { + return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE)); + } + + public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException { + ServerJMSTextMessage message = createTextMessage(id); + message.setText(text); + return message; + } + + public static ServerJMSObjectMessage createObjectMessage(long id) { + return new 
ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE)); + } + + public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id); + message.setSerializedForm(serializedForm); + return message; + } + + public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id); + message.setSerializedForm(new Binary(array, offset, length)); + return message; + } + + public static ServerJMSMapMessage createMapMessage(long id) { + return new ServerJMSMapMessage(newMessage(id, MAP_TYPE)); + } + + public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException { + ServerJMSMapMessage message = createMapMessage(id); + final Set<Map.Entry<String, Object>> set = content.entrySet(); + for (Map.Entry<String, Object> entry : set) { + Object value = entry.getValue(); + if (value instanceof Binary) { + Binary binary = (Binary) value; + value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength()); + } + message.setObject(entry.getKey(), value); + } + return message; + } + + private static CoreMessage newMessage(long id, byte messageType) { + CoreMessage message = new CoreMessage(id, 512); + message.setType(messageType); + ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); + return message; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 44aff5b..23474a4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -35,7 +35,6 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.qpid.proton.amqp.Binary; @@ -59,34 +58,34 @@ import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.WritableBuffer; import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP; -import static 
org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage; 
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_LIST; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_MAP; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; +import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType; /** * This class was created just to separate concerns on AMQPConverter. 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index c29ec9f..d82abff 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -45,7 +45,6 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; @@ -70,36 +69,36 @@ import org.jboss.logging.Logger; import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static 
org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static 
org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.QUEUE_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TEMP_QUEUE_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TEMP_TOPIC_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TOPIC_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_UNKNOWN; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_LIST; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING; +import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.EMPTY_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_NATIVE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PROPERTIES; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE; +import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress; public class CoreAmqpConverter { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java deleted file mode 100644 index 01d72c8..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.nio.charset.Charset; -import java.nio.charset.IllegalCharsetNameException; -import java.nio.charset.StandardCharsets; -import java.nio.charset.UnsupportedCharsetException; -import java.util.StringTokenizer; - -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; - -public final class AMQPContentTypeSupport { - - private static final String UTF_8 = "UTF-8"; - private static final String CHARSET = "charset"; - private static final String TEXT = "text"; - private static final String APPLICATION = "application"; - private static final String JAVASCRIPT = "javascript"; - private static final String XML = "xml"; - private static final String XML_VARIANT = "+xml"; - private static final String JSON = "json"; - private static final String JSON_VARIANT = "+json"; - private static final String XML_DTD = "xml-dtd"; - private static final String ECMASCRIPT = "ecmascript"; - - /** - * @param contentType - * the contentType of the received message - * @return the character set to use, or null if not to treat the message as text - * @throws ActiveMQAMQPInvalidContentTypeException - * if the content-type is invalid in some way. 
- */ - public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException { - if (contentType == null || contentType.trim().isEmpty()) { - throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty"); - } - - int subTypeSeparator = contentType.indexOf("/"); - if (subTypeSeparator == -1) { - throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType); - } - - final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim(); - - String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); - - String parameterPart = null; - int parameterSeparator = subTypePart.indexOf(";"); - if (parameterSeparator != -1) { - if (parameterSeparator < subTypePart.length() - 1) { - parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); - } - subTypePart = subTypePart.substring(0, parameterSeparator).trim(); - } - - if (subTypePart.isEmpty()) { - throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/'" + contentType); - } - - final String subType = subTypePart; - - if (isTextual(type, subType)) { - String charset = findCharset(parameterPart); - if (charset == null) { - charset = UTF_8; - } - - if (UTF_8.equals(charset)) { - return StandardCharsets.UTF_8; - } else { - try { - return Charset.forName(charset); - } catch (IllegalCharsetNameException icne) { - throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset); - } catch (UnsupportedCharsetException uce) { - throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset); - } - } - } - - return null; - } - - // ----- Internal Content Type utilities ----------------------------------// - - private static boolean isTextual(String type, String subType) { - if (TEXT.equals(type)) { - return true; - } - - if (APPLICATION.equals(type)) { - if (XML.equals(subType) 
|| JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT) - || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) { - return true; - } - } - - return false; - } - - private static String findCharset(String paramaterPart) { - String charset = null; - - if (paramaterPart != null) { - StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";"); - while (tokenizer.hasMoreTokens()) { - String parameter = tokenizer.nextToken().trim(); - int eqIndex = parameter.indexOf('='); - if (eqIndex != -1) { - String name = parameter.substring(0, eqIndex); - if (CHARSET.equalsIgnoreCase(name.trim())) { - String value = unquote(parameter.substring(eqIndex + 1)); - - charset = value.toUpperCase(); - break; - } - } - } - } - - return charset; - } - - private static String unquote(String s) { - if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) { - return s.substring(1, s.length() - 1); - } else { - return s; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java deleted file mode 100644 index 4a2123d..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.nio.ByteBuffer; -import java.util.UUID; - -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.UnsignedLong; - -/** - * Helper class for identifying and converting message-id and correlation-id values between the - * AMQP types and the Strings values used by JMS. - * <p> - * AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, - * message-id-binary, message-id-uuid, or message-id-ulong. 
In order to accept or return a - * string representation of these for interoperability with other AMQP clients, the following - * encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID - * value:<br> - * <p> - * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br> - * {@literal "AMQP_UUID:<string representation of uuid>"}<br> - * {@literal "AMQP_ULONG:<string representation of ulong>"}<br> - * {@literal "AMQP_STRING:<string>"}<br> - * <p> - * The AMQP_STRING encoding exists only for escaping message-id-string values that happen to - * begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used - * otherwise. - * <p> - * When provided a string for conversion which attempts to identify itself as an encoded binary, - * uuid, or ulong but can't be converted into the indicated format, an exception will be thrown. - */ -public class AMQPMessageIdHelper { - - public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper(); - - public static final String AMQP_STRING_PREFIX = "AMQP_STRING:"; - public static final String AMQP_UUID_PREFIX = "AMQP_UUID:"; - public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:"; - public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:"; - - private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length(); - private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length(); - private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length(); - private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length(); - private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray(); - - /** - * Takes the provided AMQP messageId style object, and convert it to a base string. Encodes - * type information as a prefix where necessary to convey or escape the type of the provided - * object. 
- * - * @param messageId - * the raw messageId object to process - * @return the base string to be used in creating the actual id. - */ - public String toBaseMessageIdString(Object messageId) { - if (messageId == null) { - return null; - } else if (messageId instanceof String) { - String stringId = (String) messageId; - - // If the given string has a type encoding prefix, - // we need to escape it as an encoded string (even if - // the existing encoding prefix was also for string) - if (hasTypeEncodingPrefix(stringId)) { - return AMQP_STRING_PREFIX + stringId; - } else { - return stringId; - } - } else if (messageId instanceof UUID) { - return AMQP_UUID_PREFIX + messageId.toString(); - } else if (messageId instanceof UnsignedLong) { - return AMQP_ULONG_PREFIX + messageId.toString(); - } else if (messageId instanceof Binary) { - ByteBuffer dup = ((Binary) messageId).asByteBuffer(); - - byte[] bytes = new byte[dup.remaining()]; - dup.get(bytes); - - String hex = convertBinaryToHexString(bytes); - - return AMQP_BINARY_PREFIX + hex; - } else { - throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass()); - } - } - - /** - * Takes the provided base id string and return the appropriate amqp messageId style object. - * Converts the type based on any relevant encoding information found as a prefix. - * - * @param baseId - * the object to be converted to an AMQP MessageId value. - * @return the AMQP messageId style object - * @throws ActiveMQAMQPIllegalStateException - * if the provided baseId String indicates an encoded type but can't be converted to - * that type. 
- */ - public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException { - if (baseId == null) { - return null; - } - - try { - if (hasAmqpUuidPrefix(baseId)) { - String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH); - return UUID.fromString(uuidString); - } else if (hasAmqpUlongPrefix(baseId)) { - String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH); - return UnsignedLong.valueOf(longString); - } else if (hasAmqpStringPrefix(baseId)) { - return strip(baseId, AMQP_STRING_PREFIX_LENGTH); - } else if (hasAmqpBinaryPrefix(baseId)) { - String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH); - byte[] bytes = convertHexStringToBinary(hexString); - return new Binary(bytes); - } else { - // We have a string without any type prefix, transmit it as-is. - return baseId; - } - } catch (IllegalArgumentException e) { - throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value"); - } - } - - /** - * Convert the provided hex-string into a binary representation where each byte represents - * two characters of the hex string. - * <p> - * The hex characters may be upper or lower case. - * - * @param hexString - * string to convert to a binary value. - * @return a byte array containing the binary representation - * @throws IllegalArgumentException - * if the provided String is a non-even length or contains non-hex characters - */ - public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException { - int length = hexString.length(); - - // As each byte needs two characters in the hex encoding, the string must be an even - // length. 
- if (length % 2 != 0) { - throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString); - } - - byte[] binary = new byte[length / 2]; - - for (int i = 0; i < length; i += 2) { - char highBitsChar = hexString.charAt(i); - char lowBitsChar = hexString.charAt(i + 1); - - int highBits = hexCharToInt(highBitsChar, hexString) << 4; - int lowBits = hexCharToInt(lowBitsChar, hexString); - - binary[i / 2] = (byte) (highBits + lowBits); - } - - return binary; - } - - /** - * Convert the provided binary into a hex-string representation where each character - * represents 4 bits of the provided binary, i.e each byte requires two characters. - * <p> - * The returned hex characters are upper-case. - * - * @param bytes - * the binary value to convert to a hex String instance. - * @return a String containing a hex representation of the bytes - */ - public String convertBinaryToHexString(byte[] bytes) { - // Each byte is represented as 2 chars - StringBuilder builder = new StringBuilder(bytes.length * 2); - - for (byte b : bytes) { - // The byte will be expanded to int before shifting, replicating the - // sign bit, so mask everything beyond the first 4 bits afterwards - int highBitsInt = (b >> 4) & 0xF; - // We only want the first 4 bits - int lowBitsInt = b & 0xF; - - builder.append(HEX_CHARS[highBitsInt]); - builder.append(HEX_CHARS[lowBitsInt]); - } - - return builder.toString(); - } - - // ----- Internal implementation ------------------------------------------// - - private boolean hasTypeEncodingPrefix(String stringId) { - return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId); - } - - private boolean hasAmqpStringPrefix(String stringId) { - return stringId.startsWith(AMQP_STRING_PREFIX); - } - - private boolean hasAmqpUlongPrefix(String stringId) { - return stringId.startsWith(AMQP_ULONG_PREFIX); - } - - private boolean 
hasAmqpUuidPrefix(String stringId) { - return stringId.startsWith(AMQP_UUID_PREFIX); - } - - private boolean hasAmqpBinaryPrefix(String stringId) { - return stringId.startsWith(AMQP_BINARY_PREFIX); - } - - private String strip(String id, int numChars) { - return id.substring(numChars); - } - - private int hexCharToInt(char ch, String orig) throws IllegalArgumentException { - if (ch >= '0' && ch <= '9') { - // subtract '0' to get difference in position as an int - return ch - '0'; - } else if (ch >= 'A' && ch <= 'F') { - // subtract 'A' to get difference in position as an int - // and then add 10 for the offset of 'A' - return ch - 'A' + 10; - } else if (ch >= 'a' && ch <= 'f') { - // subtract 'a' to get difference in position as an int - // and then add 10 for the offset of 'a' - return ch - 'a' + 10; - } - - throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java deleted file mode 100644 index 9583051..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import javax.jms.Destination; -import javax.jms.JMSException; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Map; -import java.util.Set; - -import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.message.impl.CoreMessage; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.message.Message; - -import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; -import static 
org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE; -import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; -import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; -import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; -import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; - -/** - * Support class containing constant values and static methods that are used to map to / from - * AMQP Message types being sent or received. - */ -public final class AMQPMessageSupport { - - // Message Properties used to map AMQP to JMS and back - - public static final String JMS_AMQP_PREFIX = "JMS_AMQP_"; - public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length(); - - public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT"; - public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING"; - public static final String NATIVE = "NATIVE"; - public static final String HEADER = "HEADER"; - public static final String PROPERTIES = "PROPERTIES"; - - public static final String FIRST_ACQUIRER = "FirstAcquirer"; - public static final String CONTENT_TYPE = "ContentType"; - public static final String CONTENT_ENCODING = "ContentEncoding"; - public static final String REPLYTO_GROUP_ID = "ReplyToGroupID"; - public static final String DURABLE = "DURABLE"; - public static final String PRIORITY = "PRIORITY"; - - public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; - public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; - public static final String FOOTER_PREFIX = "FT_"; - - public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; - public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE; - public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY; - public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; - public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + 
ORIGINAL_ENCODING; - public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE; - public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER; - public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE; - public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING; - public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID; - public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX; - public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX; - public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX; - - // Message body type definitions - public static final Binary EMPTY_BINARY = new Binary(new byte[0]); - public static final Data EMPTY_BODY = new Data(EMPTY_BINARY); - - public static final short AMQP_UNKNOWN = 0; - public static final short AMQP_NULL = 1; - public static final short AMQP_DATA = 2; - public static final short AMQP_SEQUENCE = 3; - public static final short AMQP_VALUE_NULL = 4; - public static final short AMQP_VALUE_STRING = 5; - public static final short AMQP_VALUE_BINARY = 6; - public static final short AMQP_VALUE_MAP = 7; - public static final short AMQP_VALUE_LIST = 8; - - public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest"); - public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to"); - - public static final byte QUEUE_TYPE = 0x00; - public static final byte TOPIC_TYPE = 0x01; - public static final byte TEMP_QUEUE_TYPE = 0x02; - public static final byte TEMP_TOPIC_TYPE = 0x03; - - - /** - * Content type used to mark Data sections as containing a serialized java object. 
- */ - public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object"; - - /** - * Content type used to mark Data sections as containing arbitrary bytes. - */ - public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; - - /** - * Lookup and return the correct Proton Symbol instance based on the given key. - * - * @param key - * the String value name of the Symbol to locate. - * - * @return the Symbol value that matches the given key. - */ - public static Symbol getSymbol(String key) { - return Symbol.valueOf(key); - } - - /** - * Safe way to access message annotations which will check internal structure and either - * return the annotation if it exists or null if the annotation or any annotations are - * not present. - * - * @param key - * the String key to use to lookup an annotation. - * @param message - * the AMQP message object that is being examined. - * - * @return the given annotation value or null if not present in the message. - */ - public static Object getMessageAnnotation(String key, Message message) { - if (message != null && message.getMessageAnnotations() != null) { - Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue(); - return annotations.get(AMQPMessageSupport.getSymbol(key)); - } - - return null; - } - - /** - * Check whether the content-type field of the properties section (if present) in the given - * message matches the provided string (where null matches if there is no content type - * present). - * - * @param contentType - * content type string to compare against, or null if none - * @param message - * the AMQP message object that is being examined.
- * - * @return true if content type matches - */ - public static boolean isContentType(String contentType, Message message) { - if (contentType == null) { - return message.getContentType() == null; - } else { - return contentType.equals(message.getContentType()); - } - } - - /** - * @param contentType - * the contentType of the received message - * @return the character set to use, or null if not to treat the message as text - */ - public static Charset getCharsetForTextualContent(String contentType) { - try { - return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType); - } catch (ActiveMQAMQPInvalidContentTypeException e) { - return null; - } - } - - public static String toAddress(Destination destination) { - if (destination instanceof ActiveMQDestination) { - return ((ActiveMQDestination) destination).getAddress(); - } - return null; - } - - public static ServerJMSBytesMessage createBytesMessage(long id) { - return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE)); - } - - public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException { - ServerJMSBytesMessage message = createBytesMessage(id); - message.writeBytes(array, arrayOffset, length); - return message; - } - - public static ServerJMSStreamMessage createStreamMessage(long id) { - return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE)); - } - - public static ServerJMSMessage createMessage(long id) { - return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE)); - } - - public static ServerJMSTextMessage createTextMessage(long id) { - return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE)); - } - - public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException { - ServerJMSTextMessage message = createTextMessage(id); - message.setText(text); - return message; - } - - public static ServerJMSObjectMessage createObjectMessage(long id) { - return new 
ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE)); - } - - public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(id); - message.setSerializedForm(serializedForm); - return message; - } - - public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(id); - message.setSerializedForm(new Binary(array, offset, length)); - return message; - } - - public static ServerJMSMapMessage createMapMessage(long id) { - return new ServerJMSMapMessage(newMessage(id, MAP_TYPE)); - } - - public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException { - ServerJMSMapMessage message = createMapMessage(id); - final Set<Map.Entry<String, Object>> set = content.entrySet(); - for (Map.Entry<String, Object> entry : set) { - Object value = entry.getValue(); - if (value instanceof Binary) { - Binary binary = (Binary) value; - value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength()); - } - message.setObject(entry.getKey(), value); - } - return message; - } - - private static CoreMessage newMessage(long id, byte messageType) { - CoreMessage message = new CoreMessage(id, 512); - message.setType(messageType); - ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); - return message; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java index 4caead7..c6108b4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java @@ -16,15 +16,17 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPContentTypeSupport; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + public class AMQPContentTypeSupportTest { @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java index c53cda5..60c1989 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java @@ -20,19 +20,20 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - import java.util.UUID; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedLong; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + public class AMQPMessageIdHelperTest { private AMQPMessageIdHelper messageIdHelper; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java index d4e078f..6aeb4dc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java @@ -16,20 +16,21 @@ */ package 
org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.util.HashMap; import java.util.Map; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.message.Message; import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class AMQPMessageSupportTest { // ---------- getSymbol ---------------------------------------------------// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java index 693b4e0..963be20 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java @@ -35,6 +35,7 @@ import java.util.UUID; import org.apache.activemq.artemis.api.core.ICoreMessage; import 
org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19608cb4/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java index f38da3a..e56b8ff 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; @@ -48,12 +49,12 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_UNKNOWN; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull;