http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java new file mode 100644 index 0000000..44aff5b --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.converter; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageIdHelper; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Decimal128; +import org.apache.qpid.proton.amqp.Decimal32; +import org.apache.qpid.proton.amqp.Decimal64; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.UnsignedShort; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.WritableBuffer; + +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType; + +/** + * This class was created just to separate concerns on AMQPConverter. + * For better organization of the code. + * */ +public class AmqpCoreConverter { + + public static ICoreMessage toCore(AMQPMessage message) throws Exception { + + Section body = message.getProtonMessage().getBody(); + ServerJMSMessage result; + + if (body == null) { + if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message.getProtonMessage())) { + result = createObjectMessage(message.getMessageID()); + } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) { + result = createBytesMessage(message.getMessageID()); + } else { + Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); + if (charset != null) { + result = createTextMessage(message.getMessageID()); + } else { + result = createMessage(message.getMessageID()); + } + } + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); + } else if (body instanceof Data) { + Binary payload = ((Data) body).getValue(); + + if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message.getProtonMessage())) { + result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) { + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } else { + Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); + if (StandardCharsets.UTF_8.equals(charset)) { + ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + + try { + CharBuffer chars = charset.newDecoder().decode(buf); + result = createTextMessage(message.getMessageID(), String.valueOf(chars)); + } catch (CharacterCodingException e) { + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } + } else { + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } + } + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); + } else if (body instanceof AmqpSequence) { + AmqpSequence sequence = (AmqpSequence) body; + ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); + for (Object item : sequence.getValue()) { + m.writeObject(item); + } + + result = m; + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); + } else if (body instanceof AmqpValue) { + Object value = ((AmqpValue) body).getValue(); + if (value == null || value instanceof String) { + result = createTextMessage(message.getMessageID(), (String) value); + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING); + } else if (value instanceof Binary) { + Binary payload = (Binary) value; + + if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message.getProtonMessage())) { + result = createObjectMessage(message.getMessageID(), payload); + } else { + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + } + + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); + } else if (value instanceof List) { + ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); + for (Object item : (List<Object>) value) { + m.writeObject(item); + } + result = m; + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST); + } else if (value instanceof Map) { + result = createMapMessage(message.getMessageID(), (Map<String, Object>) value); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP); + } else { + ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + try { + TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf)); + TLSEncode.getEncoder().writeObject(body); + result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex()); + } finally { + buf.release(); + TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); + } + } + } else { + throw new RuntimeException("Unexpected body type: " + body.getClass()); + } + + populateMessage(result, message.getProtonMessage()); + + return result != null ? result.getInnerMessage() : null; + } + + protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + Header header = amqp.getHeader(); + if (header != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER, true); + + if (header.getDurable() != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true); + jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } else { + jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + + if (header.getPriority() != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true); + jms.setJMSPriority(header.getPriority().intValue()); + } else { + jms.setJMSPriority(javax.jms.Message.DEFAULT_PRIORITY); + } + + if (header.getFirstAcquirer() != null) { + jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer()); + } + + if (header.getDeliveryCount() != null) { + // AMQP Delivery Count counts only failed delivers where JMS + // Delivery Count should include the original delivery in the count. + jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1); + } + } else { + jms.setJMSPriority((byte) javax.jms.Message.DEFAULT_PRIORITY); + jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + + final MessageAnnotations ma = amqp.getMessageAnnotations(); + if (ma != null) { + for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) { + String key = entry.getKey().toString(); + if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + long deliveryTime = ((Number) entry.getValue()).longValue(); + jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime); + } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { + long delay = ((Number) entry.getValue()).longValue(); + if (delay > 0) { + jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); + } + } + + setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); + } + } + + final ApplicationProperties ap = amqp.getApplicationProperties(); + if (ap != null) { + for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) { + setProperty(jms, entry.getKey().toString(), entry.getValue()); + } + } + + final Properties properties = amqp.getProperties(); + if (properties != null) { + if (properties.getMessageId() != null) { + jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId())); + } + Binary userId = properties.getUserId(); + if (userId != null) { + // TODO - Better Way to set this? + jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); + } + if (properties.getTo() != null) { + jms.setJMSDestination(new ServerDestination(properties.getTo())); + } + if (properties.getSubject() != null) { + jms.setJMSType(properties.getSubject()); + } + if (properties.getReplyTo() != null) { + jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo())); + } + if (properties.getCorrelationId() != null) { + jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); + } + if (properties.getContentType() != null) { + jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString()); + } + if (properties.getContentEncoding() != null) { + jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString()); + } + if (properties.getCreationTime() != null) { + jms.setJMSTimestamp(properties.getCreationTime().getTime()); + } + if (properties.getGroupId() != null) { + jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId()); + } + if (properties.getGroupSequence() != null) { + jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue()); + } + if (properties.getReplyToGroupId() != null) { + jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId()); + } + if (properties.getAbsoluteExpiryTime() != null) { + jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); + } + } + + // If the jms expiration has not yet been set... + if (header != null && jms.getJMSExpiration() == 0) { + // Then lets try to set it based on the message ttl. + long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; + if (header.getTtl() != null) { + ttl = header.getTtl().longValue(); + } + + if (ttl == 0) { + jms.setJMSExpiration(0); + } else { + jms.setJMSExpiration(System.currentTimeMillis() + ttl); + } + } + + final Footer fp = amqp.getFooter(); + if (fp != null) { + for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); + } + } + + return jms; + } + + private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException { + if (value instanceof UnsignedLong) { + long v = ((UnsignedLong) value).longValue(); + msg.setLongProperty(key, v); + } else if (value instanceof UnsignedInteger) { + long v = ((UnsignedInteger) value).longValue(); + if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { + msg.setIntProperty(key, (int) v); + } else { + msg.setLongProperty(key, v); + } + } else if (value instanceof UnsignedShort) { + int v = ((UnsignedShort) value).intValue(); + if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { + msg.setShortProperty(key, (short) v); + } else { + msg.setIntProperty(key, v); + } + } else if (value instanceof UnsignedByte) { + short v = ((UnsignedByte) value).shortValue(); + if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { + msg.setByteProperty(key, (byte) v); + } else { + msg.setShortProperty(key, v); + } + } else if (value instanceof Symbol) { + msg.setStringProperty(key, value.toString()); + } else if (value instanceof Decimal128) { + msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); + } else if (value instanceof Decimal64) { + msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); + } else if (value instanceof Decimal32) { + msg.setFloatProperty(key, ((Decimal32) value).floatValue()); + } else if (value instanceof Binary) { + msg.setStringProperty(key, value.toString()); + } else { + msg.setObjectProperty(key, value); + } + } + + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java new file mode 100644 index 0000000..c29ec9f --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -0,0 +1,565 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.converter; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageIdHelper; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.jboss.logging.Logger; + +import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.QUEUE_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TEMP_QUEUE_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TEMP_TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress; + +public class CoreAmqpConverter { + + private static Logger logger = Logger.getLogger(CoreAmqpConverter.class); + + public static AMQPMessage checkAMQP(Message message) throws Exception { + if (message instanceof AMQPMessage) { + return (AMQPMessage)message; + } else { + // It will first convert to Core, then to AMQP + return fromCore(message.toCore()); + } + } + + public static AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception { + if (coreMessage == null) { + return null; + } + + ServerJMSMessage message = ServerJMSMessage.wrapCoreMessage(coreMessage); + message.decode(); + + long messageFormat = 0; + Header header = null; + Properties properties = null; + Map<Symbol, Object> daMap = null; + Map<Symbol, Object> maMap = null; + Map<String, Object> apMap = null; + Map<Object, Object> footerMap = null; + + Section body = convertBody(message); + + if (message.getInnerMessage().isDurable()) { + if (header == null) { + header = new Header(); + } + header.setDurable(true); + } + byte priority = (byte) message.getJMSPriority(); + if (priority != javax.jms.Message.DEFAULT_PRIORITY) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); + } + String type = message.getJMSType(); + if (type != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setSubject(type); + } + String messageId = message.getJMSMessageID(); + if (messageId != null) { + if (properties == null) { + properties = new Properties(); + } + try { + properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId)); + } catch (ActiveMQAMQPIllegalStateException e) { + properties.setMessageId(messageId); + } + } + Destination destination = message.getJMSDestination(); + if (destination != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setTo(toAddress(destination)); + if (maMap == null) { + maMap = new HashMap<>(); + } + maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); + } + Destination replyTo = message.getJMSReplyTo(); + if (replyTo != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyTo(toAddress(replyTo)); + if (maMap == null) { + maMap = new HashMap<>(); + } + maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); + } + String correlationId = message.getJMSCorrelationID(); + if (correlationId != null) { + if (properties == null) { + properties = new Properties(); + } + try { + properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); + } catch (ActiveMQAMQPIllegalStateException e) { + properties.setCorrelationId(correlationId); + } + } + long expiration = message.getJMSExpiration(); + if (expiration != 0) { + long ttl = expiration - System.currentTimeMillis(); + if (ttl < 0) { + ttl = 1; + } + + if (header == null) { + header = new Header(); + } + header.setTtl(new UnsignedInteger((int) ttl)); + + if (properties == null) { + properties = new Properties(); + } + properties.setAbsoluteExpiryTime(new Date(expiration)); + } + long timeStamp = message.getJMSTimestamp(); + if (timeStamp != 0) { + if (properties == null) { + properties = new Properties(); + } + properties.setCreationTime(new Date(timeStamp)); + } + + final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage()); + for (String key : keySet) { + if (key.startsWith("JMSX")) { + if (key.equals("JMSXUserID")) { + String value = message.getStringProperty(key); + if (properties == null) { + properties = new Properties(); + } + properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8))); + continue; + } else if (key.equals("JMSXGroupID")) { + String value = message.getStringProperty(key); + if (properties == null) { + properties = new Properties(); + } + properties.setGroupId(value); + continue; + } else if (key.equals("JMSXGroupSeq")) { + UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key)); + if (properties == null) { + properties = new Properties(); + } + properties.setGroupSequence(value); + continue; + } + } else if (key.startsWith(JMS_AMQP_PREFIX)) { + // AMQP Message Information stored from a conversion to the Core Message + if (key.equals(JMS_AMQP_NATIVE)) { + // skip..internal use only + continue; + } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) { + // skip..internal use only + continue; + } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { + if (header == null) { + header = new Header(); + } + header.setFirstAcquirer(message.getBooleanProperty(key)); + continue; + } else if (key.equals(JMS_AMQP_HEADER)) { + if (header == null) { + header = new Header(); + } + continue; + } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) { + if (header == null) { + header = new Header(); + } + header.setDurable(message.getInnerMessage().isDurable()); + continue; + } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); + continue; + } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { + if (properties == null) { + properties = new Properties(); + } + continue; + } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) { + if (daMap == null) { + daMap = new HashMap<>(); + } + String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); + daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); + continue; + } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) { + if (maMap == null) { + maMap = new HashMap<>(); + } + String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); + maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); + continue; + } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentType(Symbol.getSymbol(message.getStringProperty(key))); + continue; + } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key))); + continue; + } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyToGroupId(message.getStringProperty(key)); + continue; + } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) { + if (footerMap == null) { + footerMap = new HashMap<>(); + } + String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); + footerMap.put(name, message.getObjectProperty(key)); + continue; + } + } else if (key.equals("_AMQ_GROUP_ID")) { + String value = message.getStringProperty(key); + if (properties == null) { + properties = new Properties(); + } + properties.setGroupId(value); + continue; + } else if (key.equals(NATIVE_MESSAGE_ID)) { + // skip..internal use only + continue; + } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) { + // skip..remove annotation from previous inbound transformation + continue; + } + + if (apMap == null) { + apMap = new HashMap<>(); + } + + Object objectProperty = message.getObjectProperty(key); + if (objectProperty instanceof byte[]) { + objectProperty = new Binary((byte[]) objectProperty); + } + + apMap.put(key, objectProperty); + } + + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + + try { + EncoderImpl encoder = TLSEncode.getEncoder(); + encoder.setByteBuffer(new NettyWritable(buffer)); + + if (header != null) { + encoder.writeObject(header); + } + if (daMap != null) { + encoder.writeObject(new DeliveryAnnotations(daMap)); + } + if (maMap != null) { + encoder.writeObject(new MessageAnnotations(maMap)); + } + if (properties != null) { + encoder.writeObject(properties); + } + if (apMap != null) { + encoder.writeObject(new ApplicationProperties(apMap)); + } + if (body != null) { + encoder.writeObject(body); + } + if (footerMap != null) { + encoder.writeObject(new Footer(footerMap)); + } + + byte[] data = new byte[buffer.writerIndex()]; + buffer.readBytes(data); + + return new AMQPMessage(messageFormat, data); + + } finally { + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); + buffer.release(); + } + } + + private static Section convertBody(ServerJMSMessage message) throws JMSException { + + Section body = null; + short orignalEncoding = AMQP_UNKNOWN; + + try { + orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING); + } catch (Exception ex) { + // Ignore and stick with UNKNOWN + } + + if (message instanceof ServerJMSBytesMessage) { + Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message); + + if (payload == null) { + payload = EMPTY_BINARY; + } + + switch (orignalEncoding) { + case AMQP_NULL: + break; + case AMQP_VALUE_BINARY: + body = new AmqpValue(payload); + break; + case AMQP_DATA: + case AMQP_UNKNOWN: + default: + body = new Data(payload); + break; + } + } else if (message instanceof ServerJMSTextMessage) { + switch (orignalEncoding) { + case AMQP_NULL: + break; + case AMQP_DATA: + body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message)); + break; + case AMQP_VALUE_STRING: + case AMQP_UNKNOWN: + default: + body = new AmqpValue(((TextMessage) message).getText()); + break; + } + } else if (message instanceof ServerJMSMapMessage) { + body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message)); + } else if (message instanceof ServerJMSStreamMessage) { + ArrayList<Object> list = new ArrayList<>(); + final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message; + try { + while (true) { + list.add(m.readObject()); + } + } catch (MessageEOFException e) { + } + + switch (orignalEncoding) { + case AMQP_SEQUENCE: + body = new AmqpSequence(list); + break; + case AMQP_VALUE_LIST: + case AMQP_UNKNOWN: + default: + body = new AmqpValue(list); + break; + } + } else if (message instanceof ServerJMSObjectMessage) { + Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message); + + if (payload == null) { + payload = EMPTY_BINARY; + } + + switch (orignalEncoding) { + case AMQP_VALUE_BINARY: + body = new AmqpValue(payload); + break; + case AMQP_DATA: + case AMQP_UNKNOWN: + default: + body = new Data(payload); + break; + } + + // For a non-AMQP message we tag the outbound content type as containing + // a serialized Java object so that an AMQP client has a hint as to what + // we are sending it. + if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) { + message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + } + } else if (message instanceof ServerJMSMessage) { + // If this is not an AMQP message that was converted then the original encoding + // will be unknown so we check for special cases of messages with special data + // encoded into the server message body. + if (orignalEncoding == AMQP_UNKNOWN) { + ICoreMessage internalMessage = message.getInnerMessage(); + int readerIndex = internalMessage.getBodyBuffer().readerIndex(); + try { + Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); + if (s != null) { + body = new AmqpValue(s.toString()); + } + } catch (Throwable ignored) { + logger.debug("Exception ignored during conversion", ignored.getMessage(), ignored); + } finally { + internalMessage.getBodyBuffer().readerIndex(readerIndex); + } + } + } + + return body; + } + + private static Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException { + byte[] data = new byte[(int) message.getBodyLength()]; + message.readBytes(data); + message.reset(); // Need to reset after readBytes or future readBytes + + return new Binary(data); + } + + private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException { + Binary result = null; + String text = message.getText(); + if (text != null) { + result = new Binary(text.getBytes(StandardCharsets.UTF_8)); + } + + return result; + } + + private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { + message.getInnerMessage().getBodyBuffer().resetReaderIndex(); + int size = message.getInnerMessage().getBodyBuffer().readInt(); + byte[] bytes = new byte[size]; + message.getInnerMessage().getBodyBuffer().readBytes(bytes); + + return new Binary(bytes); + } + + private static Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException { + final HashMap<String, Object> map = new LinkedHashMap<>(); + + @SuppressWarnings("unchecked") + final Enumeration<String> names = message.getMapNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + Object value = message.getObject(key); + if (value instanceof byte[]) { + value = new Binary((byte[]) value); + } + map.put(key, value); + } + + return map; + } + + private static byte destinationType(Destination destination) { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return TEMP_QUEUE_TYPE; + } else { + return QUEUE_TYPE; + } + } else if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return TEMP_TOPIC_TYPE; + } else { + return TOPIC_TYPE; + } + } + + throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java deleted file mode 100644 index 125a20f..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter; - -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; - -import java.io.IOException; - -import javax.jms.BytesMessage; - -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport; -import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.codec.WritableBuffer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -public class ProtonMessageConverter implements MessageConverter { - - public ProtonMessageConverter(IDGenerator idGenerator) { - inboundTransformer = new JMSMappingInboundTransformer(idGenerator); - outboundTransformer = new JMSMappingOutboundTransformer(idGenerator); - } - - private final InboundTransformer inboundTransformer; - private final OutboundTransformer outboundTransformer; - - @Override - public org.apache.activemq.artemis.api.core.Message inbound(Object messageSource) throws Exception { - AMQPMessage encodedMessageSource = (AMQPMessage) messageSource; - ServerJMSMessage transformedMessage = null; - - try { - transformedMessage = inboundTransformer.transform(encodedMessageSource); - } catch (Exception e) { - ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName()); - ActiveMQClientLogger.LOGGER.trace("Transformation error:", e); - - throw new IOException("Failed to transform incoming delivery, skipping."); - } - - transformedMessage.encode(); - - return transformedMessage.getInnerMessage(); - } - - @Override - public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount) throws Exception { - // Useful for testing but not recommended for real life use. - ByteBuf nettyBuffer = Unpooled.buffer(1024); - NettyWritable buffer = new NettyWritable(nettyBuffer); - long messageFormat = (long) outbound(messageOutbound, deliveryCount, buffer); - - EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), - nettyBuffer.readableBytes()); - - return encoded; - } - - public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception { - ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); - - jmsMessage.decode(); - - if (jmsMessage.getBooleanProperty(JMS_AMQP_NATIVE)) { - if (jmsMessage instanceof BytesMessage) { - return AMQPNativeOutboundTransformer.transform(outboundTransformer, (ServerJMSBytesMessage) jmsMessage, buffer); - } else { - return 0; - } - } else { - return outboundTransformer.transform(jmsMessage, buffer); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java index c3a60f0..8d473a7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms; import javax.jms.BytesMessage; import javax.jms.JMSException; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; @@ -49,8 +49,8 @@ import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF; public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage { - public ServerJMSBytesMessage(Message message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSBytesMessage(ICoreMessage message) { + super(message); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java index df79183..f72239e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.utils.TypedProperties; @@ -51,8 +52,8 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe /* * This constructor is used to construct messages prior to sending */ - public ServerJMSMapMessage(Message message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSMapMessage(ICoreMessage message) { + super(message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index adf4621..2a52f7a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -25,34 +25,48 @@ import java.util.Enumeration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.reader.MessageUtil; import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; public class ServerJMSMessage implements Message { - protected final CoreMessage message; - - protected int deliveryCount; - - public org.apache.activemq.artemis.api.core.Message getInnerMessage() { - return message; - } + protected final ICoreMessage message; + private ActiveMQBuffer readBodyBuffer; - public ServerJMSMessage(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) { - this.message = (CoreMessage)message; - this.deliveryCount = deliveryCount; + public ServerJMSMessage(ICoreMessage message) { + this.message = message; + } + + public static ServerJMSMessage wrapCoreMessage(ICoreMessage wrapped) { + switch (wrapped.getType()) { + case STREAM_TYPE: + return new ServerJMSStreamMessage(wrapped); + case BYTES_TYPE: + return new ServerJMSBytesMessage(wrapped); + case MAP_TYPE: + return new ServerJMSMapMessage(wrapped); + case TEXT_TYPE: + return new ServerJMSTextMessage(wrapped); + case OBJECT_TYPE: + return new ServerJMSObjectMessage(wrapped); + default: + return new ServerJMSMessage(wrapped); + } } - public int getDeliveryCount() { - return deliveryCount; + public ICoreMessage getInnerMessage() { + return message; } - private ActiveMQBuffer readBodyBuffer; - /** * When reading we use a protected copy so multi-threads can work fine */ @@ -112,13 +126,13 @@ public class ServerJMSMessage implements Message { } @Override - public final void setJMSCorrelationID(String correlationID) throws JMSException { - MessageUtil.setJMSCorrelationID(message, correlationID); + public final String getJMSCorrelationID() throws JMSException { + return MessageUtil.getJMSCorrelationID(message); } @Override - public final String getJMSCorrelationID() throws JMSException { - return MessageUtil.getJMSCorrelationID(message); + public final void setJMSCorrelationID(String correlationID) throws JMSException { + MessageUtil.setJMSCorrelationID(message, correlationID); } @Override @@ -253,19 +267,11 @@ public class ServerJMSMessage implements Message { @Override public final int getIntProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return deliveryCount; - } - return message.getIntProperty(name); } @Override public final long getLongProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return deliveryCount; - } - return message.getLongProperty(name); } @@ -281,10 +287,6 @@ public class ServerJMSMessage implements Message { @Override public final String getStringProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return String.valueOf(deliveryCount); - } - return message.getStringProperty(name); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java index 15b04a9..23ffb09 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.jms; -import java.io.Serializable; - import javax.jms.JMSException; import javax.jms.ObjectMessage; +import java.io.Serializable; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.qpid.proton.amqp.Binary; @@ -30,8 +30,8 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe private Binary payload; - public ServerJMSObjectMessage(Message message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSObjectMessage(ICoreMessage message) { + super(message); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java index b092e61..9aaf4c3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java @@ -21,6 +21,7 @@ import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; import javax.jms.StreamMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.utils.DataConstants; @@ -43,8 +44,8 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St private int bodyLength = 0; - public ServerJMSStreamMessage(Message message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSStreamMessage(ICoreMessage message) { + super(message); } // StreamMessage implementation ---------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java index 058a3e9..f770185 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms; import javax.jms.JMSException; import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -48,8 +49,8 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag /* * This constructor is used to construct messages prior to sending */ - public ServerJMSTextMessage(Message message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSTextMessage(ICoreMessage message) { + super(message); } // TextMessage implementation ------------------------------------ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java index 0a39573..9583051 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMe import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; -import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Data; @@ -79,7 +78,6 @@ public final class AMQPMessageSupport { public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY; public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING; - public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT; public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE; public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER; public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE; @@ -103,6 +101,15 @@ public final class AMQPMessageSupport { public static final short AMQP_VALUE_MAP = 7; public static final short AMQP_VALUE_LIST = 8; + public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest"); + public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to"); + + public static final byte QUEUE_TYPE = 0x00; + public static final byte TOPIC_TYPE = 0x01; + public static final byte TEMP_QUEUE_TYPE = 0x02; + public static final byte TEMP_TOPIC_TYPE = 0x03; + + /** * Content type used to mark Data sections as containing a serialized java object. */ @@ -179,23 +186,6 @@ public final class AMQPMessageSupport { } } - public static ServerJMSMessage wrapMessage(int messageType, org.apache.activemq.artemis.api.core.Message wrapped, int deliveryCount) { - switch (messageType) { - case STREAM_TYPE: - return new ServerJMSStreamMessage(wrapped, deliveryCount); - case BYTES_TYPE: - return new ServerJMSBytesMessage(wrapped, deliveryCount); - case MAP_TYPE: - return new ServerJMSMapMessage(wrapped, deliveryCount); - case TEXT_TYPE: - return new ServerJMSTextMessage(wrapped, deliveryCount); - case OBJECT_TYPE: - return new ServerJMSObjectMessage(wrapped, deliveryCount); - default: - return new ServerJMSMessage(wrapped, deliveryCount); - } - } - public static String toAddress(Destination destination) { if (destination instanceof ActiveMQDestination) { return ((ActiveMQDestination) destination).getAddress(); @@ -203,56 +193,56 @@ public final class AMQPMessageSupport { return null; } - public static ServerJMSBytesMessage createBytesMessage(IDGenerator idGenerator) { - return new ServerJMSBytesMessage(newMessage(idGenerator, BYTES_TYPE), 0); + public static ServerJMSBytesMessage createBytesMessage(long id) { + return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE)); } - public static ServerJMSMessage createBytesMessage(IDGenerator idGenerator, byte[] array, int arrayOffset, int length) throws JMSException { - ServerJMSBytesMessage message = createBytesMessage(idGenerator); + public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException { + ServerJMSBytesMessage message = createBytesMessage(id); message.writeBytes(array, arrayOffset, length); return message; } - public static ServerJMSStreamMessage createStreamMessage(IDGenerator idGenerator) { - return new ServerJMSStreamMessage(newMessage(idGenerator, STREAM_TYPE), 0); + public static ServerJMSStreamMessage createStreamMessage(long id) { + return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE)); } - public static ServerJMSMessage createMessage(IDGenerator idGenerator) { - return new ServerJMSMessage(newMessage(idGenerator, DEFAULT_TYPE), 0); + public static ServerJMSMessage createMessage(long id) { + return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE)); } - public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator) { - return new ServerJMSTextMessage(newMessage(idGenerator, TEXT_TYPE), 0); + public static ServerJMSTextMessage createTextMessage(long id) { + return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE)); } - public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator, String text) throws JMSException { - ServerJMSTextMessage message = createTextMessage(idGenerator); + public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException { + ServerJMSTextMessage message = createTextMessage(id); message.setText(text); return message; } - public static ServerJMSObjectMessage createObjectMessage(IDGenerator idGenerator) { - return new ServerJMSObjectMessage(newMessage(idGenerator, OBJECT_TYPE), 0); + public static ServerJMSObjectMessage createObjectMessage(long id) { + return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE)); } - public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, Binary serializedForm) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(idGenerator); + public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id); message.setSerializedForm(serializedForm); return message; } - public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, byte[] array, int offset, int length) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(idGenerator); + public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id); message.setSerializedForm(new Binary(array, offset, length)); return message; } - public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator) { - return new ServerJMSMapMessage(newMessage(idGenerator, MAP_TYPE), 0); + public static ServerJMSMapMessage createMapMessage(long id) { + return new ServerJMSMapMessage(newMessage(id, MAP_TYPE)); } - public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator, Map<String, Object> content) throws JMSException { - ServerJMSMapMessage message = createMapMessage(idGenerator); + public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException { + ServerJMSMapMessage message = createMapMessage(id); final Set<Map.Entry<String, Object>> set = content.entrySet(); for (Map.Entry<String, Object> entry : set) { Object value = entry.getValue(); @@ -265,8 +255,8 @@ public final class AMQPMessageSupport { return message; } - private static CoreMessage newMessage(IDGenerator idGenerator, byte messageType) { - CoreMessage message = new CoreMessage(idGenerator.generateID(), 512); + private static CoreMessage newMessage(long id, byte messageType) { + CoreMessage message = new CoreMessage(id, 512); message.setType(messageType); ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); return message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java deleted file mode 100644 index 70c755a..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -@Deprecated -public class AMQPMessageTypes { - - // TODO - Remove in future release as these are no longer used by the - // inbound JMS Transformer. - - public static final String AMQP_TYPE_KEY = "amqp:type"; - - public static final String AMQP_SEQUENCE = "amqp:sequence"; - - public static final String AMQP_LIST = "amqp:list"; -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java deleted file mode 100644 index 8e89bb3..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.io.UnsupportedEncodingException; - -import javax.jms.JMSException; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.message.ProtonJMessage; - -public class AMQPNativeOutboundTransformer extends OutboundTransformer { - - public AMQPNativeOutboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException { - if (message == null || !(message instanceof ServerJMSBytesMessage)) { - return 0; - } - - return transform(this, (ServerJMSBytesMessage) message, buffer); - } - - public static long transform(OutboundTransformer options, ServerJMSBytesMessage message, WritableBuffer buffer) throws JMSException { - byte[] data = new byte[(int) message.getBodyLength()]; - message.readBytes(data); - message.reset(); - - // The AMQP delivery-count field only includes prior failed delivery attempts, - int amqpDeliveryCount = message.getDeliveryCount() - 1; - if (amqpDeliveryCount >= 1) { - - // decode... - ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); - int offset = 0; - int len = data.length; - while (len > 0) { - final int decoded = amqp.decode(data, offset, len); - assert decoded > 0 : "Make progress decoding the message"; - offset += decoded; - len -= decoded; - } - - // Update the DeliveryCount header which might require adding a Header - if (amqp.getHeader() == null && amqpDeliveryCount > 0) { - amqp.setHeader(new Header()); - } - - amqp.getHeader().setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); - - amqp.encode(buffer); - } else { - buffer.put(data, 0, data.length); - } - - return 0; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java deleted file mode 100644 index 22042da..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.message.Message; - -public class EncodedMessage { - - private final Binary data; - final long messageFormat; - - public EncodedMessage(long messageFormat, byte[] data, int offset, int length) { - this.data = new Binary(data, offset, length); - this.messageFormat = messageFormat; - } - - public long getMessageFormat() { - return messageFormat; - } - - public Message decode() throws Exception { - Message amqp = Message.Factory.create(); - - int offset = getArrayOffset(); - int len = getLength(); - while (len > 0) { - final int decoded = amqp.decode(getArray(), offset, len); - assert decoded > 0 : "Make progress decoding the message"; - offset += decoded; - len -= decoded; - } - - return amqp; - } - - public int getLength() { - return data.getLength(); - } - - public int getArrayOffset() { - return data.getArrayOffset(); - } - - public byte[] getArray() { - return data.getArray(); - } - - @Override - public String toString() { - return data.toString(); - } -}
