Repository: qpid-broker-j Updated Branches: refs/heads/master f267226ab -> 84256b2c7
QPID-7434: [Java Broker] Improve Internal to AMQP 1.0 content conversion and add unit tests Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/84256b2c Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/84256b2c Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/84256b2c Branch: refs/heads/master Commit: 84256b2c718a06490a7a7650c1eb0875c03dc9d4 Parents: f267226 Author: Lorenz Quack <lqu...@apache.org> Authored: Wed Aug 9 16:47:50 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Thu Aug 10 10:54:40 2017 +0100 ---------------------------------------------------------------------- .../v1_0/MessageConverter_Internal_to_v1_0.java | 139 ++++++- .../protocol/v1_0/MessageConverter_to_1_0.java | 4 +- .../MessageConverter_Internal_to_1_0Test.java | 410 +++++++++++++++++++ .../management/amqp/AmqpManagementTest.java | 17 +- 4 files changed, 546 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/84256b2c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java index 4a9e3ae..847a26a 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java @@ -20,15 +20,27 @@ */ package org.apache.qpid.server.protocol.v1_0; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.BYTES_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MAP_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.OBJECT_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.STREAM_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.TEXT_MESSAGE; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; +import java.io.Serializable; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import com.google.common.collect.Sets; + import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.converter.MessageConversionException; @@ -38,11 +50,13 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties; import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations; import org.apache.qpid.server.protocol.v1_0.type.messaging.NonEncodingRetainingSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -51,6 +65,12 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<InternalMessage> { + private static final Set<Class<?>> TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE = Sets.newHashSet(String.class, + Character.class, + Boolean.class, + Number.class, + UUID.class, + Date.class); @Override public Class<InternalMessage> getInputClass() @@ -81,10 +101,9 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I properties.setCorrelationId(getCorrelationId(serverMessage)); properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp())); properties.setMessageId(getMessageId(serverMessage)); - if(bodySection instanceof Data) - { - properties.setContentType(Symbol.valueOf(serverMessage.getMessageHeader().getMimeType())); - } + Symbol contentType = getContentTypeSymbol(serverMessage.getMessageBody(), serverMessage.getMessageHeader().getMimeType()); + properties.setContentType(contentType); + final String userId = serverMessage.getMessageHeader().getUserId(); if(userId != null) { @@ -106,9 +125,14 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I } } + final MessageAnnotations messageAnnotation = createMessageAnnotation(serverMessage.getMessageBody(), + serverMessage.getMessageHeader() + .getMimeType(), + bodySection); + return new MessageMetaData_1_0(header.createEncodingRetainingSection(), null, - null, + messageAnnotation == null ? null : messageAnnotation.createEncodingRetainingSection(), properties.createEncodingRetainingSection(), applicationProperties == null ? null : applicationProperties.createEncodingRetainingSection(), null, @@ -117,6 +141,91 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I } + private MessageAnnotations createMessageAnnotation(final Object originalMessageBody, + String mimeType, + final EncodingRetainingSection<?> convertedMessageBody) + { + final Byte contentTypeAnnotationValue; + if (originalMessageBody instanceof String) + { + contentTypeAnnotationValue = TEXT_MESSAGE.getType(); + } + else if (originalMessageBody instanceof List) + { + contentTypeAnnotationValue = isSectionValidForJmsList(convertedMessageBody) ? STREAM_MESSAGE.getType() : null; + } + else if (originalMessageBody instanceof byte[]) + { + contentTypeAnnotationValue = BYTES_MESSAGE.getType(); + } + else if (originalMessageBody instanceof Map) + { + contentTypeAnnotationValue = isSectionValidForJmsMap(convertedMessageBody) ? MAP_MESSAGE.getType() : null; + } + else if (originalMessageBody != null + && TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz -> clazz.isAssignableFrom(originalMessageBody.getClass()))) + { + contentTypeAnnotationValue = null; + } + else if (originalMessageBody instanceof Serializable) + { + contentTypeAnnotationValue = OBJECT_MESSAGE.getType(); + } + else if (originalMessageBody == null && mimeType == null) + { + contentTypeAnnotationValue = MESSAGE.getType(); + } + else + { + contentTypeAnnotationValue = null; + } + + if (contentTypeAnnotationValue != null) + { + return new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), + contentTypeAnnotationValue)); + } + else + { + return null; + } + } + + private Symbol getContentTypeSymbol(final Object messageBody, final String mimeType) + { + String contentTypeAsString; + if (messageBody instanceof String) + { + contentTypeAsString = mimeType == null ? "text/plain" : mimeType; + } + else if (messageBody instanceof List) + { + contentTypeAsString = null; + } + else if (messageBody instanceof byte[]) + { + contentTypeAsString = mimeType == null ? "application/octet-stream" : mimeType; + } + else if (messageBody instanceof Map) + { + contentTypeAsString = null; + } + else if (messageBody != null + && TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz -> clazz.isAssignableFrom(messageBody.getClass()))) + { + contentTypeAsString = mimeType; + } + else if (messageBody instanceof Serializable) + { + contentTypeAsString = "application/x-java-serialized-object"; + } + else + { + contentTypeAsString = mimeType; + } + return Symbol.valueOf(contentTypeAsString); + } + private Object getMessageId(final InternalMessage serverMessage) { String messageIdAsString = serverMessage.getMessageHeader().getMessageId(); @@ -170,28 +279,28 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I public NonEncodingRetainingSection<?> convertToBody(Object object) { - if(object instanceof String) + if (object == null + || TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz -> clazz.isAssignableFrom(object.getClass()))) { return new AmqpValue(object); } - else if(object instanceof byte[]) + else if (object instanceof byte[]) { - return new Data(new Binary((byte[])object)); + return new Data(new Binary((byte[]) object)); } - else if(object instanceof Map) + else if (object instanceof Map) { - return new AmqpValue(MessageConverter_to_1_0.fixMapValues((Map)object)); + return new AmqpValue(MessageConverter_to_1_0.fixMapValues((Map) object)); } - else if(object instanceof List) + else if (object instanceof List) { - return new AmqpValue(MessageConverter_to_1_0.fixListValues((List)object)); + return new AmqpSequence(MessageConverter_to_1_0.fixListValues((List) object)); } else { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try + try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + ObjectOutputStream os = new ObjectOutputStream(bytesOut)) { - ObjectOutputStream os = new ObjectOutputStream(bytesOut); os.writeObject(object); return new Data(new Binary(bytesOut.toByteArray())); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/84256b2c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 09b9964..7060058 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -147,7 +147,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement return messageAnnotations; } - private static boolean isSectionValidForJmsList(final EncodingRetainingSection<?> section) + public static boolean isSectionValidForJmsList(final EncodingRetainingSection<?> section) { if (section instanceof AmqpSequenceSection) { @@ -174,7 +174,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement return false; } - private static boolean isSectionValidForJmsMap(final EncodingRetainingSection<?> section) + public static boolean isSectionValidForJmsMap(final EncodingRetainingSection<?> section) { if (section instanceof AmqpValueSection) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/84256b2c/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_1_0Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_1_0Test.java new file mode 100644 index 0000000..6d8c89c --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_1_0Test.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getContentType; +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.message.internal.InternalMessageHeader; +import org.apache.qpid.server.message.internal.InternalMessageMetaData; +import org.apache.qpid.server.message.internal.InternalMessageMetaDataType; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.test.utils.QpidTestCase; + +public class MessageConverter_Internal_to_1_0Test extends QpidTestCase +{ + private final MessageConverter_Internal_to_v1_0 _converter = new MessageConverter_Internal_to_v1_0(); + private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() + .registerTransportLayer() + .registerMessagingLayer() + .registerTransactionLayer() + .registerSecurityLayer(); + + private final StoredMessage<InternalMessageMetaData> _handle = mock(StoredMessage.class); + + private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class); + + @Override + public void setUp() throws Exception + { + super.setUp(); + } + + + public void testStringMessage() throws Exception + { + String content = "testContent"; + final String mimeType = "text/plain"; + doTest(content, + mimeType, + AmqpValueSection.class, + content, + Symbol.valueOf(mimeType), + JmsMessageTypeAnnotation.TEXT_MESSAGE.getType()); + } + + public void testStringMessageWithUnknownMimeType() throws Exception + { + String content = "testContent"; + final String mimeType = "foo/bar"; + doTest(content, + mimeType, + AmqpValueSection.class, + content, + Symbol.valueOf(mimeType), + JmsMessageTypeAnnotation.TEXT_MESSAGE.getType()); + } + + public void testStringMessageWithoutMimeType() throws Exception + { + String content = "testContent"; + doTest(content, + null, + AmqpValueSection.class, + content, + Symbol.valueOf("text/plain"), + JmsMessageTypeAnnotation.TEXT_MESSAGE.getType()); + } + + public void testListMessageWithMimeType() throws Exception + { + ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42); + doTest(content, + "text/plain", + AmqpSequenceSection.class, + content, + null, + JmsMessageTypeAnnotation.STREAM_MESSAGE.getType()); + } + + public void testListMessageWithoutMimeType() throws Exception + { + ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42); + doTest(content, + null, + AmqpSequenceSection.class, + content, + null, + JmsMessageTypeAnnotation.STREAM_MESSAGE.getType()); + } + + public void testListMessageWithoutMimeTypeWithNonJmsContent() throws Exception + { + ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42, Lists.newArrayList()); + doTest(content, + null, + AmqpSequenceSection.class, + content, + null, + null); + } + + public void testByteArrayMessageWithoutMimeType() throws Exception + { + byte[] content = "testContent".getBytes(UTF_8); + doTest(content, + null, + DataSection.class, + content, + Symbol.valueOf("application/octet-stream"), + JmsMessageTypeAnnotation.BYTES_MESSAGE.getType()); + } + + public void testByteArrayMessageWithMimeType() throws Exception + { + byte[] content = "testContent".getBytes(UTF_8); + final String mimeType = "foo/bar"; + doTest(content, + mimeType, + DataSection.class, + content, + Symbol.valueOf(mimeType), + JmsMessageTypeAnnotation.BYTES_MESSAGE.getType()); + } + + public void testEmptyByteArrayMessageWithMimeType() throws Exception + { + byte[] content = new byte[0]; + final String mimeType = "foo/bar"; + doTest(content, + mimeType, + DataSection.class, + content, + Symbol.valueOf(mimeType), + JmsMessageTypeAnnotation.BYTES_MESSAGE.getType()); + } + + public void testMapMessageWithMimeType() throws Exception + { + HashMap<Object, Object> content = new HashMap<>(); + content.put("key1", 37); + content.put("key2", "foo"); + final String mimeType = "foo/bar"; + doTest(content, + mimeType, + AmqpValueSection.class, + content, + null, + JmsMessageTypeAnnotation.MAP_MESSAGE.getType()); + } + + public void testMapMessageWithoutMimeType() throws Exception + { + HashMap<Object, Object> content = new HashMap<>(); + content.put("key1", 37); + content.put("key2", "foo"); + doTest(content, + null, + AmqpValueSection.class, + content, + null, + JmsMessageTypeAnnotation.MAP_MESSAGE.getType()); + } + + public void testMapMessageWithMimeTypeWithNonJmsContent() throws Exception + { + HashMap<Object, Object> content = new HashMap<>(); + content.put(37, Collections.singletonMap("foo", "bar")); + final String mimeType = "foo/bar"; + doTest(content, + mimeType, + AmqpValueSection.class, + content, + null, + null); + } + + public void testSerializableMessageWithMimeType() throws Exception + { + Serializable content = new MySerializable(); + final String mimeType = "foo/bar"; + doTest(content, + mimeType, + DataSection.class, + getObjectStreamMessageBytes(content), + Symbol.valueOf("application/x-java-serialized-object"), + JmsMessageTypeAnnotation.OBJECT_MESSAGE.getType()); + } + + public void testSerializableMessageWithoutMimeType() throws Exception + { + Serializable content = new MySerializable(); + doTest(content, + null, + DataSection.class, + getObjectStreamMessageBytes(content), + Symbol.valueOf("application/x-java-serialized-object"), + JmsMessageTypeAnnotation.OBJECT_MESSAGE.getType()); + } + + public void testNullMessageWithoutMimeType() throws Exception + { + doTest(null, + null, + AmqpValueSection.class, + null, + null, + JmsMessageTypeAnnotation.MESSAGE.getType()); + } + + public void testUuidMessageWithMimeType() throws Exception + { + UUID content = UUID.randomUUID(); + final String mimeType = "foo/bar"; + doTest(content, + mimeType, + AmqpValueSection.class, + content, + Symbol.valueOf(mimeType), + null); + } + + + + private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception + { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos)) + { + oos.writeObject(o); + return bos.toByteArray(); + } + } + + private List<EncodingRetainingSection<?>> getEncodingRetainingSections(final Collection<QpidByteBuffer> content, + final int expectedNumberOfSections) + throws Exception + { + SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry()); + final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(content)); + assertEquals("Unexpected number of sections", expectedNumberOfSections, sections.size()); + return sections; + } + + + protected InternalMessage getAmqMessage(final Serializable content, final String mimeType) throws Exception + { + final byte[] serializedContent = getObjectStreamMessageBytes(content); + configureMessageContent(serializedContent); + configureMessageHeader(mimeType); + + final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(_amqpHeader); + final int contentSize = serializedContent == null ? 0 : serializedContent.length; + final InternalMessageMetaData metaData = + new InternalMessageMetaData(false, internalMessageHeader, contentSize); + when(_handle.getMetaData()).thenReturn(metaData); + + return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(_handle)); + } + + private void configureMessageHeader(final String mimeType) + { + when(_amqpHeader.getMimeType()).thenReturn(mimeType); + } + + private void configureMessageContent(byte[] section) + { + if (section == null) + { + section = new byte[0]; + } + final QpidByteBuffer combined = QpidByteBuffer.wrap(section); + when(_handle.getContentSize()).thenReturn(section.length); + final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class); + final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class); + + when(_handle.getContent(offsetCaptor.capture(), + sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>() + { + @Override + public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable + { + final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue()); + return Collections.singleton(view); + } + }); + } + + private Byte getJmsMessageTypeAnnotation(final Message_1_0 convertedMessage) + { + MessageAnnotationsSection messageAnnotationsSection = convertedMessage.getMessageAnnotationsSection(); + if (messageAnnotationsSection != null) + { + Map<Symbol, Object> messageAnnotations = messageAnnotationsSection.getValue(); + if (messageAnnotations != null) + { + Object annotation = messageAnnotations.get(Symbol.valueOf("x-opt-jms-msg-type")); + if (annotation instanceof Byte) + { + return ((Byte) annotation); + } + } + } + return null; + } + + private void doTest(final Serializable messageBytes, + final String mimeType, + final Class<? extends EncodingRetainingSection<?>> expectedBodySection, + final Object expectedContent, + final Symbol expectedContentType, + final Byte expectedJmsTypeAnnotation) throws Exception + { + final InternalMessage sourceMessage = getAmqMessage(messageBytes, mimeType); + final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1); + EncodingRetainingSection<?> encodingRetainingSection = sections.get(0); + assertEquals("Unexpected section type", expectedBodySection, encodingRetainingSection.getClass()); + + if (expectedContent instanceof byte[]) + { + assertArrayEquals("Unexpected content", + ((byte[]) expectedContent), + ((Binary) encodingRetainingSection.getValue()).getArray()); + } + else + { + assertEquals("Unexpected content", expectedContent, encodingRetainingSection.getValue()); + } + + Symbol contentType = getContentType(convertedMessage); + if (expectedContentType == null) + { + assertNull("Content type should be null", contentType); + } + else + { + assertEquals("Unexpected content type", expectedContentType, contentType); + } + + Byte jmsMessageTypeAnnotation = getJmsMessageTypeAnnotation(convertedMessage); + if (expectedJmsTypeAnnotation == null) + { + assertEquals("Unexpected annotation 'x-opt-jms-msg-type'", null, jmsMessageTypeAnnotation); + } + else + { + assertEquals("Unexpected annotation 'x-opt-jms-msg-type'", + expectedJmsTypeAnnotation, + jmsMessageTypeAnnotation); + } + } + + private static class MySerializable implements Serializable + { + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/84256b2c/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java b/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java index 4ce6c67..58bc844 100644 --- a/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java +++ b/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java @@ -184,20 +184,23 @@ public class AmqpManagementTest extends QpidBrokerTestCase { if (isBroker10()) { - assertTrue(String.format("The response was not an Object Message. It was a : %s ", - responseMessage.getClass()), responseMessage instanceof ObjectMessage); - assertTrue("The Object Message did not contain a Map", - ((ObjectMessage) responseMessage).getObject() instanceof Map); + if (!(responseMessage instanceof MapMessage) + && !(responseMessage instanceof ObjectMessage + && ((ObjectMessage) responseMessage).getObject() instanceof Map)) + { + fail(String.format("The response was neither a Map Message nor an Object Message containing a Map. It was a : %s ", + responseMessage.getClass())); + } } else { - assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage); + assertTrue(String.format("The response was not a MapMessage. It was a '%s'.", responseMessage.getClass()), responseMessage instanceof MapMessage); } } private Object getValueFromMapResponse(final Message responseMessage, String name) throws JMSException { - if (isBroker10()) + if (isBroker10() && responseMessage instanceof ObjectMessage) { return ((Map)((ObjectMessage)responseMessage).getObject()).get(name); } @@ -210,7 +213,7 @@ public class AmqpManagementTest extends QpidBrokerTestCase @SuppressWarnings("unchecked") private Collection<String> getMapResponseKeys(final Message responseMessage) throws JMSException { - if (isBroker10()) + if (isBroker10() && responseMessage instanceof ObjectMessage) { return ((Map)((ObjectMessage)responseMessage).getObject()).keySet(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org