QPID-7434: [Java Broker] Improve AMQP 0-x to Internal conversion and add unit tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/939cda5b Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/939cda5b Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/939cda5b Branch: refs/heads/master Commit: 939cda5bf529bc0ecf911f6af586eb48573bbcca Parents: 385167e Author: Lorenz Quack <[email protected]> Authored: Thu Aug 10 17:15:41 2017 +0100 Committer: Lorenz Quack <[email protected]> Committed: Fri Aug 11 15:14:53 2017 +0100 ---------------------------------------------------------------------- .../mimecontentconverter/ConversionUtils.java | 34 ++ .../mimecontentconverter/TextPlainToString.java | 2 +- .../mimecontentconverter/TextXmlToString.java | 2 +- .../JmsMapMessageToMap.java | 6 + .../JmsStreamMessageToList.java | 6 + .../MessageConverter_v0_10_to_Internal.java | 97 ++++- .../AmqpListToListConverter.java | 6 + .../AmqpMapToMapConverter.java | 6 + .../MessageConverter_0_10_to_InternalTest.java | 416 +++++++++++++++++++ broker-plugins/amqp-0-8-protocol/pom.xml | 6 + .../v0_8/MessageConverter_v0_8_to_Internal.java | 95 ++++- .../MessageConverter_0_8_to_InternalTest.java | 414 ++++++++++++++++++ .../v1_0/MessageConverter_from_1_0.java | 19 +- .../protocol/v1_0/MessageConverter_to_1_0.java | 34 +- .../v1_0/MessageConverter_v1_0_to_Internal.java | 21 +- .../MessageConverter_0_10_to_1_0Test.java | 5 +- 16 files changed, 1112 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java new file mode 100644 index 0000000..98a9ba8 --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.message.mimecontentconverter; + +import java.util.regex.Pattern; + +public class ConversionUtils +{ + public static final Pattern + TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$"); + public static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$"); + public static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$"); + public static final Pattern + OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$"); + public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$"); +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java index d71882a..01e8ca9 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java @@ -48,6 +48,6 @@ public class TextPlainToString implements MimeContentToObjectConverter<String> @Override public String toObject(final byte[] data) { - return new String(data, StandardCharsets.UTF_8); + return data == null ? "" : new String(data, StandardCharsets.UTF_8); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java index aeba9dd..97026d5 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java @@ -48,6 +48,6 @@ public class TextXmlToString implements MimeContentToObjectConverter<String> @Override public String toObject(final byte[] data) { - return new String(data, StandardCharsets.UTF_8); + return data == null ? "" : new String(data, StandardCharsets.UTF_8); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java index 37b1db7..76db633 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java +++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.typedmessage.mimecontentconverter; import java.io.EOFException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -54,6 +55,11 @@ public class JmsMapMessageToMap implements MimeContentToObjectConverter<Map> @Override public Map toObject(final byte[] data) { + if (data == null || data.length == 0) + { + return Collections.emptyMap(); + } + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); LinkedHashMap map = new LinkedHashMap(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java index 7be7d9f..657f66e 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java +++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.typedmessage.mimecontentconverter; import java.io.EOFException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; @@ -54,6 +55,11 @@ public class JmsStreamMessageToList implements MimeContentToObjectConverter<List @Override public List toObject(final byte[] data) { + if (data == null || data.length == 0) + { + return Collections.emptyList(); + } + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); List<Object> list = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java index e507ce2..6868b14 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java @@ -20,13 +20,24 @@ */ package org.apache.qpid.server.protocol.v0_10; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.LIST_MESSAGE_CONTENT_TYPES; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.MAP_MESSAGE_CONTENT_TYPES; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES; + import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.message.internal.InternalMessageHeader; +import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils; import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry; import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; import org.apache.qpid.server.model.NamedAddressSpace; @@ -75,9 +86,8 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess } Object body = convertMessageBody(mimeType, data); - MessageProperties messageProps = serverMessage.getHeader().getMessageProperties(); - AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo(), encoding); - return InternalMessage.convert(serverMessage, fixedHeader, body); + final AMQMessageHeader convertedHeader = convertHeader(serverMessage, addressSpace, body, encoding); + return InternalMessage.convert(serverMessage, convertedHeader, body); } @Override @@ -86,6 +96,55 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess } + private AMQMessageHeader convertHeader(final MessageTransferMessage serverMessage, + final NamedAddressSpace addressSpace, + final Object convertedBodyObject, final String encoding) + { + final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject); + final AMQMessageHeader messageHeader = serverMessage.getMessageHeader(); + + Map<String, Object> headers = new HashMap<>(); + messageHeader.getHeaderNames() + .forEach(headerName -> headers.put(headerName, messageHeader.getHeader(headerName))); + + final InternalMessageHeader header = new InternalMessageHeader(headers, + messageHeader.getCorrelationId(), + messageHeader.getExpiration(), + messageHeader.getUserId(), + messageHeader.getAppId(), + messageHeader.getMessageId(), + convertedMimeType, + messageHeader.getEncoding(), + messageHeader.getPriority(), + messageHeader.getTimestamp(), + messageHeader.getNotValidBefore(), + messageHeader.getType(), + messageHeader.getReplyTo(), + serverMessage.getArrivalTime()); + MessageProperties messageProps = serverMessage.getHeader().getMessageProperties(); + final ReplyTo replyTo = messageProps == null ? null : messageProps.getReplyTo(); + return new DelegatingMessageHeader(header, replyTo, encoding); + } + + private String getInternalConvertedMimeType(final MessageTransferMessage serverMessage, final Object convertedBodyObject) + { + String originalMimeType = serverMessage.getMessageHeader().getMimeType(); + if (originalMimeType != null) + { + if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches() + || ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches()) + { + return null; + } + else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches()) + { + return "application/x-java-serialized-object"; + } + } + + return originalMimeType; + } + private static class DelegatingMessageHeader implements AMQMessageHeader { private final AMQMessageHeader _delegate; @@ -206,14 +265,38 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess private static Object convertMessageBody(String mimeType, byte[] data) { MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType); - if (converter != null) + if (data != null && data.length != 0) + { + if (converter != null) + { + return converter.toObject(data); + } + else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches()) + { + return new String(data, UTF_8); + } + } + else if (mimeType == null) + { + return null; + } + else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) + { + return new byte[0]; + } + else if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(mimeType).matches()) + { + return ""; + } + else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) { - return converter.toObject(data); + return Collections.emptyMap(); } - else + else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) { - return data; + return Collections.emptyList(); } + return data; } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java index 8139db1..1cfb640 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; @@ -51,6 +52,11 @@ public class AmqpListToListConverter implements MimeContentToObjectConverter<Lis @Override public List toObject(final byte[] data) { + if (data == null || data.length == 0) + { + return Collections.emptyList(); + } + BBDecoder decoder = new BBDecoder(); decoder.init(ByteBuffer.wrap(data)); return decoder.readList(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java index 4280481..ce9d098 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; @@ -51,6 +52,11 @@ public class AmqpMapToMapConverter implements MimeContentToObjectConverter<Map> @Override public Map toObject(final byte[] data) { + if (data == null || data.length == 0) + { + return Collections.emptyMap(); + } + BBDecoder decoder = new BBDecoder(); decoder.init(ByteBuffer.wrap(data)); return decoder.readMap(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java new file mode 100644 index 0000000..5a28f4f --- /dev/null +++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.protocol.v0_10.transport.Header; +import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; +import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter; +import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.test.utils.QpidTestCase; + +public class MessageConverter_0_10_to_InternalTest extends QpidTestCase +{ + private final MessageConverter_v0_10_to_Internal _converter = new MessageConverter_v0_10_to_Internal(); + + private final StoredMessage<MessageMetaData_0_10> _handle = mock(StoredMessage.class); + + private final MessageMetaData_0_10 _metaData = mock(MessageMetaData_0_10.class); + private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class); + private final Header _header = mock(Header.class); + private MessageProperties _messageProperties; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _messageProperties = new MessageProperties(); + + when(_handle.getMetaData()).thenReturn(_metaData); + when(_header.getMessageProperties()).thenReturn(_messageProperties); + when(_metaData.getHeader()).thenReturn(_header); + when(_metaData.getMessageHeader()).thenReturn(_amqpHeader); + when(_metaData.getMessageProperties()).thenReturn(_messageProperties); + } + + public void testConvertStringMessageBody() throws Exception + { + doTestTextMessage("helloworld", "text/plain"); + } + + public void testConvertEmptyStringMessageBody() throws Exception + { + doTestTextMessage(null, "text/plain"); + } + + public void testConvertStringXmlMessageBody() throws Exception + { + doTestTextMessage("<helloworld></helloworld>", "text/xml"); + } + + public void testConvertEmptyStringXmlMessageBody() throws Exception + { + doTestTextMessage(null, "text/xml"); + } + + public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception + { + doTestTextMessage(null, "application/xml"); + } + + public void testConvertStringWithContentTypeText() throws Exception + { + doTestTextMessage("foo","text/foobar"); + } + + public void testConvertStringWithContentTypeApplicationXml() throws Exception + { + doTestTextMessage("<helloworld></helloworld>","application/xml"); + } + + public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception + { + doTestTextMessage("<!DOCTYPE name []>","application/xml-dtd"); + } + + public void testConvertStringWithContentTypeApplicationFooXml() throws Exception + { + doTestTextMessage("<helloworld></helloworld>","application/foo+xml"); + } + + public void testConvertStringWithContentTypeApplicationJson() throws Exception + { + doTestTextMessage("[]","application/json"); + } + + public void testConvertStringWithContentTypeApplicationFooJson() throws Exception + { + doTestTextMessage("[]","application/foo+json"); + } + + public void testConvertStringWithContentTypeApplicationJavascript() throws Exception + { + doTestTextMessage("var foo","application/javascript"); + } + + public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception + { + doTestTextMessage("var foo","application/ecmascript"); + } + + public void testConvertBytesMessageBody() throws Exception + { + doTestBytesMessage("helloworld".getBytes()); + } + + public void testConvertBytesMessageBodyNoContentType() throws Exception + { + final byte[] messageContent = "helloworld".getBytes(); + doTest(messageContent, null, messageContent, null); + } + + public void testConvertMessageBodyUnknownContentType() throws Exception + { + final byte[] messageContent = "helloworld".getBytes(); + final String mimeType = "my/bytes"; + doTest(messageContent, mimeType, messageContent, mimeType); + } + + + public void testConvertEmptyBytesMessageBody() throws Exception + { + doTestBytesMessage(new byte[0]); + } + + public void testConvertJmsStreamMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D); + final byte[] messageBytes = getJmsStreamMessageBytes(expected); + + final String mimeType = "jms/stream-message"; + doTestStreamMessage(messageBytes, mimeType, expected); + } + + public void testConvertEmptyJmsStreamMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList(); + final String mimeType = "jms/stream-message"; + doTestStreamMessage(null, mimeType, expected); + } + + public void testConvertAmqpListMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D); + final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected); + + doTestStreamMessage(messageBytes, "amqp/list", expected); + } + + public void testConvertEmptyAmqpListMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList(); + doTestStreamMessage(null, "amqp/list", expected); + } + + public void testConvertJmsMapMessageBody() throws Exception + { + final Map<String, Object> expected = Collections.singletonMap("key", "value"); + final byte[] messageBytes = getJmsMapMessageBytes(expected); + + doTestMapMessage(messageBytes, "jms/map-message", expected); + } + + public void testConvertEmptyJmsMapMessageBody() throws Exception + { + doTestMapMessage(null, "jms/map-message", Collections.emptyMap()); + } + + public void testConvertAmqpMapMessageBody() throws Exception + { + final Map<String, Object> expected = Collections.singletonMap("key", "value"); + final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected); + + doTestMapMessage(messageBytes, "amqp/map", expected); + } + + public void testConvertEmptyAmqpMapMessageBody() throws Exception + { + doTestMapMessage(null, "amqp/map", Collections.emptyMap()); + } + + public void testConvertObjectStreamMessageBody() throws Exception + { + final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID()); + doTestObjectMessage(messageBytes, "application/java-object-stream", messageBytes); + } + + public void testConvertObjectStream2MessageBody() throws Exception + { + final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID()); + doTestObjectMessage(messageBytes, "application/x-java-serialized-object", messageBytes); + } + + public void testConvertEmptyObjectStreamMessageBody() throws Exception + { + doTestObjectMessage(null, "application/java-object-stream", new byte[0]); + } + + public void testConvertEmptyMessageWithoutContentType() throws Exception + { + doTest(null, null, null, null); + } + + public void testConvertEmptyMessageWithUnknownContentType() throws Exception + { + doTest(null, "foo/bar", new byte[0], "foo/bar"); + } + + public void testConvertMessageWithoutContentType() throws Exception + { + final byte[] expectedContent = "someContent".getBytes(UTF_8); + doTest(expectedContent, null, expectedContent, null); + } + + + private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception + { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos)) + { + oos.writeObject(o); + return bos.toByteArray(); + } + } + + private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception + { + TypedBytesContentWriter writer = new TypedBytesContentWriter(); + for (Object o : objects) + { + writer.writeObject(o); + } + return getBytes(writer); + } + + private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception + { + TypedBytesContentWriter writer = new TypedBytesContentWriter(); + writer.writeIntImpl(map.size()); + for (Map.Entry<String, Object> entry : map.entrySet()) + { + writer.writeNullTerminatedStringImpl(entry.getKey()); + writer.writeObject(entry.getValue()); + } + return getBytes(writer); + } + + private byte[] getBytes(final TypedBytesContentWriter writer) + { + ByteBuffer buf = writer.getData(); + final byte[] expected = new byte[buf.remaining()]; + buf.get(expected); + return expected; + } + + private MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType) + { + configureMessageContent(expected); + configureMessageHeader(mimeType); + + return new MessageTransferMessage(_handle, new Object()); + } + + private void configureMessageHeader(final String mimeType) + { + when(_amqpHeader.getMimeType()).thenReturn(mimeType); + _messageProperties.setContentType(mimeType); + } + + private void configureMessageContent(byte[] section) + { + if (section == null) + { + section = new byte[0]; + } + final QpidByteBuffer combined = QpidByteBuffer.wrap(section); + when(_handle.getContentSize()).thenReturn(section.length); + final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class); + final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class); + + when(_handle.getContent(offsetCaptor.capture(), + sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>() + { + @Override + public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable + { + final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue()); + return Collections.singleton(view); + } + }); + } + + private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception + { + + final byte[] contentBytes; + final String expectedContent; + if (originalContent == null) + { + contentBytes = null; + expectedContent = ""; + } + else + { + contentBytes = originalContent.getBytes(UTF_8); + expectedContent = originalContent; + } + doTest(contentBytes, mimeType, expectedContent, mimeType); + } + + + private void doTestMapMessage(final byte[] messageBytes, + final String mimeType, + final Map<String, Object> expected) throws Exception + { + doTest(messageBytes, mimeType, expected, null); + } + + private void doTestBytesMessage(final byte[] messageContent) throws Exception + { + doTest(messageContent,"application/octet-stream", messageContent, "application/octet-stream"); + } + + private void doTestStreamMessage(final byte[] messageBytes, + final String mimeType, + final List<Object> expected) throws Exception + { + doTest(messageBytes, mimeType, expected, null); + } + + private void doTestObjectMessage(final byte[] messageBytes, + final String mimeType, + final byte[] expectedBytes) + throws Exception + { + doTest(messageBytes, mimeType, expectedBytes, "application/x-java-serialized-object"); + } + + private void doTest(final byte[] messageBytes, + final String mimeType, + final Object expectedContent, + final String expectedMimeType) throws Exception + { + final MessageTransferMessage sourceMessage = getAmqMessage(messageBytes, mimeType); + final InternalMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + if (expectedContent instanceof byte[]) + { + assertArrayEquals("Unexpected content", + ((byte[]) expectedContent), + ((byte[]) convertedMessage.getMessageBody())); + } + else if (expectedContent instanceof List) + { + assertEquals("Unexpected content", + new ArrayList<Object>((Collection) expectedContent), + new ArrayList<Object>((Collection) convertedMessage.getMessageBody())); + } + else if (expectedContent instanceof Map) + { + assertEquals("Unexpected content", + new HashMap<Object,Object>((Map) expectedContent), + new HashMap<Object,Object>((Map) convertedMessage.getMessageBody())); + } + else + { + assertEquals("Unexpected content", expectedContent, convertedMessage.getMessageBody()); + } + String convertedMimeType = convertedMessage.getMessageHeader().getMimeType(); + assertEquals("Unexpected content type", expectedMimeType, convertedMimeType); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/pom.xml b/broker-plugins/amqp-0-8-protocol/pom.xml index a40955f..010c9aa 100644 --- a/broker-plugins/amqp-0-8-protocol/pom.xml +++ b/broker-plugins/amqp-0-8-protocol/pom.xml @@ -59,6 +59,12 @@ <classifier>tests</classifier> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java index fb43b6a..dbd2194 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java @@ -20,14 +20,26 @@ */ package org.apache.qpid.server.protocol.v0_8; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.LIST_MESSAGE_CONTENT_TYPES; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.MAP_MESSAGE_CONTENT_TYPES; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES; + import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.message.internal.InternalMessageHeader; +import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils; import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry; import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; import org.apache.qpid.server.model.NamedAddressSpace; @@ -75,10 +87,55 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe } Object body = convertMessageBody(mimeType, data); + final AMQMessageHeader convertedHeader = convertHeader(serverMessage, addressSpace, body, encoding); + return InternalMessage.convert(serverMessage, convertedHeader, body); + } + + private AMQMessageHeader convertHeader(final AMQMessage serverMessage, + final NamedAddressSpace addressSpace, + final Object convertedBodyObject, final String encoding) + { + final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject); + final AMQMessageHeader messageHeader = serverMessage.getMessageHeader(); + + Map<String, Object> headers = new HashMap<>(); + messageHeader.getHeaderNames() + .forEach(headerName -> headers.put(headerName, messageHeader.getHeader(headerName))); + + final InternalMessageHeader header = new InternalMessageHeader(headers, + messageHeader.getCorrelationId(), + messageHeader.getExpiration(), + messageHeader.getUserId(), + messageHeader.getAppId(), + messageHeader.getMessageId(), + convertedMimeType, + messageHeader.getEncoding(), + messageHeader.getPriority(), + messageHeader.getTimestamp(), + messageHeader.getNotValidBefore(), + messageHeader.getType(), + messageHeader.getReplyTo(), + serverMessage.getArrivalTime()); + return new DelegatingMessageHeader(header, encoding); + } + + private String getInternalConvertedMimeType(final AMQMessage serverMessage, final Object convertedBodyObject) + { + String originalMimeType = serverMessage.getMessageHeader().getMimeType(); + if (originalMimeType != null) + { + if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches() + || ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches()) + { + return null; + } + else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches()) + { + return "application/x-java-serialized-object"; + } + } - return InternalMessage.convert(serverMessage, - new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding), - body); + return originalMimeType; } @Override @@ -298,14 +355,38 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe private static Object convertMessageBody(String mimeType, byte[] data) { MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType); - if (converter != null) + if (data != null && data.length != 0) + { + if (converter != null) + { + return converter.toObject(data); + } + else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches()) + { + return new String(data, UTF_8); + } + } + else if (mimeType == null) + { + return null; + } + else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) + { + return new byte[0]; + } + else if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(mimeType).matches()) + { + return ""; + } + else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) { - return converter.toObject(data); + return Collections.emptyMap(); } - else + else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) { - return data; + return Collections.emptyList(); } + return data; } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java new file mode 100644 index 0000000..0f32481 --- /dev/null +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter; +import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.test.utils.QpidTestCase; + +public class MessageConverter_0_8_to_InternalTest extends QpidTestCase +{ + private final MessageConverter_v0_8_to_Internal _converter = new MessageConverter_v0_8_to_Internal(); + + private final StoredMessage<MessageMetaData> _handle = mock(StoredMessage.class); + + private final MessageMetaData _metaData = mock(MessageMetaData.class); + private final AMQMessageHeader _header = mock(AMQMessageHeader.class); + private final ContentHeaderBody _contentHeaderBody = mock(ContentHeaderBody.class); + private final BasicContentHeaderProperties _basicContentHeaderProperties = mock(BasicContentHeaderProperties.class); + + @Override + public void setUp() throws Exception + { + super.setUp(); + when(_handle.getMetaData()).thenReturn(_metaData); + when(_metaData.getMessageHeader()).thenReturn(_header); + when(_metaData.getMessagePublishInfo()).thenReturn(new MessagePublishInfo()); + when(_metaData.getContentHeaderBody()).thenReturn(_contentHeaderBody); + when(_contentHeaderBody.getProperties()).thenReturn(_basicContentHeaderProperties); + } + + public void testConvertStringMessageBody() throws Exception + { + doTestTextMessage("helloworld", "text/plain"); + } + + public void testConvertEmptyStringMessageBody() throws Exception + { + doTestTextMessage(null, "text/plain"); + } + + public void testConvertStringXmlMessageBody() throws Exception + { + doTestTextMessage("<helloworld></helloworld>", "text/xml"); + } + + public void testConvertEmptyStringXmlMessageBody() throws Exception + { + doTestTextMessage(null, "text/xml"); + } + + public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception + { + doTestTextMessage(null, "application/xml"); + } + + public void testConvertStringWithContentTypeText() throws Exception + { + doTestTextMessage("foo","text/foobar"); + } + + public void testConvertStringWithContentTypeApplicationXml() throws Exception + { + doTestTextMessage("<helloworld></helloworld>","application/xml"); + } + + public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception + { + doTestTextMessage("<!DOCTYPE name []>","application/xml-dtd"); + } + + public void testConvertStringWithContentTypeApplicationFooXml() throws Exception + { + doTestTextMessage("<helloworld></helloworld>","application/foo+xml"); + } + + public void testConvertStringWithContentTypeApplicationJson() throws Exception + { + doTestTextMessage("[]","application/json"); + } + + public void testConvertStringWithContentTypeApplicationFooJson() throws Exception + { + doTestTextMessage("[]","application/foo+json"); + } + + public void testConvertStringWithContentTypeApplicationJavascript() throws Exception + { + doTestTextMessage("var foo","application/javascript"); + } + + public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception + { + doTestTextMessage("var foo","application/ecmascript"); + } + + public void testConvertBytesMessageBody() throws Exception + { + doTestBytesMessage("helloworld".getBytes()); + } + + public void testConvertBytesMessageBodyNoContentType() throws Exception + { + final byte[] messageContent = "helloworld".getBytes(); + doTest(messageContent, null, messageContent, null); + } + + public void testConvertMessageBodyUnknownContentType() throws Exception + { + final byte[] messageContent = "helloworld".getBytes(); + final String mimeType = "my/bytes"; + doTest(messageContent, mimeType, messageContent, mimeType); + } + + + public void testConvertEmptyBytesMessageBody() throws Exception + { + doTestBytesMessage(new byte[0]); + } + + public void testConvertJmsStreamMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D); + final byte[] messageBytes = getJmsStreamMessageBytes(expected); + + final String mimeType = "jms/stream-message"; + doTestStreamMessage(messageBytes, mimeType, expected); + } + + public void testConvertEmptyJmsStreamMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList(); + final String mimeType = "jms/stream-message"; + doTestStreamMessage(null, mimeType, expected); + } + + public void testConvertAmqpListMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D); + final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected); + + doTestStreamMessage(messageBytes, "amqp/list", expected); + } + + public void testConvertEmptyAmqpListMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList(); + doTestStreamMessage(null, "amqp/list", expected); + } + + public void testConvertJmsMapMessageBody() throws Exception + { + final Map<String, Object> expected = Collections.singletonMap("key", "value"); + final byte[] messageBytes = getJmsMapMessageBytes(expected); + + doTestMapMessage(messageBytes, "jms/map-message", expected); + } + + public void testConvertEmptyJmsMapMessageBody() throws Exception + { + doTestMapMessage(null, "jms/map-message", Collections.emptyMap()); + } + + public void testConvertAmqpMapMessageBody() throws Exception + { + final Map<String, Object> expected = Collections.singletonMap("key", "value"); + final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected); + + doTestMapMessage(messageBytes, "amqp/map", expected); + } + + public void testConvertEmptyAmqpMapMessageBody() throws Exception + { + doTestMapMessage(null, "amqp/map", Collections.emptyMap()); + } + + public void testConvertObjectStreamMessageBody() throws Exception + { + final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID()); + doTestObjectMessage(messageBytes, "application/java-object-stream", messageBytes); + } + + public void testConvertObjectStream2MessageBody() throws Exception + { + final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID()); + doTestObjectMessage(messageBytes, "application/x-java-serialized-object", messageBytes); + } + + public void testConvertEmptyObjectStreamMessageBody() throws Exception + { + doTestObjectMessage(null, "application/java-object-stream", new byte[0]); + } + + public void testConvertEmptyMessageWithoutContentType() throws Exception + { + doTest(null, null, null, null); + } + + public void testConvertEmptyMessageWithUnknownContentType() throws Exception + { + doTest(null, "foo/bar", new byte[0], "foo/bar"); + } + + public void testConvertMessageWithoutContentType() throws Exception + { + final byte[] expectedContent = "someContent".getBytes(UTF_8); + doTest(expectedContent, null, expectedContent, null); + } + + + private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception + { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos)) + { + oos.writeObject(o); + return bos.toByteArray(); + } + } + + private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception + { + TypedBytesContentWriter writer = new TypedBytesContentWriter(); + for (Object o : objects) + { + writer.writeObject(o); + } + return getBytes(writer); + } + + private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception + { + TypedBytesContentWriter writer = new TypedBytesContentWriter(); + writer.writeIntImpl(map.size()); + for (Map.Entry<String, Object> entry : map.entrySet()) + { + writer.writeNullTerminatedStringImpl(entry.getKey()); + writer.writeObject(entry.getValue()); + } + return getBytes(writer); + } + + private byte[] getBytes(final TypedBytesContentWriter writer) + { + ByteBuffer buf = writer.getData(); + final byte[] expected = new byte[buf.remaining()]; + buf.get(expected); + return expected; + } + + protected AMQMessage getAmqMessage(final byte[] expected, final String mimeType) + { + configureMessageContent(expected); + configureMessageHeader(mimeType); + + return new AMQMessage(_handle); + } + + private void configureMessageHeader(final String mimeType) + { + when(_header.getMimeType()).thenReturn(mimeType); + when(_basicContentHeaderProperties.getContentTypeAsString()).thenReturn(mimeType); + } + + private void configureMessageContent(byte[] section) + { + if (section == null) + { + section = new byte[0]; + } + final QpidByteBuffer combined = QpidByteBuffer.wrap(section); + when(_handle.getContentSize()).thenReturn(section.length); + final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class); + final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class); + + when(_handle.getContent(offsetCaptor.capture(), + sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>() + { + @Override + public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable + { + final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue()); + return Collections.singleton(view); + } + }); + } + + private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception + { + + final byte[] contentBytes; + final String expectedContent; + if (originalContent == null) + { + contentBytes = null; + expectedContent = ""; + } + else + { + contentBytes = originalContent.getBytes(UTF_8); + expectedContent = originalContent; + } + doTest(contentBytes, mimeType, expectedContent, mimeType); + } + + + private void doTestMapMessage(final byte[] messageBytes, + final String mimeType, + final Map<String, Object> expected) throws Exception + { + doTest(messageBytes, mimeType, expected, null); + } + + private void doTestBytesMessage(final byte[] messageContent) throws Exception + { + doTest(messageContent,"application/octet-stream", messageContent, "application/octet-stream"); + } + + private void doTestStreamMessage(final byte[] messageBytes, + final String mimeType, + final List<Object> expected) throws Exception + { + doTest(messageBytes, mimeType, expected, null); + } + + private void doTestObjectMessage(final byte[] messageBytes, + final String mimeType, + final byte[] expectedBytes) + throws Exception + { + doTest(messageBytes, mimeType, expectedBytes, "application/x-java-serialized-object"); + } + + private void doTest(final byte[] messageBytes, + final String mimeType, + final Object expectedContent, + final String expectedMimeType) throws Exception + { + final AMQMessage sourceMessage = getAmqMessage(messageBytes, mimeType); + final InternalMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + if (expectedContent instanceof byte[]) + { + assertArrayEquals("Unexpected content", + ((byte[]) expectedContent), + ((byte[]) convertedMessage.getMessageBody())); + } + else if (expectedContent instanceof List) + { + assertEquals("Unexpected content", + new ArrayList((Collection) expectedContent), + new ArrayList((Collection) convertedMessage.getMessageBody())); + } + else if (expectedContent instanceof Map) + { + assertEquals("Unexpected content", + new HashMap((Map) expectedContent), + new HashMap((Map) convertedMessage.getMessageBody())); + } + else + { + assertEquals("Unexpected content", expectedContent, convertedMessage.getMessageBody()); + } + String convertedMimeType = convertedMessage.getMessageHeader().getMimeType(); + assertEquals("Unexpected content type", expectedMimeType, convertedMimeType); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java index d5581fa..2e27f47 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -36,9 +36,9 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.regex.Pattern; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils; import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter; import org.apache.qpid.server.protocol.converter.MessageConversionException; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; @@ -77,13 +77,6 @@ public class MessageConverter_from_1_0 UUID.class, Date.class)); - public static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$"); - public static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$"); - public static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$"); - public static final Pattern - OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$"); - public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$"); - static Object convertBodyToObject(final Message_1_0 serverMessage) { final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize()); @@ -296,29 +289,29 @@ public class MessageConverter_from_1_0 Class<?> contentTypeClassHint = null; String type = contentType.toString(); String supportedContentType = null; - if (TEXT_CONTENT_TYPES.matcher(type).matches()) + if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = String.class; // the AMQP 0-x client does not accept arbitrary "text/*" mimeTypes so use "text/plain" supportedContentType = "text/plain"; } - else if (MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = Map.class; supportedContentType = contentType.toString(); } - else if (LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = List.class; supportedContentType = contentType.toString(); } - else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = Serializable.class; // the AMQP 0-x client does not accept the "application/x-java-serialized-object" mimeTypes so use fall back supportedContentType = "application/java-object-stream"; } - else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = byte[].class; supportedContentType = "application/octet-stream"; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 23eee80..6d4b653 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.protocol.v1_0; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.*; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES; +import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES; import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.BYTES_MESSAGE; import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MAP_MESSAGE; import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MESSAGE; @@ -42,6 +45,7 @@ import java.util.Map; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils; import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry; import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; import org.apache.qpid.server.model.NamedAddressSpace; @@ -76,23 +80,23 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement Symbol contentType = null; if (contentMimeType != null) { - if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches()) + if (TEXT_CONTENT_TYPES.matcher(contentMimeType).matches()) { contentType = Symbol.valueOf(contentMimeType); } - else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { contentType = Symbol.valueOf("application/octet-stream"); } - else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { contentType = null; } - else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { contentType = null; } - else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { contentType = Symbol.valueOf("application/x-java-serialized-object"); } @@ -111,22 +115,22 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement final Symbol key = Symbol.valueOf("x-opt-jms-msg-type"); if (contentMimeType != null) { - if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches()) + if (TEXT_CONTENT_TYPES.matcher(contentMimeType).matches()) { messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, TEXT_MESSAGE.getType())); } - else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, BYTES_MESSAGE.getType())); } - else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { if (isSectionValidForJmsMap(bodySection)) { messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, MAP_MESSAGE.getType())); } } - else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { if (isSectionValidForJmsList(bodySection)) { @@ -134,7 +138,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement new MessageAnnotations(Collections.singletonMap(key, STREAM_MESSAGE.getType())); } } - else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) { messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, OBJECT_MESSAGE.getType())); } @@ -269,7 +273,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement return new AmqpSequence(fixListValues((List<Object>) bodyObject)); } } - else if (mimeType != null && MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches()) + else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches()) { return new AmqpValue(new String(data, UTF_8)); } @@ -278,19 +282,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement { return new AmqpValue(null); } - else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) + else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) { return new Data(new Binary(SERIALIZED_NULL)); } - else if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches()) + else if (TEXT_CONTENT_TYPES.matcher(mimeType).matches()) { return new AmqpValue(""); } - else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) + else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) { return new AmqpValue(Collections.emptyMap()); } - else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) + else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) { return new AmqpSequence(Collections.emptyList()); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java index 749c1a6..6e015d1 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.message.internal.InternalMessageHeader; +import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; @@ -72,7 +73,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa final NamedAddressSpace addressSpace, final Object convertedBodyObject) { - final String convertedMimeType = getInternalConvertedContentAndMimeType(serverMessage, convertedBodyObject); + final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject); final MessageMetaData_1_0.MessageHeader_1_0 messageHeader = serverMessage.getMessageHeader(); final InternalMessageHeader header = new InternalMessageHeader(messageHeader.getHeadersAsMap(), messageHeader.getCorrelationId(), @@ -103,8 +104,8 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa return "v1-0 to Internal"; } - private static String getInternalConvertedContentAndMimeType(final Message_1_0 serverMsg, - final Object convertedBodyObject) + private static String getInternalConvertedMimeType(final Message_1_0 serverMsg, + final Object convertedBodyObject) { MessageConverter_from_1_0.ContentHint contentHint = getInternalTypeHint(serverMsg); @@ -129,7 +130,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa } else if (contentClassHint == String.class && (originalContentType == null - || !MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(originalContentType).matches())) + || !ConversionUtils.TEXT_CONTENT_TYPES.matcher(originalContentType).matches())) { mimeType = "text/plain"; } @@ -153,7 +154,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa } else if (convertedBodyObject instanceof String && (originalContentType == null - || !MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(originalContentType).matches())) + || !ConversionUtils.TEXT_CONTENT_TYPES.matcher(originalContentType).matches())) { mimeType = "text/plain"; } @@ -222,23 +223,23 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa { Class<?> contentTypeClassHint = null; String type = contentType.toString(); - if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(type).matches()) + if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = String.class; } - else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = Map.class; } - else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = List.class; } - else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = Serializable.class; } - else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + else if (ConversionUtils.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches()) { contentTypeClassHint = byte[].class; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java index 2d53233..7cf43ce 100644 --- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java @@ -352,13 +352,12 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase return sections; } - protected MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType) + private MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType) { configureMessageContent(expected); configureMessageHeader(mimeType); - final MessageTransferMessage messageTransferMessage = new MessageTransferMessage(_handle, new Object()); - return messageTransferMessage; + return new MessageTransferMessage(_handle, new Object()); } private void configureMessageHeader(final String mimeType) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
