http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java deleted file mode 100644 index 8c4612d..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; -import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE; -import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; -import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; -import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; -import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; - -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Map; -import java.util.Set; - -import javax.jms.Destination; -import javax.jms.JMSException; - -import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.message.Message; - -/** - * Support class containing constant values and static methods that are used to map to / from - * AMQP Message types being sent or received. - */ -public final class AMQPMessageSupport { - - // Message Properties used to map AMQP to JMS and back - - public static final String JMS_AMQP_PREFIX = "JMS_AMQP_"; - public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length(); - - public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT"; - public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING"; - public static final String NATIVE = "NATIVE"; - public static final String HEADER = "HEADER"; - public static final String PROPERTIES = "PROPERTIES"; - - public static final String FIRST_ACQUIRER = "FirstAcquirer"; - public static final String CONTENT_TYPE = "ContentType"; - public static final String CONTENT_ENCODING = "ContentEncoding"; - public static final String REPLYTO_GROUP_ID = "ReplyToGroupID"; - public static final String DURABLE = "DURABLE"; - public static final String PRIORITY = "PRIORITY"; - - public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; - public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; - public static final String FOOTER_PREFIX = "FT_"; - - public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; - public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE; - public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY; - public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; - public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING; - public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT; - public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE; - public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER; - public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE; - public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING; - public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID; - public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX; - public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX; - public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX; - - // Message body type definitions - public static final Binary EMPTY_BINARY = new Binary(new byte[0]); - public static final Data EMPTY_BODY = new Data(EMPTY_BINARY); - - public static final short AMQP_UNKNOWN = 0; - public static final short AMQP_NULL = 1; - public static final short AMQP_DATA = 2; - public static final short AMQP_SEQUENCE = 3; - public static final short AMQP_VALUE_NULL = 4; - public static final short AMQP_VALUE_STRING = 5; - public static final short AMQP_VALUE_BINARY = 6; - public static final short AMQP_VALUE_MAP = 7; - public static final short AMQP_VALUE_LIST = 8; - - /** - * Content type used to mark Data sections as containing a serialized java object. - */ - public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object"; - - /** - * Content type used to mark Data sections as containing arbitrary bytes. - */ - public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; - - /** - * Lookup and return the correct Proton Symbol instance based on the given key. - * - * @param key - * the String value name of the Symbol to locate. - * - * @return the Symbol value that matches the given key. - */ - public static Symbol getSymbol(String key) { - return Symbol.valueOf(key); - } - - /** - * Safe way to access message annotations which will check internal structure and either - * return the annotation if it exists or null if the annotation or any annotations are - * present. - * - * @param key - * the String key to use to lookup an annotation. - * @param message - * the AMQP message object that is being examined. - * - * @return the given annotation value or null if not present in the message. - */ - public static Object getMessageAnnotation(String key, Message message) { - if (message != null && message.getMessageAnnotations() != null) { - Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue(); - return annotations.get(AMQPMessageSupport.getSymbol(key)); - } - - return null; - } - - /** - * Check whether the content-type field of the properties section (if present) in the given - * message matches the provided string (where null matches if there is no content type - * present. - * - * @param contentType - * content type string to compare against, or null if none - * @param message - * the AMQP message object that is being examined. - * - * @return true if content type matches - */ - public static boolean isContentType(String contentType, Message message) { - if (contentType == null) { - return message.getContentType() == null; - } else { - return contentType.equals(message.getContentType()); - } - } - - /** - * @param contentType - * the contentType of the received message - * @return the character set to use, or null if not to treat the message as text - */ - public static Charset getCharsetForTextualContent(String contentType) { - try { - return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType); - } catch (ActiveMQAMQPInvalidContentTypeException e) { - return null; - } - } - - public static ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) { - switch (messageType) { - case STREAM_TYPE: - return new ServerJMSStreamMessage(wrapped, deliveryCount); - case BYTES_TYPE: - return new ServerJMSBytesMessage(wrapped, deliveryCount); - case MAP_TYPE: - return new ServerJMSMapMessage(wrapped, deliveryCount); - case TEXT_TYPE: - return new ServerJMSTextMessage(wrapped, deliveryCount); - case OBJECT_TYPE: - return new ServerJMSObjectMessage(wrapped, deliveryCount); - default: - return new ServerJMSMessage(wrapped, deliveryCount); - } - } - - public static String toAddress(Destination destination) { - if (destination instanceof ActiveMQDestination) { - return ((ActiveMQDestination) destination).getAddress(); - } - return null; - } - - public static ServerJMSBytesMessage createBytesMessage(IDGenerator idGenerator) { - return new ServerJMSBytesMessage(newMessage(idGenerator, BYTES_TYPE), 0); - } - - public static ServerJMSMessage createBytesMessage(IDGenerator idGenerator, byte[] array, int arrayOffset, int length) throws JMSException { - ServerJMSBytesMessage message = createBytesMessage(idGenerator); - message.writeBytes(array, arrayOffset, length); - return message; - } - - public static ServerJMSStreamMessage createStreamMessage(IDGenerator idGenerator) { - return new ServerJMSStreamMessage(newMessage(idGenerator, STREAM_TYPE), 0); - } - - public static ServerJMSMessage createMessage(IDGenerator idGenerator) { - return new ServerJMSMessage(newMessage(idGenerator, DEFAULT_TYPE), 0); - } - - public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator) { - return new ServerJMSTextMessage(newMessage(idGenerator, TEXT_TYPE), 0); - } - - public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator, String text) throws JMSException { - ServerJMSTextMessage message = createTextMessage(idGenerator); - message.setText(text); - return message; - } - - public static ServerJMSObjectMessage createObjectMessage(IDGenerator idGenerator) { - return new ServerJMSObjectMessage(newMessage(idGenerator, OBJECT_TYPE), 0); - } - - public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, Binary serializedForm) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(idGenerator); - message.setSerializedForm(serializedForm); - return message; - } - - public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, byte[] array, int offset, int length) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(idGenerator); - message.setSerializedForm(new Binary(array, offset, length)); - return message; - } - - public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator) { - return new ServerJMSMapMessage(newMessage(idGenerator, MAP_TYPE), 0); - } - - public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator, Map<String, Object> content) throws JMSException { - ServerJMSMapMessage message = createMapMessage(idGenerator); - final Set<Map.Entry<String, Object>> set = content.entrySet(); - for (Map.Entry<String, Object> entry : set) { - Object value = entry.getValue(); - if (value instanceof Binary) { - Binary binary = (Binary) value; - value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength()); - } - message.setObject(entry.getKey(), value); - } - return message; - } - - private static ServerMessageImpl newMessage(IDGenerator idGenerator, byte messageType) { - ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512); - message.setType(messageType); - ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); - return message; - } -}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java deleted file mode 100644 index 70c755a..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -@Deprecated -public class AMQPMessageTypes { - - // TODO - Remove in future release as these are no longer used by the - // inbound JMS Transformer. - - public static final String AMQP_TYPE_KEY = "amqp:type"; - - public static final String AMQP_SEQUENCE = "amqp:sequence"; - - public static final String AMQP_LIST = "amqp:list"; -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java deleted file mode 100644 index 7028547..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; - -public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { - - public AMQPNativeInboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public String getTransformerName() { - return TRANSFORMER_NATIVE; - } - - @Override - public InboundTransformer getFallbackTransformer() { - return new AMQPRawInboundTransformer(idGenerator); - } - - @Override - public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception { - org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); - - return populateMessage(super.transform(amqpMessage), amqp); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java deleted file mode 100644 index 8e89bb3..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.io.UnsupportedEncodingException; - -import javax.jms.JMSException; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.message.ProtonJMessage; - -public class AMQPNativeOutboundTransformer extends OutboundTransformer { - - public AMQPNativeOutboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException { - if (message == null || !(message instanceof ServerJMSBytesMessage)) { - return 0; - } - - return transform(this, (ServerJMSBytesMessage) message, buffer); - } - - public static long transform(OutboundTransformer options, ServerJMSBytesMessage message, WritableBuffer buffer) throws JMSException { - byte[] data = new byte[(int) message.getBodyLength()]; - message.readBytes(data); - message.reset(); - - // The AMQP delivery-count field only includes prior failed delivery attempts, - int amqpDeliveryCount = message.getDeliveryCount() - 1; - if (amqpDeliveryCount >= 1) { - - // decode... - ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); - int offset = 0; - int len = data.length; - while (len > 0) { - final int decoded = amqp.decode(data, offset, len); - assert decoded > 0 : "Make progress decoding the message"; - offset += decoded; - len -= decoded; - } - - // Update the DeliveryCount header which might require adding a Header - if (amqp.getHeader() == null && amqpDeliveryCount > 0) { - amqp.setHeader(new Header()); - } - - amqp.getHeader().setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); - - amqp.encode(buffer); - } else { - buffer.put(data, 0, data.length); - } - - return 0; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java deleted file mode 100644 index 445eaca..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; - -import javax.jms.DeliveryMode; -import javax.jms.Message; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; - -public class AMQPRawInboundTransformer extends InboundTransformer { - - public AMQPRawInboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public String getTransformerName() { - return TRANSFORMER_RAW; - } - - @Override - public InboundTransformer getFallbackTransformer() { - return null; // No fallback from full raw transform - } - - @Override - public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception { - ServerJMSBytesMessage message = createBytesMessage(idGenerator); - message.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); - - // We cannot decode the message headers to check so err on the side of caution - // and mark all messages as persistent. - message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); - message.setJMSPriority(Message.DEFAULT_PRIORITY); - message.setJMSTimestamp(System.currentTimeMillis()); - - message.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat()); - message.setBooleanProperty(JMS_AMQP_NATIVE, true); - - return message; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java deleted file mode 100644 index 22042da..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.message.Message; - -public class EncodedMessage { - - private final Binary data; - final long messageFormat; - - public EncodedMessage(long messageFormat, byte[] data, int offset, int length) { - this.data = new Binary(data, offset, length); - this.messageFormat = messageFormat; - } - - public long getMessageFormat() { - return messageFormat; - } - - public Message decode() throws Exception { - Message amqp = Message.Factory.create(); - - int offset = getArrayOffset(); - int len = getLength(); - while (len > 0) { - final int decoded = amqp.decode(getArray(), offset, len); - assert decoded > 0 : "Make progress decoding the message"; - offset += decoded; - len -= decoded; - } - - return amqp; - } - - public int getLength() { - return data.getLength(); - } - - public int getArrayOffset() { - return data.getArrayOffset(); - } - - public byte[] getArray() { - return data.getArray(); - } - - @Override - public String toString() { - return data.toString(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java deleted file mode 100644 index 1316ab7..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; - -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Set; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Decimal128; -import org.apache.qpid.proton.amqp.Decimal32; -import org.apache.qpid.proton.amqp.Decimal64; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedByte; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.UnsignedLong; -import org.apache.qpid.proton.amqp.UnsignedShort; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; - -public abstract class InboundTransformer { - - protected IDGenerator idGenerator; - - public static final String TRANSFORMER_NATIVE = "native"; - public static final String TRANSFORMER_RAW = "raw"; - public static final String TRANSFORMER_JMS = "jms"; - - public InboundTransformer(IDGenerator idGenerator) { - this.idGenerator = idGenerator; - } - - public abstract ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception; - - public abstract String getTransformerName(); - - public abstract InboundTransformer getFallbackTransformer(); - - @SuppressWarnings("unchecked") - protected ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception { - Header header = amqp.getHeader(); - if (header != null) { - jms.setBooleanProperty(JMS_AMQP_HEADER, true); - - if (header.getDurable() != null) { - jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true); - jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - } else { - jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); - } - - if (header.getPriority() != null) { - jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true); - jms.setJMSPriority(header.getPriority().intValue()); - } else { - jms.setJMSPriority(Message.DEFAULT_PRIORITY); - } - - if (header.getFirstAcquirer() != null) { - jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer()); - } - - if (header.getDeliveryCount() != null) { - // AMQP Delivery Count counts only failed delivers where JMS - // Delivery Count should include the original delivery in the count. - jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1); - } - } else { - jms.setJMSPriority((byte) Message.DEFAULT_PRIORITY); - jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); - } - - final MessageAnnotations ma = amqp.getMessageAnnotations(); - if (ma != null) { - for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) { - String key = entry.getKey().toString(); - if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { - long deliveryTime = ((Number) entry.getValue()).longValue(); - jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime); - } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { - long delay = ((Number) entry.getValue()).longValue(); - if (delay > 0) { - jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); - } - } - - setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); - } - } - - final ApplicationProperties ap = amqp.getApplicationProperties(); - if (ap != null) { - for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) { - setProperty(jms, entry.getKey().toString(), entry.getValue()); - } - } - - final Properties properties = amqp.getProperties(); - if (properties != null) { - if (properties.getMessageId() != null) { - jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId())); - } - Binary userId = properties.getUserId(); - if (userId != null) { - // TODO - Better Way to set this? - jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); - } - if (properties.getTo() != null) { - jms.setJMSDestination(new ServerDestination(properties.getTo())); - } - if (properties.getSubject() != null) { - jms.setJMSType(properties.getSubject()); - } - if (properties.getReplyTo() != null) { - jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo())); - } - if (properties.getCorrelationId() != null) { - jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); - } - if (properties.getContentType() != null) { - jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString()); - } - if (properties.getContentEncoding() != null) { - jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString()); - } - if (properties.getCreationTime() != null) { - jms.setJMSTimestamp(properties.getCreationTime().getTime()); - } - if (properties.getGroupId() != null) { - jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId()); - } - if (properties.getGroupSequence() != null) { - jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue()); - } - if (properties.getReplyToGroupId() != null) { - jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId()); - } - if (properties.getAbsoluteExpiryTime() != null) { - jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); - } - } - - // If the jms expiration has not yet been set... - if (header != null && jms.getJMSExpiration() == 0) { - // Then lets try to set it based on the message ttl. - long ttl = Message.DEFAULT_TIME_TO_LIVE; - if (header.getTtl() != null) { - ttl = header.getTtl().longValue(); - } - - if (ttl == 0) { - jms.setJMSExpiration(0); - } else { - jms.setJMSExpiration(System.currentTimeMillis() + ttl); - } - } - - final Footer fp = amqp.getFooter(); - if (fp != null) { - for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) { - String key = entry.getKey().toString(); - setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); - } - } - - return jms; - } - - private void setProperty(Message msg, String key, Object value) throws JMSException { - if (value instanceof UnsignedLong) { - long v = ((UnsignedLong) value).longValue(); - msg.setLongProperty(key, v); - } else if (value instanceof UnsignedInteger) { - long v = ((UnsignedInteger) value).longValue(); - if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { - msg.setIntProperty(key, (int) v); - } else { - msg.setLongProperty(key, v); - } - } else if (value instanceof UnsignedShort) { - int v = ((UnsignedShort) value).intValue(); - if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { - msg.setShortProperty(key, (short) v); - } else { - msg.setIntProperty(key, v); - } - } else if (value instanceof UnsignedByte) { - short v = ((UnsignedByte) value).shortValue(); - if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { - msg.setByteProperty(key, (byte) v); - } else { - msg.setShortProperty(key, v); - } - } else if (value instanceof Symbol) { - msg.setStringProperty(key, value.toString()); - } else if (value instanceof Decimal128) { - msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); - } else if (value instanceof Decimal64) { - msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); - } else if (value instanceof Decimal32) { - msg.setFloatProperty(key, ((Decimal32) value).floatValue()); - } else if (value instanceof Binary) { - msg.setStringProperty(key, value.toString()); - } else { - msg.setObjectProperty(key, value); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java deleted file mode 100644 index 629c499..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType; - -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.messaging.AmqpSequence; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.message.Message; - -public class JMSMappingInboundTransformer extends InboundTransformer { - - public JMSMappingInboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public String getTransformerName() { - return TRANSFORMER_JMS; - } - - @Override - public InboundTransformer getFallbackTransformer() { - return new AMQPNativeInboundTransformer(idGenerator); - } - - @Override - public ServerJMSMessage transform(EncodedMessage encodedMessage) throws Exception { - ServerJMSMessage transformedMessage = null; - - try { - Message amqpMessage = encodedMessage.decode(); - transformedMessage = createServerMessage(amqpMessage); - populateMessage(transformedMessage, amqpMessage); - } catch (Exception ex) { - InboundTransformer transformer = this.getFallbackTransformer(); - - while (transformer != null) { - try { - transformedMessage = transformer.transform(encodedMessage); - break; - } catch (Exception e) { - transformer = transformer.getFallbackTransformer(); - } - } - } - - // Regardless of the transformer that finally decoded the message we need to ensure that - // the AMQP Message Format value is preserved for application on retransmit. - if (transformedMessage != null && encodedMessage.getMessageFormat() != 0) { - transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, encodedMessage.getMessageFormat()); - } - - return transformedMessage; - } - - @SuppressWarnings("unchecked") - private ServerJMSMessage createServerMessage(Message message) throws Exception { - - Section body = message.getBody(); - ServerJMSMessage result; - - if (body == null) { - if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = createObjectMessage(idGenerator); - } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) { - result = createBytesMessage(idGenerator); - } else { - Charset charset = getCharsetForTextualContent(message.getContentType()); - if (charset != null) { - result = createTextMessage(idGenerator); - } else { - result = createMessage(idGenerator); - } - } - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); - } else if (body instanceof Data) { - Binary payload = ((Data) body).getValue(); - - if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = createObjectMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } else { - Charset charset = getCharsetForTextualContent(message.getContentType()); - if (StandardCharsets.UTF_8.equals(charset)) { - ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength()); - - try { - CharBuffer chars = charset.newDecoder().decode(buf); - result = createTextMessage(idGenerator, String.valueOf(chars)); - } catch (CharacterCodingException e) { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } - } else { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } - } - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); - } else if (body instanceof AmqpSequence) { - AmqpSequence sequence = (AmqpSequence) body; - ServerJMSStreamMessage m = createStreamMessage(idGenerator); - for (Object item : sequence.getValue()) { - m.writeObject(item); - } - - result = m; - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); - } else if (body instanceof AmqpValue) { - Object value = ((AmqpValue) body).getValue(); - if (value == null || value instanceof String) { - result = createTextMessage(idGenerator, (String) value); - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING); - } else if (value instanceof Binary) { - Binary payload = (Binary) value; - - if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = createObjectMessage(idGenerator, payload); - } else { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); - } else if (value instanceof List) { - ServerJMSStreamMessage m = createStreamMessage(idGenerator); - for (Object item : (List<Object>) value) { - m.writeObject(item); - } - result = m; - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST); - } else if (value instanceof Map) { - result = createMapMessage(idGenerator, (Map<String, Object>) value); - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP); - } else { - // Trigger fall-back to native encoder which generates BytesMessage with the - // original message stored in the message body. - throw new ActiveMQAMQPInternalErrorException("Unable to encode to ActiveMQ JMS Message"); - } - } else { - throw new RuntimeException("Unexpected body type: " + body.getClass()); - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java deleted file mode 100644 index 7dbc6d4..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ /dev/null @@ -1,592 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; -import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress; - -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Date; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageEOFException; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; -import org.apache.activemq.artemis.reader.MessageUtil; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedByte; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.AmqpSequence; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.AMQPDefinedTypes; -import org.apache.qpid.proton.codec.DecoderImpl; -import org.apache.qpid.proton.codec.EncoderImpl; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.jboss.logging.Logger; - -public class JMSMappingOutboundTransformer extends OutboundTransformer { - - private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class); - - public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); - public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); - - public static final byte QUEUE_TYPE = 0x00; - public static final byte TOPIC_TYPE = 0x01; - public static final byte TEMP_QUEUE_TYPE = 0x02; - public static final byte TEMP_TOPIC_TYPE = 0x03; - - // For now Proton requires that we create a decoder to create an encoder - private static class EncoderDecoderPair { - DecoderImpl decoder = new DecoderImpl(); - EncoderImpl encoder = new EncoderImpl(decoder); - { - AMQPDefinedTypes.registerAllTypes(decoder, encoder); - } - } - - private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() { - @Override - protected EncoderDecoderPair initialValue() { - return new EncoderDecoderPair(); - } - }; - - public JMSMappingOutboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException { - if (message == null) { - return 0; - } - - long messageFormat = 0; - Header header = null; - Properties properties = null; - Map<Symbol, Object> daMap = null; - Map<Symbol, Object> maMap = null; - Map<String, Object> apMap = null; - Map<Object, Object> footerMap = null; - - Section body = convertBody(message); - - if (message.getInnerMessage().isDurable()) { - if (header == null) { - header = new Header(); - } - header.setDurable(true); - } - byte priority = (byte) message.getJMSPriority(); - if (priority != Message.DEFAULT_PRIORITY) { - if (header == null) { - header = new Header(); - } - header.setPriority(UnsignedByte.valueOf(priority)); - } - String type = message.getJMSType(); - if (type != null) { - if (properties == null) { - properties = new Properties(); - } - properties.setSubject(type); - } - String messageId = message.getJMSMessageID(); - if (messageId != null) { - if (properties == null) { - properties = new Properties(); - } - try { - properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId)); - } catch (ActiveMQAMQPIllegalStateException e) { - properties.setMessageId(messageId); - } - } - Destination destination = message.getJMSDestination(); - if (destination != null) { - if (properties == null) { - properties = new Properties(); - } - properties.setTo(toAddress(destination)); - if (maMap == null) { - maMap = new HashMap<>(); - } - maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); - } - Destination replyTo = message.getJMSReplyTo(); - if (replyTo != null) { - if (properties == null) { - properties = new Properties(); - } - properties.setReplyTo(toAddress(replyTo)); - if (maMap == null) { - maMap = new HashMap<>(); - } - maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); - } - String correlationId = message.getJMSCorrelationID(); - if (correlationId != null) { - if (properties == null) { - properties = new Properties(); - } - try { - properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); - } catch (ActiveMQAMQPIllegalStateException e) { - properties.setCorrelationId(correlationId); - } - } - long expiration = message.getJMSExpiration(); - if (expiration != 0) { - long ttl = expiration - System.currentTimeMillis(); - if (ttl < 0) { - ttl = 1; - } - - if (header == null) { - header = new Header(); - } - header.setTtl(new UnsignedInteger((int) ttl)); - - if (properties == null) { - properties = new Properties(); - } - properties.setAbsoluteExpiryTime(new Date(expiration)); - } - long timeStamp = message.getJMSTimestamp(); - if (timeStamp != 0) { - if (properties == null) { - properties = new Properties(); - } - properties.setCreationTime(new Date(timeStamp)); - } - - final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage()); - for (String key : keySet) { - if (key.startsWith("JMSX")) { - if (key.equals("JMSXDeliveryCount")) { - // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. - int amqpDeliveryCount = message.getDeliveryCount() - 1; - if (amqpDeliveryCount > 0) { - if (header == null) { - header = new Header(); - } - header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); - } - continue; - } else if (key.equals("JMSXUserID")) { - String value = message.getStringProperty(key); - if (properties == null) { - properties = new Properties(); - } - properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8))); - continue; - } else if (key.equals("JMSXGroupID")) { - String value = message.getStringProperty(key); - if (properties == null) { - properties = new Properties(); - } - properties.setGroupId(value); - continue; - } else if (key.equals("JMSXGroupSeq")) { - UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key)); - if (properties == null) { - properties = new Properties(); - } - properties.setGroupSequence(value); - continue; - } - } else if (key.startsWith(JMS_AMQP_PREFIX)) { - // AMQP Message Information stored from a conversion to the Core Message - if (key.equals(JMS_AMQP_MESSAGE_FORMAT)) { - messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT); - continue; - } else if (key.equals(JMS_AMQP_NATIVE)) { - // skip..internal use only - continue; - } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) { - // skip..internal use only - continue; - } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { - if (header == null) { - header = new Header(); - } - header.setFirstAcquirer(message.getBooleanProperty(key)); - continue; - } else if (key.equals(JMS_AMQP_HEADER)) { - if (header == null) { - header = new Header(); - } - continue; - } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) { - if (header == null) { - header = new Header(); - } - header.setDurable(message.getInnerMessage().isDurable()); - continue; - } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) { - if (header == null) { - header = new Header(); - } - header.setPriority(UnsignedByte.valueOf(priority)); - continue; - } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { - if (properties == null) { - properties = new Properties(); - } - continue; - } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) { - if (daMap == null) { - daMap = new HashMap<>(); - } - String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); - daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - continue; - } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) { - if (maMap == null) { - maMap = new HashMap<>(); - } - String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); - maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - continue; - } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) { - if (properties == null) { - properties = new Properties(); - } - properties.setContentType(Symbol.getSymbol(message.getStringProperty(key))); - continue; - } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) { - if (properties == null) { - properties = new Properties(); - } - properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key))); - continue; - } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) { - if (properties == null) { - properties = new Properties(); - } - properties.setReplyToGroupId(message.getStringProperty(key)); - continue; - } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) { - if (footerMap == null) { - footerMap = new HashMap<>(); - } - String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); - footerMap.put(name, message.getObjectProperty(key)); - continue; - } - } else if (key.equals("_AMQ_GROUP_ID")) { - String value = message.getStringProperty(key); - if (properties == null) { - properties = new Properties(); - } - properties.setGroupId(value); - continue; - } else if (key.equals(NATIVE_MESSAGE_ID)) { - // skip..internal use only - continue; - } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) { - // skip..remove annotation from previous inbound transformation - continue; - } else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) { - // skip..internal use only - TODO - Remove this deprecated value in future release. - continue; - } - - if (apMap == null) { - apMap = new HashMap<>(); - } - - Object objectProperty = message.getObjectProperty(key); - if (objectProperty instanceof byte[]) { - objectProperty = new Binary((byte[]) objectProperty); - } - - apMap.put(key, objectProperty); - } - - EncoderImpl encoder = tlsCodec.get().encoder; - encoder.setByteBuffer(buffer); - - if (header != null) { - encoder.writeObject(header); - } - if (daMap != null) { - encoder.writeObject(new DeliveryAnnotations(daMap)); - } - if (maMap != null) { - encoder.writeObject(new MessageAnnotations(maMap)); - } - if (properties != null) { - encoder.writeObject(properties); - } - if (apMap != null) { - encoder.writeObject(new ApplicationProperties(apMap)); - } - if (body != null) { - encoder.writeObject(body); - } - if (footerMap != null) { - encoder.writeObject(new Footer(footerMap)); - } - - return messageFormat; - } - - private Section convertBody(ServerJMSMessage message) throws JMSException { - - Section body = null; - short orignalEncoding = AMQP_UNKNOWN; - - try { - orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING); - } catch (Exception ex) { - // Ignore and stick with UNKNOWN - } - - if (message instanceof ServerJMSBytesMessage) { - Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message); - - if (payload == null) { - payload = EMPTY_BINARY; - } - - switch (orignalEncoding) { - case AMQP_NULL: - break; - case AMQP_VALUE_BINARY: - body = new AmqpValue(payload); - break; - case AMQP_DATA: - case AMQP_UNKNOWN: - default: - body = new Data(payload); - break; - } - } else if (message instanceof ServerJMSTextMessage) { - switch (orignalEncoding) { - case AMQP_NULL: - break; - case AMQP_DATA: - body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message)); - break; - case AMQP_VALUE_STRING: - case AMQP_UNKNOWN: - default: - body = new AmqpValue(((TextMessage) message).getText()); - break; - } - } else if (message instanceof ServerJMSMapMessage) { - body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message)); - } else if (message instanceof ServerJMSStreamMessage) { - ArrayList<Object> list = new ArrayList<>(); - final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message; - try { - while (true) { - list.add(m.readObject()); - } - } catch (MessageEOFException e) { - } - - // Deprecated encoding markers - TODO - Remove on future release - if (orignalEncoding == AMQP_UNKNOWN) { - String amqpType = message.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY); - if (amqpType != null) { - if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) { - orignalEncoding = AMQP_VALUE_LIST; - } else { - orignalEncoding = AMQP_SEQUENCE; - } - } - } - - switch (orignalEncoding) { - case AMQP_SEQUENCE: - body = new AmqpSequence(list); - break; - case AMQP_VALUE_LIST: - case AMQP_UNKNOWN: - default: - body = new AmqpValue(list); - break; - } - } else if (message instanceof ServerJMSObjectMessage) { - Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message); - - if (payload == null) { - payload = EMPTY_BINARY; - } - - switch (orignalEncoding) { - case AMQP_VALUE_BINARY: - body = new AmqpValue(payload); - break; - case AMQP_DATA: - case AMQP_UNKNOWN: - default: - body = new Data(payload); - break; - } - - // For a non-AMQP message we tag the outbound content type as containing - // a serialized Java object so that an AMQP client has a hint as to what - // we are sending it. - if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) { - message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); - } - } else if (message instanceof ServerJMSMessage) { - // If this is not an AMQP message that was converted then the original encoding - // will be unknown so we check for special cases of messages with special data - // encoded into the server message body. - if (orignalEncoding == AMQP_UNKNOWN) { - MessageInternal internalMessage = message.getInnerMessage(); - int readerIndex = internalMessage.getBodyBuffer().readerIndex(); - try { - Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); - if (s != null) { - body = new AmqpValue(s.toString()); - } - } catch (Throwable ignored) { - logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored); - } finally { - internalMessage.getBodyBuffer().readerIndex(readerIndex); - } - } - } - - return body; - } - - private Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException { - byte[] data = new byte[(int) message.getBodyLength()]; - message.readBytes(data); - message.reset(); // Need to reset after readBytes or future readBytes - - return new Binary(data); - } - - private Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException { - Binary result = null; - String text = message.getText(); - if (text != null) { - result = new Binary(text.getBytes(StandardCharsets.UTF_8)); - } - - return result; - } - - private Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { - message.getInnerMessage().getBodyBuffer().resetReaderIndex(); - int size = message.getInnerMessage().getBodyBuffer().readInt(); - byte[] bytes = new byte[size]; - message.getInnerMessage().getBodyBuffer().readBytes(bytes); - - return new Binary(bytes); - } - - private Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException { - final HashMap<String, Object> map = new LinkedHashMap<>(); - - @SuppressWarnings("unchecked") - final Enumeration<String> names = message.getMapNames(); - while (names.hasMoreElements()) { - String key = names.nextElement(); - Object value = message.getObject(key); - if (value instanceof byte[]) { - value = new Binary((byte[]) value); - } - map.put(key, value); - } - - return map; - } - - private static byte destinationType(Destination destination) { - if (destination instanceof Queue) { - if (destination instanceof TemporaryQueue) { - return TEMP_QUEUE_TYPE; - } else { - return QUEUE_TYPE; - } - } else if (destination instanceof Topic) { - if (destination instanceof TemporaryTopic) { - return TEMP_TOPIC_TYPE; - } else { - return TOPIC_TYPE; - } - } - - throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java deleted file mode 100644 index 5113513..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.io.UnsupportedEncodingException; - -import javax.jms.JMSException; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.codec.WritableBuffer; - -public abstract class OutboundTransformer { - - protected IDGenerator idGenerator; - - public OutboundTransformer(IDGenerator idGenerator) { - this.idGenerator = idGenerator; - } - - /** - * Given an JMS Message perform a conversion to an AMQP Message and encode into a form that - * is ready for transmission. - * - * @param message - * the message to transform - * @param buffer - * the buffer where encoding should write to - * - * @return the message format key of the encoded message. - * - * @throws JMSException - * if an error occurs during message transformation - * @throws UnsupportedEncodingException - * if an error occurs during message encoding - */ - public abstract long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException; - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 6462315..bac3e7e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -134,6 +134,10 @@ public class AMQPConnectionContext extends ProtonInitializable { handler.flush(); } + public void flush(boolean wait) { + handler.flush(wait); + } + public void close(ErrorCondition errorCondition) { handler.close(errorCondition); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 8341de7..ea2635e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -134,6 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { Receiver receiver; + ByteBuf buffer = null; try { receiver = ((Receiver) delivery.getLink()); @@ -144,26 +145,30 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (delivery.isPartial()) { return; } + // This should be used if getDataLength was avilable +// byte[] data = new byte[delivery.getDataLength()]; - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024); - try { - synchronized (connection.getLock()) { - DeliveryUtil.readDelivery(receiver, buffer); + buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024); + Transaction tx = null; - receiver.advance(); + synchronized (connection.getLock()) { + DeliveryUtil.readDelivery(receiver, buffer); + receiver.advance(); + } - Transaction tx = null; - if (delivery.getRemoteState() instanceof TransactionalState) { + byte[] data = new byte[buffer.writerIndex()]; + buffer.readBytes(data); - TransactionalState txState = (TransactionalState) delivery.getRemoteState(); - tx = this.sessionSPI.getTransaction(txState.getTxnId()); - } - sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer); + if (delivery.getRemoteState() instanceof TransactionalState) { - flow(maxCreditAllocation, minCreditRefresh); - } - } finally { - buffer.release(); + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + tx = this.sessionSPI.getTransaction(txState.getTxnId()); + } + + sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); + + synchronized (connection.getLock()) { + flow(maxCreditAllocation, minCreditRefresh); } } catch (Exception e) { log.warn(e.getMessage(), e); @@ -174,6 +179,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements rejected.setError(condition); delivery.disposition(rejected); delivery.settle(); + } finally { + if (buffer != null) { + buffer.release(); + } } }
