ARTEMIS-770 AMQP Message Transformer refactor Refactor the AMQP Message transformers both for better performance and also to fix a number of issues with the transformers creating inbound and outbound messages with incorrectly mapped values or extra data appended where it should not be.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/62627bf2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/62627bf2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/62627bf2 Branch: refs/heads/master Commit: 62627bf2ee4789d8c019170fba0fcbf64a697a38 Parents: 6f6d984 Author: Timothy Bish <[email protected]> Authored: Mon Oct 3 14:23:50 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Oct 7 10:42:52 2016 -0400 ---------------------------------------------------------------------- .../amqp/broker/AMQPSessionCallback.java | 12 +- .../amqp/converter/ActiveMQJMSVendor.java | 148 --- .../amqp/converter/ProtonMessageConverter.java | 83 +- .../amqp/converter/jms/ServerJMSMessage.java | 9 +- .../converter/jms/ServerJMSObjectMessage.java | 47 +- .../message/AMQPContentTypeSupport.java | 146 +++ .../converter/message/AMQPMessageIdHelper.java | 59 +- .../converter/message/AMQPMessageSupport.java | 272 ++++++ .../converter/message/AMQPMessageTypes.java | 4 + .../message/AMQPNativeInboundTransformer.java | 28 +- .../message/AMQPNativeOutboundTransformer.java | 82 +- .../message/AMQPRawInboundTransformer.java | 48 +- .../amqp/converter/message/EncodedMessage.java | 12 +- .../converter/message/InboundTransformer.java | 182 ++-- .../message/JMSMappingInboundTransformer.java | 198 ++-- .../message/JMSMappingOutboundTransformer.java | 650 +++++++++---- .../amqp/converter/message/JMSVendor.java | 53 -- .../converter/message/OutboundTransformer.java | 78 +- ...ActiveMQAMQPInvalidContentTypeException.java | 27 + .../amqp/proton/ProtonServerSenderContext.java | 64 +- .../amqp/converter/TestConversions.java | 130 ++- .../message/AMQPContentTypeSupportTest.java | 230 +++++ .../message/AMQPMessageIdHelperTest.java | 391 ++++++++ .../message/AMQPMessageSupportTest.java | 108 +++ .../JMSMappingInboundTransformerTest.java | 718 ++++++++++++++ .../JMSMappingOutboundTransformerTest.java | 952 +++++++++++++++++++ .../JMSTransformationSpeedComparisonTest.java | 300 ++++++ .../message/MessageTransformationTest.java | 264 +++++ .../transport/amqp/client/AmqpMessage.java | 24 + .../tests/integration/amqp/ProtonTest.java | 71 +- 30 files changed, 4458 insertions(+), 932 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index c7ca446..66c7b4b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -21,7 +21,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; @@ -37,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; @@ -59,9 +59,10 @@ import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.message.ProtonJMessage; +import io.netty.buffer.ByteBuf; import org.jboss.logging.Logger; public class AMQPSessionCallback implements SessionCallback { @@ -259,8 +260,11 @@ public class AMQPSessionCallback implements SessionCallback { } } - public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception { - return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount); + public long encodeMessage(Object message, int deliveryCount, WritableBuffer buffer) throws Exception { + ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter(); + + // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode. + return (long) converter.outbound((ServerMessage) message, deliveryCount, buffer); } public String tempQueueName() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java deleted file mode 100644 index 0b28660..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSVendor; -import org.apache.activemq.artemis.utils.IDGenerator; - -public class ActiveMQJMSVendor implements JMSVendor { - - private final IDGenerator serverGenerator; - - ActiveMQJMSVendor(IDGenerator idGenerator) { - this.serverGenerator = idGenerator; - } - - @Override - public BytesMessage createBytesMessage() { - return new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0); - } - - @Override - public StreamMessage createStreamMessage() { - return new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0); - } - - @Override - public Message createMessage() { - return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0); - } - - @Override - public TextMessage createTextMessage() { - return new ServerJMSTextMessage(newMessage(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE), 0); - } - - @Override - public ObjectMessage createObjectMessage() { - return new ServerJMSObjectMessage(newMessage(org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE), 0); - } - - @Override - public MapMessage createMapMessage() { - return new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0); - } - - @Override - public void setJMSXUserID(Message message, String s) { - } - - @Override - public Destination createDestination(String name) { - return new ServerDestination(name); - } - - @Override - public void setJMSXGroupID(Message message, String s) { - try { - message.setStringProperty("_AMQ_GROUP_ID", s); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public void setJMSXGroupSequence(Message message, int i) { - try { - message.setIntProperty("JMSXGroupSeq", i); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public void setJMSXDeliveryCount(Message message, long l) { - try { - message.setLongProperty("JMSXDeliveryCount", l); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) { - switch (messageType) { - case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE: - return new ServerJMSStreamMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE: - return new ServerJMSBytesMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.MAP_TYPE: - return new ServerJMSMapMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE: - return new ServerJMSTextMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: - return new ServerJMSObjectMessage(wrapped, deliveryCount); - default: - return new ServerJMSMessage(wrapped, deliveryCount); - } - } - - @Override - public String toAddress(Destination destination) { - if (destination instanceof ActiveMQDestination) { - return ((ActiveMQDestination) destination).getAddress(); - } - return null; - } - - private ServerMessageImpl newMessage(byte messageType) { - ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512); - message.setType(messageType); - ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); - return message; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java index 6eb78d0..6aa44a4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java @@ -16,91 +16,86 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter; -import javax.jms.BytesMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; + import java.io.IOException; +import javax.jms.BytesMessage; + import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer; import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer; import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer; import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer; +import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.qpid.proton.codec.WritableBuffer; -public class ProtonMessageConverter implements MessageConverter { +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; - ActiveMQJMSVendor activeMQJMSVendor; - - private final String prefixVendor; +public class ProtonMessageConverter implements MessageConverter { public ProtonMessageConverter(IDGenerator idGenerator) { - activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator); - inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor); - outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor); - prefixVendor = outboundTransformer.getPrefixVendor(); + inboundTransformer = new JMSMappingInboundTransformer(idGenerator); + outboundTransformer = new JMSMappingOutboundTransformer(idGenerator); } private final InboundTransformer inboundTransformer; - private final JMSMappingOutboundTransformer outboundTransformer; + private final OutboundTransformer outboundTransformer; @Override public ServerMessage inbound(Object messageSource) throws Exception { - ServerJMSMessage jmsMessage = inboundJMSType((EncodedMessage) messageSource); - - return (ServerMessage) jmsMessage.getInnerMessage(); - } - - /** - * Just create the JMS Part of the inbound (for testing) - * - * @param messageSource - * @return - * @throws Exception https://issues.jboss.org/browse/ENTMQ-1560 - */ - public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception { - EncodedMessage encodedMessageSource = messageSource; + EncodedMessage encodedMessageSource = (EncodedMessage) messageSource; ServerJMSMessage transformedMessage = null; - InboundTransformer transformer = inboundTransformer; - - while (transformer != null) { - try { - transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource); - break; - } catch (Exception e) { - ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName()); - ActiveMQClientLogger.LOGGER.trace("Transformation error:", e); - - transformer = transformer.getFallbackTransformer(); - } - } + try { + transformedMessage = inboundTransformer.transform(encodedMessageSource); + } catch (Exception e) { + ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName()); + ActiveMQClientLogger.LOGGER.trace("Transformation error:", e); - if (transformedMessage == null) { throw new IOException("Failed to transform incoming delivery, skipping."); } transformedMessage.encode(); - return transformedMessage; + return (ServerMessage) transformedMessage.getInnerMessage(); } @Override public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception { - ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); + // Useful for testing but not recommended for real life use. + ByteBuf nettyBuffer = Unpooled.buffer(1024); + NettyWritable buffer = new NettyWritable(nettyBuffer); + long messageFormat = (long) outbound(messageOutbound, deliveryCount, buffer); + + EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), + nettyBuffer.readableBytes()); + + return encoded; + } + + public Object outbound(ServerMessage messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception { + ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); jmsMessage.decode(); - if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) { + if (jmsMessage.getBooleanProperty(JMS_AMQP_NATIVE)) { if (jmsMessage instanceof BytesMessage) { - return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage); + return AMQPNativeOutboundTransformer.transform(outboundTransformer, (ServerJMSBytesMessage) jmsMessage, buffer); } else { - return null; + return 0; } } else { - return outboundTransformer.convert(jmsMessage); + return outboundTransformer.transform(jmsMessage, buffer); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index a6eac1d..c7900e4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -16,12 +16,13 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.jms; +import java.util.Collections; +import java.util.Enumeration; + import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import java.util.Collections; -import java.util.Enumeration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -47,6 +48,10 @@ public class ServerJMSMessage implements Message { this.deliveryCount = deliveryCount; } + public int getDeliveryCount() { + return deliveryCount; + } + private ActiveMQBuffer readBodyBuffer; /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java index 7f0906e..d1eaac6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java @@ -16,31 +16,20 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.jms; +import java.io.Serializable; + import javax.jms.JMSException; import javax.jms.ObjectMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader; +import org.apache.qpid.proton.amqp.Binary; public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage { - private static final String DEFAULT_WHITELIST; - private static final String DEFAULT_BLACKLIST; - - static { - DEFAULT_WHITELIST = System.getProperty(ObjectInputStreamWithClassLoader.WHITELIST_PROPERTY, "java.lang,java.math,javax.security,java.util,org.apache.activemq,org.apache.qpid.proton.amqp"); - - DEFAULT_BLACKLIST = System.getProperty(ObjectInputStreamWithClassLoader.BLACKLIST_PROPERTY, null); - } + public static final byte TYPE = Message.OBJECT_TYPE; - public static final byte TYPE = Message.STREAM_TYPE; - - private Serializable object; + private Binary payload; public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) { super(message, deliveryCount); @@ -48,23 +37,27 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe @Override public void setObject(Serializable object) throws JMSException { - this.object = object; + throw new UnsupportedOperationException("Cannot set Object on this internal message"); } @Override public Serializable getObject() throws JMSException { - return object; + throw new UnsupportedOperationException("Cannot set Object on this internal message"); + } + + public void setSerializedForm(Binary payload) { + this.payload = payload; + } + + public Binary getSerializedForm() { + return payload; } @Override public void encode() throws Exception { super.encode(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream ous = new ObjectOutputStream(out); - ous.writeObject(object); - byte[] src = out.toByteArray(); - getInnerMessage().getBodyBuffer().writeInt(src.length); - getInnerMessage().getBodyBuffer().writeBytes(src); + getInnerMessage().getBodyBuffer().writeInt(payload.getLength()); + getInnerMessage().getBodyBuffer().writeBytes(payload.getArray(), payload.getArrayOffset(), payload.getLength()); } @Override @@ -73,10 +66,6 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe int size = getInnerMessage().getBodyBuffer().readInt(); byte[] bytes = new byte[size]; getInnerMessage().getBodyBuffer().readBytes(bytes); - try (ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes))) { - ois.setWhiteList(DEFAULT_WHITELIST); - ois.setBlackList(DEFAULT_BLACKLIST); - object = (Serializable) ois.readObject(); - } + payload = new Binary(bytes); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java new file mode 100644 index 0000000..01d72c8 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter.message; + +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.StandardCharsets; +import java.nio.charset.UnsupportedCharsetException; +import java.util.StringTokenizer; + +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; + +public final class AMQPContentTypeSupport { + + private static final String UTF_8 = "UTF-8"; + private static final String CHARSET = "charset"; + private static final String TEXT = "text"; + private static final String APPLICATION = "application"; + private static final String JAVASCRIPT = "javascript"; + private static final String XML = "xml"; + private static final String XML_VARIANT = "+xml"; + private static final String JSON = "json"; + private static final String JSON_VARIANT = "+json"; + private static final String XML_DTD = "xml-dtd"; + private static final String ECMASCRIPT = "ecmascript"; + + /** + * @param contentType + * the contentType of the received message + * @return the character set to use, or null if not to treat the message as text + * @throws ActiveMQAMQPInvalidContentTypeException + * if the content-type is invalid in some way. + */ + public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException { + if (contentType == null || contentType.trim().isEmpty()) { + throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty"); + } + + int subTypeSeparator = contentType.indexOf("/"); + if (subTypeSeparator == -1) { + throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType); + } + + final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim(); + + String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); + + String parameterPart = null; + int parameterSeparator = subTypePart.indexOf(";"); + if (parameterSeparator != -1) { + if (parameterSeparator < subTypePart.length() - 1) { + parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); + } + subTypePart = subTypePart.substring(0, parameterSeparator).trim(); + } + + if (subTypePart.isEmpty()) { + throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/'" + contentType); + } + + final String subType = subTypePart; + + if (isTextual(type, subType)) { + String charset = findCharset(parameterPart); + if (charset == null) { + charset = UTF_8; + } + + if (UTF_8.equals(charset)) { + return StandardCharsets.UTF_8; + } else { + try { + return Charset.forName(charset); + } catch (IllegalCharsetNameException icne) { + throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset); + } catch (UnsupportedCharsetException uce) { + throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset); + } + } + } + + return null; + } + + // ----- Internal Content Type utilities ----------------------------------// + + private static boolean isTextual(String type, String subType) { + if (TEXT.equals(type)) { + return true; + } + + if (APPLICATION.equals(type)) { + if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT) + || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) { + return true; + } + } + + return false; + } + + private static String findCharset(String paramaterPart) { + String charset = null; + + if (paramaterPart != null) { + StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";"); + while (tokenizer.hasMoreTokens()) { + String parameter = tokenizer.nextToken().trim(); + int eqIndex = parameter.indexOf('='); + if (eqIndex != -1) { + String name = parameter.substring(0, eqIndex); + if (CHARSET.equalsIgnoreCase(name.trim())) { + String value = unquote(parameter.substring(eqIndex + 1)); + + charset = value.toUpperCase(); + break; + } + } + } + } + + return charset; + } + + private static String unquote(String s) { + if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) { + return s.substring(1, s.length() - 1); + } else { + return s; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java index dc7891c..e9a9969 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java @@ -28,24 +28,29 @@ import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedLong; /** - * Helper class for identifying and converting message-id and correlation-id values between - * the AMQP types and the Strings values used by JMS. + * Helper class for identifying and converting message-id and correlation-id values between the + * AMQP types and the Strings values used by JMS. * <p> - * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary, - * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these - * for interoperability with other AMQP clients, the following encoding can be used after removing or - * before adding the "ID:" prefix used for a JMSMessageID value:<br> + * <p> + * AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, + * message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a + * string representation of these for interoperability with other AMQP clients, the following + * encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID + * value:<br> * <p> * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br> * {@literal "AMQP_UUID:<string representation of uuid>"}<br> * {@literal "AMQP_ULONG:<string representation of ulong>"}<br> * {@literal "AMQP_STRING:<string>"}<br> * <p> - * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin - * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise. * <p> - * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or - * ulong but can't be converted into the indicated format, an exception will be thrown. + * The AMQP_STRING encoding exists only for escaping message-id-string values that happen to + * begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used + * otherwise. + * <p> + * <p> + * When provided a string for conversion which attempts to identify itself as an encoded binary, + * uuid, or ulong but can't be converted into the indicated format, an exception will be thrown. */ public class AMQPMessageIdHelper { @@ -63,11 +68,12 @@ public class AMQPMessageIdHelper { private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray(); /** - * Takes the provided AMQP messageId style object, and convert it to a base string. - * Encodes type information as a prefix where necessary to convey or escape the type - * of the provided object. + * Takes the provided AMQP messageId style object, and convert it to a base string. Encodes + * type information as a prefix where necessary to convey or escape the type of the provided + * object. * - * @param messageId the raw messageId object to process + * @param messageId + * the raw messageId object to process * @return the base string to be used in creating the actual id. */ public String toBaseMessageIdString(Object messageId) { @@ -106,9 +112,12 @@ public class AMQPMessageIdHelper { * Takes the provided base id string and return the appropriate amqp messageId style object. * Converts the type based on any relevant encoding information found as a prefix. * - * @param baseId the object to be converted to an AMQP MessageId value. + * @param baseId + * the object to be converted to an AMQP MessageId value. * @return the AMQP messageId style object - * @throws ActiveMQAMQPIllegalStateException if the provided baseId String indicates an encoded type but can't be converted to that type. + * @throws ActiveMQAMQPIllegalStateException + * if the provided baseId String indicates an encoded type but can't be converted to + * that type. */ public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException { if (baseId == null) { @@ -143,15 +152,17 @@ public class AMQPMessageIdHelper { * <p> * The hex characters may be upper or lower case. * - * @param hexString string to convert to a binary value. + * @param hexString + * string to convert to a binary value. * @return a byte array containing the binary representation - * @throws IllegalArgumentException if the provided String is a non-even length or contains - * non-hex characters + * @throws IllegalArgumentException + * if the provided String is a non-even length or contains non-hex characters */ public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException { int length = hexString.length(); - // As each byte needs two characters in the hex encoding, the string must be an even length. + // As each byte needs two characters in the hex encoding, the string must be an even + // length. if (length % 2 != 0) { throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString); } @@ -177,7 +188,8 @@ public class AMQPMessageIdHelper { * <p> * The returned hex characters are upper-case. * - * @param bytes the binary value to convert to a hex String instance. + * @param bytes + * the binary value to convert to a hex String instance. * @return a String containing a hex representation of the bytes */ public String convertBinaryToHexString(byte[] bytes) { @@ -198,11 +210,10 @@ public class AMQPMessageIdHelper { return builder.toString(); } - //----- Internal implementation ------------------------------------------// + // ----- Internal implementation ------------------------------------------// private boolean hasTypeEncodingPrefix(String stringId) { - return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || - hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId); + return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId); } private boolean hasAmqpStringPrefix(String stringId) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java new file mode 100644 index 0000000..9eab737 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter.message; + +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +import javax.jms.Destination; +import javax.jms.JMSException; + +import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; +import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.Message; + +/** + * Support class containing constant values and static methods that are used to map to / from + * AMQP Message types being sent or received. + */ +public final class AMQPMessageSupport { + + // Message Properties used to map AMQP to JMS and back + + public static final String JMS_AMQP_PREFIX = "JMS_AMQP_"; + public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length(); + + public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT"; + public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING"; + public static final String NATIVE = "NATIVE"; + public static final String HEADER = "HEADER"; + public static final String PROPERTIES = "PROPERTIES"; + + public static final String FIRST_ACQUIRER = "FirstAcquirer"; + public static final String CONTENT_TYPE = "ContentType"; + public static final String CONTENT_ENCODING = "ContentEncoding"; + public static final String REPLYTO_GROUP_ID = "ReplyToGroupID"; + + public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; + public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; + public static final String FOOTER_PREFIX = "FT_"; + + public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; + public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; + public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING; + public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT; + public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE; + public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER; + public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE; + public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING; + public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID; + public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX; + public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX; + public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX; + + // Message body type definitions + public static final Binary EMPTY_BINARY = new Binary(new byte[0]); + public static final Data EMPTY_BODY = new Data(EMPTY_BINARY); + + public static final short AMQP_UNKNOWN = 0; + public static final short AMQP_NULL = 1; + public static final short AMQP_DATA = 2; + public static final short AMQP_SEQUENCE = 3; + public static final short AMQP_VALUE_NULL = 4; + public static final short AMQP_VALUE_STRING = 5; + public static final short AMQP_VALUE_BINARY = 6; + public static final short AMQP_VALUE_MAP = 7; + public static final short AMQP_VALUE_LIST = 8; + + /** + * Content type used to mark Data sections as containing a serialized java object. + */ + public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object"; + + /** + * Content type used to mark Data sections as containing arbitrary bytes. + */ + public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; + + /** + * Lookup and return the correct Proton Symbol instance based on the given key. + * + * @param key + * the String value name of the Symbol to locate. + * + * @return the Symbol value that matches the given key. + */ + public static Symbol getSymbol(String key) { + return Symbol.valueOf(key); + } + + /** + * Safe way to access message annotations which will check internal structure and either + * return the annotation if it exists or null if the annotation or any annotations are + * present. + * + * @param key + * the String key to use to lookup an annotation. + * @param message + * the AMQP message object that is being examined. + * + * @return the given annotation value or null if not present in the message. + */ + public static Object getMessageAnnotation(String key, Message message) { + if (message != null && message.getMessageAnnotations() != null) { + Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue(); + return annotations.get(AMQPMessageSupport.getSymbol(key)); + } + + return null; + } + + /** + * Check whether the content-type field of the properties section (if present) in the given + * message matches the provided string (where null matches if there is no content type + * present. + * + * @param contentType + * content type string to compare against, or null if none + * @param message + * the AMQP message object that is being examined. + * + * @return true if content type matches + */ + public static boolean isContentType(String contentType, Message message) { + if (contentType == null) { + return message.getContentType() == null; + } else { + return contentType.equals(message.getContentType()); + } + } + + /** + * @param contentType + * the contentType of the received message + * @return the character set to use, or null if not to treat the message as text + */ + public static Charset getCharsetForTextualContent(String contentType) { + try { + return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType); + } catch (ActiveMQAMQPInvalidContentTypeException e) { + return null; + } + } + + public static ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) { + switch (messageType) { + case STREAM_TYPE: + return new ServerJMSStreamMessage(wrapped, deliveryCount); + case BYTES_TYPE: + return new ServerJMSBytesMessage(wrapped, deliveryCount); + case MAP_TYPE: + return new ServerJMSMapMessage(wrapped, deliveryCount); + case TEXT_TYPE: + return new ServerJMSTextMessage(wrapped, deliveryCount); + case OBJECT_TYPE: + return new ServerJMSObjectMessage(wrapped, deliveryCount); + default: + return new ServerJMSMessage(wrapped, deliveryCount); + } + } + + public static String toAddress(Destination destination) { + if (destination instanceof ActiveMQDestination) { + return ((ActiveMQDestination) destination).getAddress(); + } + return null; + } + + public static ServerJMSBytesMessage createBytesMessage(IDGenerator idGenerator) { + return new ServerJMSBytesMessage(newMessage(idGenerator, BYTES_TYPE), 0); + } + + public static ServerJMSMessage createBytesMessage(IDGenerator idGenerator, byte[] array, int arrayOffset, int length) throws JMSException { + ServerJMSBytesMessage message = createBytesMessage(idGenerator); + message.writeBytes(array, arrayOffset, length); + return message; + } + + public static ServerJMSStreamMessage createStreamMessage(IDGenerator idGenerator) { + return new ServerJMSStreamMessage(newMessage(idGenerator, STREAM_TYPE), 0); + } + + public static ServerJMSMessage createMessage(IDGenerator idGenerator) { + return new ServerJMSMessage(newMessage(idGenerator, DEFAULT_TYPE), 0); + } + + public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator) { + return new ServerJMSTextMessage(newMessage(idGenerator, TEXT_TYPE), 0); + } + + public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator, String text) throws JMSException { + ServerJMSTextMessage message = createTextMessage(idGenerator); + message.setText(text); + return message; + } + + public static ServerJMSObjectMessage createObjectMessage(IDGenerator idGenerator) { + return new ServerJMSObjectMessage(newMessage(idGenerator, OBJECT_TYPE), 0); + } + + public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, Binary serializedForm) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(idGenerator); + message.setSerializedForm(serializedForm); + return message; + } + + public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, byte[] array, int offset, int length) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(idGenerator); + message.setSerializedForm(new Binary(array, offset, length)); + return message; + } + + public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator) { + return new ServerJMSMapMessage(newMessage(idGenerator, MAP_TYPE), 0); + } + + public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator, Map<String, Object> content) throws JMSException { + ServerJMSMapMessage message = createMapMessage(idGenerator); + final Set<Map.Entry<String, Object>> set = content.entrySet(); + for (Map.Entry<String, Object> entry : set) { + Object value = entry.getValue(); + if (value instanceof Binary) { + Binary binary = (Binary) value; + value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength()); + } + message.setObject(entry.getKey(), value); + } + return message; + } + + private static ServerMessageImpl newMessage(IDGenerator idGenerator, byte messageType) { + ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512); + message.setType(messageType); + ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); + return message; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java index 9b0635a..70c755a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java @@ -16,8 +16,12 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; +@Deprecated public class AMQPMessageTypes { + // TODO - Remove in future release as these are no longer used by the + // inbound JMS Transformer. + public static final String AMQP_TYPE_KEY = "amqp:type"; public static final String AMQP_SEQUENCE = "amqp:sequence"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java index 8a5d17c..7028547 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,12 +16,13 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import javax.jms.Message; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.utils.IDGenerator; public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { - public AMQPNativeInboundTransformer(JMSVendor vendor) { - super(vendor); + public AMQPNativeInboundTransformer(IDGenerator idGenerator) { + super(idGenerator); } @Override @@ -31,16 +32,13 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { @Override public InboundTransformer getFallbackTransformer() { - return new AMQPRawInboundTransformer(getVendor()); + return new AMQPRawInboundTransformer(idGenerator); } @Override - public Message transform(EncodedMessage amqpMessage) throws Exception { + public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception { org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); - Message rc = super.transform(amqpMessage); - - populateMessage(rc, amqp); - return rc; + return populateMessage(super.transform(amqpMessage), amqp); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java index ac18a94..8e89bb3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,45 +16,65 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import javax.jms.BytesMessage; +import java.io.UnsupportedEncodingException; + import javax.jms.JMSException; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.ProtonJMessage; public class AMQPNativeOutboundTransformer extends OutboundTransformer { - public AMQPNativeOutboundTransformer(JMSVendor vendor) { - super(vendor); + public AMQPNativeOutboundTransformer(IDGenerator idGenerator) { + super(idGenerator); } - public static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { - byte[] data = new byte[(int) msg.getBodyLength()]; - msg.readBytes(data); - msg.reset(); - int count = msg.getIntProperty("JMSXDeliveryCount"); - - // decode... - ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); - int offset = 0; - int len = data.length; - while (len > 0) { - final int decoded = amqp.decode(data, offset, len); - assert decoded > 0 : "Make progress decoding the message"; - offset += decoded; - len -= decoded; + @Override + public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException { + if (message == null || !(message instanceof ServerJMSBytesMessage)) { + return 0; } - // Update the DeliveryCount header... + return transform(this, (ServerJMSBytesMessage) message, buffer); + } + + public static long transform(OutboundTransformer options, ServerJMSBytesMessage message, WritableBuffer buffer) throws JMSException { + byte[] data = new byte[(int) message.getBodyLength()]; + message.readBytes(data); + message.reset(); + // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. - if (amqp.getHeader() == null) { - amqp.setHeader(new Header()); - } + int amqpDeliveryCount = message.getDeliveryCount() - 1; + if (amqpDeliveryCount >= 1) { - amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1)); + // decode... + ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); + int offset = 0; + int len = data.length; + while (len > 0) { + final int decoded = amqp.decode(data, offset, len); + assert decoded > 0 : "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + // Update the DeliveryCount header which might require adding a Header + if (amqp.getHeader() == null && amqpDeliveryCount > 0) { + amqp.setHeader(new Header()); + } + + amqp.getHeader().setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); + + amqp.encode(buffer); + } else { + buffer.put(data, 0, data.length); + } - return amqp; + return 0; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java index e6bf171..445eaca 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,14 +16,21 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import javax.jms.BytesMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; + import javax.jms.DeliveryMode; import javax.jms.Message; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.utils.IDGenerator; + public class AMQPRawInboundTransformer extends InboundTransformer { - public AMQPRawInboundTransformer(JMSVendor vendor) { - super(vendor); + public AMQPRawInboundTransformer(IDGenerator idGenerator) { + super(idGenerator); } @Override @@ -37,24 +44,19 @@ public class AMQPRawInboundTransformer extends InboundTransformer { } @Override - public Message transform(EncodedMessage amqpMessage) throws Exception { - BytesMessage rc = vendor.createBytesMessage(); - rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); + public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception { + ServerJMSBytesMessage message = createBytesMessage(idGenerator); + message.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); // We cannot decode the message headers to check so err on the side of caution // and mark all messages as persistent. - rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT); - rc.setJMSPriority(defaultPriority); - - final long now = System.currentTimeMillis(); - rc.setJMSTimestamp(now); - if (defaultTtl > 0) { - rc.setJMSExpiration(now + defaultTtl); - } + message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); + message.setJMSPriority(Message.DEFAULT_PRIORITY); + message.setJMSTimestamp(System.currentTimeMillis()); - rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); - rc.setBooleanProperty(prefixVendor + "NATIVE", true); + message.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat()); + message.setBooleanProperty(JMS_AMQP_NATIVE, true); - return rc; + return message; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java index 4a80ea6..22042da 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java index ff0c035..5094af5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java @@ -16,13 +16,26 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; + import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Set; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Decimal128; import org.apache.qpid.proton.amqp.Decimal32; @@ -38,109 +51,61 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; -import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; - public abstract class InboundTransformer { - JMSVendor vendor; + protected IDGenerator idGenerator; public static final String TRANSFORMER_NATIVE = "native"; public static final String TRANSFORMER_RAW = "raw"; public static final String TRANSFORMER_JMS = "jms"; - String prefixVendor = "JMS_AMQP_"; - String prefixDeliveryAnnotations = "DA_"; - String prefixMessageAnnotations = "MA_"; - String prefixFooter = "FT_"; - - int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT; - int defaultPriority = Message.DEFAULT_PRIORITY; - long defaultTtl = Message.DEFAULT_TIME_TO_LIVE; - - public InboundTransformer(JMSVendor vendor) { - this.vendor = vendor; + public InboundTransformer(IDGenerator idGenerator) { + this.idGenerator = idGenerator; } - public abstract Message transform(EncodedMessage amqpMessage) throws Exception; + public abstract ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception; public abstract String getTransformerName(); public abstract InboundTransformer getFallbackTransformer(); - public int getDefaultDeliveryMode() { - return defaultDeliveryMode; - } - - public void setDefaultDeliveryMode(int defaultDeliveryMode) { - this.defaultDeliveryMode = defaultDeliveryMode; - } - - public int getDefaultPriority() { - return defaultPriority; - } - - public void setDefaultPriority(int defaultPriority) { - this.defaultPriority = defaultPriority; - } - - public long getDefaultTtl() { - return defaultTtl; - } - - public void setDefaultTtl(long defaultTtl) { - this.defaultTtl = defaultTtl; - } - - public String getPrefixVendor() { - return prefixVendor; - } - - public void setPrefixVendor(String prefixVendor) { - this.prefixVendor = prefixVendor; - } - - public JMSVendor getVendor() { - return vendor; - } - - public void setVendor(JMSVendor vendor) { - this.vendor = vendor; - } - - protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + @SuppressWarnings("unchecked") + protected ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception { Header header = amqp.getHeader(); - if (header == null) { - header = new Header(); - } + if (header != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER, true); - if (header.getDurable() != null) { - jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - } else { - jms.setJMSDeliveryMode(defaultDeliveryMode); - } + if (header.getDurable() != null) { + jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } else { + jms.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); + } - if (header.getPriority() != null) { - jms.setJMSPriority(header.getPriority().intValue()); - } else { - jms.setJMSPriority(defaultPriority); - } + if (header.getPriority() != null) { + jms.setJMSPriority(header.getPriority().intValue()); + } else { + jms.setJMSPriority(Message.DEFAULT_PRIORITY); + } - if (header.getFirstAcquirer() != null) { - jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); - } + if (header.getFirstAcquirer() != null) { + jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer()); + } - if (header.getDeliveryCount() != null) { - vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); + if (header.getDeliveryCount() != null) { + // AMQP Delivery Count counts only failed delivers where JMS + // Delivery Count should include the original delivery in the count. + jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1); + } + } else { + jms.setJMSPriority((byte) Message.DEFAULT_PRIORITY); + jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); } final MessageAnnotations ma = amqp.getMessageAnnotations(); if (ma != null) { for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) { String key = entry.getKey().toString(); - if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { - // Legacy annotation, JMSType value will be replaced by Subject further down if also present. - jms.setJMSType(entry.getValue().toString()); - } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { long deliveryTime = ((Number) entry.getValue()).longValue(); jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime); } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { @@ -149,41 +114,15 @@ public abstract class InboundTransformer { jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); } } - //todo - /*else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) { - int repeat = ((Number) entry.getValue()).intValue(); - if (repeat > 0) { - jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); - } - } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) { - long period = ((Number) entry.getValue()).longValue(); - if (period > 0) { - jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); - } - } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) { - String cronEntry = (String) entry.getValue(); - if (cronEntry != null) { - jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry); - } - }*/ - setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); + setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); } } final ApplicationProperties ap = amqp.getApplicationProperties(); if (ap != null) { for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) { - String key = entry.getKey().toString(); - if ("JMSXGroupID".equals(key)) { - vendor.setJMSXGroupID(jms, entry.getValue().toString()); - } else if ("JMSXGroupSequence".equals(key)) { - vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue()); - } else if ("JMSXUserID".equals(key)) { - vendor.setJMSXUserID(jms, entry.getValue().toString()); - } else { - setProperty(jms, key, entry.getValue()); - } + setProperty(jms, entry.getKey().toString(), entry.getValue()); } } @@ -194,37 +133,38 @@ public abstract class InboundTransformer { } Binary userId = properties.getUserId(); if (userId != null) { - vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); + // TODO - Better Way to set this? + jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); } if (properties.getTo() != null) { - jms.setJMSDestination(vendor.createDestination(properties.getTo())); + jms.setJMSDestination(new ServerDestination(properties.getTo())); } if (properties.getSubject() != null) { jms.setJMSType(properties.getSubject()); } if (properties.getReplyTo() != null) { - jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); + jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo())); } if (properties.getCorrelationId() != null) { jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); } if (properties.getContentType() != null) { - jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); + jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString()); } if (properties.getContentEncoding() != null) { - jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); + jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString()); } if (properties.getCreationTime() != null) { jms.setJMSTimestamp(properties.getCreationTime().getTime()); } if (properties.getGroupId() != null) { - vendor.setJMSXGroupID(jms, properties.getGroupId()); + jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId()); } if (properties.getGroupSequence() != null) { - vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); + jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue()); } if (properties.getReplyToGroupId() != null) { - jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); + jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId()); } if (properties.getAbsoluteExpiryTime() != null) { jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); @@ -232,9 +172,9 @@ public abstract class InboundTransformer { } // If the jms expiration has not yet been set... - if (jms.getJMSExpiration() == 0) { + if (header != null && jms.getJMSExpiration() == 0) { // Then lets try to set it based on the message ttl. - long ttl = defaultTtl; + long ttl = Message.DEFAULT_TIME_TO_LIVE; if (header.getTtl() != null) { ttl = header.getTtl().longValue(); } @@ -250,9 +190,11 @@ public abstract class InboundTransformer { if (fp != null) { for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) { String key = entry.getKey().toString(); - setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); + setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); } } + + return jms; } private void setProperty(Message msg, String key, Object value) throws JMSException {
