Repository: qpid-broker-j Updated Branches: refs/heads/master 4a2d88957 -> 8ab67e0c1
QPID-7434: Add unit tests for property conversion of internal messages to AMQP 1.0 messages Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/04b19e96 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/04b19e96 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/04b19e96 Branch: refs/heads/master Commit: 04b19e96b30e92d2270458c86381d356726cc76d Parents: 4a2d889 Author: Alex Rudyy <[email protected]> Authored: Tue Aug 1 15:30:20 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Tue Aug 1 15:30:20 2017 +0100 ---------------------------------------------------------------------- .../message/internal/InternalMessage.java | 30 +- .../v1_0/MessageConverter_Internal_to_v1_0.java | 57 +++- .../v1_0/MessageConverter_from_1_0.java | 30 ++ .../PropertyConverter_Internal_to_v1_0Test.java | 307 +++++++++++++++++++ 4 files changed, 410 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/04b19e96/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 102bf09..a65b58c 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.MessageHandle; @@ -68,21 +69,28 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, { super(msg, null); _header = msg.getMetaData().getHeader(); - Collection<QpidByteBuffer> bufs = msg.getContent(0, (int)getSize()); - - try(ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(ByteBufferUtils.combine(bufs)))) + long contentSize = getSize(); + if (contentSize > 0) { - _messageBody = is.readObject(); + Collection<QpidByteBuffer> bufs = msg.getContent(0, (int) contentSize); + try (ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(ByteBufferUtils.combine(bufs)))) + { + _messageBody = is.readObject(); + } + catch (IOException e) + { + throw new ConnectionScopedRuntimeException("Unexpected IO Exception in operation in memory", e); + } + catch (ClassNotFoundException e) + { + throw new ConnectionScopedRuntimeException("Object message contained an object which could not " + + "be deserialized", e); + } } - catch (IOException e) - { - throw new ConnectionScopedRuntimeException("Unexpected IO Exception in operation in memory", e); - } - catch (ClassNotFoundException e) + else { - throw new ConnectionScopedRuntimeException("Object message contained an object which could not " + - "be deserialized", e); + _messageBody = null; } _destinationName = destinationName; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/04b19e96/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java index 2ac734a..4a9e3ae 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java @@ -27,14 +27,17 @@ import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.PluggableService; +import org.apache.qpid.server.protocol.converter.MessageConversionException; import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties; import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; @@ -71,9 +74,13 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I } Properties properties = new Properties(); - properties.setCorrelationId(serverMessage.getMessageHeader().getCorrelationId()); + if (serverMessage.getMessageHeader().getEncoding() != null) + { + properties.setContentEncoding(Symbol.valueOf(serverMessage.getMessageHeader().getEncoding())); + } + properties.setCorrelationId(getCorrelationId(serverMessage)); properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp())); - properties.setMessageId(serverMessage.getMessageHeader().getMessageId()); + properties.setMessageId(getMessageId(serverMessage)); if(bodySection instanceof Data) { properties.setContentType(Symbol.valueOf(serverMessage.getMessageHeader().getMimeType())); @@ -88,7 +95,15 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I ApplicationProperties applicationProperties = null; if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty()) { - applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() ); + try + { + applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap()); + } + catch (IllegalArgumentException e) + { + throw new MessageConversionException("Could not convert message from internal to 1.0" + + " because conversion of 'application headers' failed.", e); + } } return new MessageMetaData_1_0(header.createEncodingRetainingSection(), @@ -102,6 +117,42 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I } + private Object getMessageId(final InternalMessage serverMessage) + { + String messageIdAsString = serverMessage.getMessageHeader().getMessageId(); + return stringToMessageId(messageIdAsString); + } + + private Object getCorrelationId(final InternalMessage serverMessage) + { + String correlationIdAsString = serverMessage.getMessageHeader().getCorrelationId(); + return stringToMessageId(correlationIdAsString); + } + + private Object stringToMessageId(final String correlationIdAsString) + { + Object messageId = null; + if (correlationIdAsString != null) + { + try + { + messageId = UUID.fromString(correlationIdAsString); + } + catch (IllegalArgumentException e) + { + try + { + messageId = UnsignedLong.valueOf(correlationIdAsString); + } + catch (NumberFormatException nfe) + { + messageId = correlationIdAsString; + } + } + } + return messageId; + } + @Override protected EncodingRetainingSection<?> getBodySection(final InternalMessage serverMessage, final SectionEncoder encoder) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/04b19e96/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java index cf35014..1cc4a22 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -317,6 +317,36 @@ public class MessageConverter_from_1_0 return userId; } + public static String getReplyTo(final Message_1_0 serverMsg) + { + String replyTo = null; + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + replyTo = properties.getReplyTo(); + } + } + return replyTo; + } + + public static Symbol getContentEncoding(final Message_1_0 serverMsg) + { + Symbol contentEncoding = null; + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + contentEncoding = properties.getContentEncoding(); + } + } + return contentEncoding; + } + public static Object getCorrelationId(final Message_1_0 serverMsg) { Object correlationIdObject = null; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/04b19e96/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java new file mode 100644 index 0000000..455ab8f --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java @@ -0,0 +1,307 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.message.internal.InternalMessageHeader; +import org.apache.qpid.server.message.internal.InternalMessageMetaData; +import org.apache.qpid.server.message.internal.InternalMessageMetaDataType; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.protocol.converter.MessageConversionException; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.test.utils.QpidTestCase; + +public class PropertyConverter_Internal_to_v1_0Test extends QpidTestCase +{ + private MessageConverter_Internal_to_v1_0 _messageConverter; + private NamedAddressSpace _addressSpace; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _messageConverter = new MessageConverter_Internal_to_v1_0(); + _addressSpace = mock(NamedAddressSpace.class); + } + + public void testDurableTrueConversion() + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + InternalMessage originalMessage = createTestMessage(header, null, true); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertTrue("Unexpected persistence of message", convertedMessage.isPersistent()); + assertTrue("Unexpected persistence of meta data", + convertedMessage.getStoredMessage().getMetaData().isPersistent()); + } + + public void testDurableFalseConversion() + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + InternalMessage originalMessage = createTestMessage(header, null, false); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertFalse("Unexpected persistence of message", convertedMessage.isPersistent()); + assertFalse("Unexpected persistence of meta data", + convertedMessage.getStoredMessage().getMetaData().isPersistent()); + } + + public void testPriorityConversion() + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + byte priority = (byte) 7; + when(header.getPriority()).thenReturn(priority); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected priority", priority, convertedMessage.getMessageHeader().getPriority()); + } + + public void testExpirationConversion() throws InterruptedException + { + long ttl = 10000; + long expiryTime = System.currentTimeMillis() + ttl; + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getExpiration()).thenReturn(expiryTime); + InternalMessage originalMessage = createTestMessage(header); + Thread.sleep(1L); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Long convertedTtl = MessageConverter_from_1_0.getTtl(convertedMessage); + assertEquals("Unexpected TTL", expiryTime - originalMessage.getArrivalTime(), convertedTtl.longValue()); + } + + public void testContentEncodingConversion() + { + String contentEncoding = "my-test-encoding"; + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getEncoding()).thenReturn(contentEncoding); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Symbol convertedContentEncoding = MessageConverter_from_1_0.getContentEncoding(convertedMessage); + assertEquals("Unexpected content encoding", contentEncoding, convertedContentEncoding.toString()); + } + + public void testMessageIdStringConversion() + { + final String messageId = "testMessageId"; + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getMessageId()).thenReturn(messageId); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Object convertedMessageId = MessageConverter_from_1_0.getMessageId(convertedMessage); + assertEquals("Unexpected messageId", messageId, convertedMessageId); + } + + public void testMessageIdUuidConversion() + { + final UUID messageId = UUID.randomUUID(); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getMessageId()).thenReturn(messageId.toString()); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Object convertedMessageId = MessageConverter_from_1_0.getMessageId(convertedMessage); + assertEquals("Unexpected messageId", messageId, convertedMessageId); + } + + public void testMessageIdUnsignedLongConversion() + { + final UnsignedLong messageId = UnsignedLong.valueOf(-1L); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getMessageId()).thenReturn(messageId.toString()); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Object convertedMessageId = MessageConverter_from_1_0.getMessageId(convertedMessage); + assertEquals("Unexpected messageId", messageId, convertedMessageId); + } + + public void testCorrelationIdStringConversion() + { + final String correlationId = "testCorrelationId"; + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getCorrelationId()).thenReturn(correlationId); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Object convertedCorrelationId = MessageConverter_from_1_0.getCorrelationId(convertedMessage); + assertEquals("Unexpected messageId", correlationId, convertedCorrelationId); + } + + public void testCorrelationIdUuidConversion() + { + final UUID correlationId = UUID.randomUUID(); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getCorrelationId()).thenReturn(correlationId.toString()); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Object convertedCorrelationId = MessageConverter_from_1_0.getCorrelationId(convertedMessage); + assertEquals("Unexpected correlationId", correlationId, convertedCorrelationId); + } + + public void testCorrelationIdUnsignedLongConversion() + { + final UnsignedLong correlationId = UnsignedLong.valueOf(-1L); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getCorrelationId()).thenReturn(correlationId.toString()); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Object convertedCorrelationId = MessageConverter_from_1_0.getCorrelationId(convertedMessage); + assertEquals("Unexpected correlationId", correlationId, convertedCorrelationId); + } + + public void testUserIdConversion() + { + final String userId = "testUserId"; + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getUserId()).thenReturn(userId); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Binary convertedUserId = MessageConverter_from_1_0.getUserId(convertedMessage); + assertTrue("Unexpected userId", Arrays.equals(userId.getBytes(UTF_8), convertedUserId.getArray())); + } + + public void testReplyToConversion() + { + final String replyTo = "amq.direct/test"; + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getReplyTo()).thenReturn(replyTo); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + String convertedReplyTo = MessageConverter_from_1_0.getReplyTo(convertedMessage); + assertEquals("Unexpected replyTo", replyTo, convertedReplyTo); + } + + public void testTimestampConversion() + { + final long timestamp = System.currentTimeMillis(); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getTimestamp()).thenReturn(timestamp); + InternalMessage originalMessage = createTestMessage(header); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Date creationTime = MessageConverter_from_1_0.getCreationTime(convertedMessage); + assertNotNull("timestamp not converted", creationTime); + assertEquals("Unexpected timestamp", timestamp, creationTime.getTime()); + } + + public void testHeadersConversion() + { + final Map<String, Object> properties = new HashMap<>(); + properties.put("testProperty1", "testProperty1Value"); + properties.put("intProperty", 1); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getHeaderNames()).thenReturn(properties.keySet()); + doAnswer(invocation -> + { + final String originalArgument = (String) (invocation.getArguments())[0]; + return properties.get(originalArgument); + }).when(header).getHeader(any(String.class)); + InternalMessage originalMessage = createTestMessage(header); + + final Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + Map<String, Object> convertedHeaders = convertedMessage.getApplicationPropertiesSection().getValue(); + assertEquals("Unexpected application properties", properties, new HashMap<>(convertedHeaders)); + } + + public void testHeadersConversionWithNonSimpleTypes() + { + final Map<String, Object> properties = Collections.singletonMap("listProperty", Collections.emptyList()); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getHeaderNames()).thenReturn(properties.keySet()); + doAnswer(invocation -> + { + final String originalArgument = (String) (invocation.getArguments())[0]; + return properties.get(originalArgument); + }).when(header).getHeader(any(String.class)); + InternalMessage originalMessage = createTestMessage(header); + + try + { + _messageConverter.convert(originalMessage, _addressSpace); + fail("Expected exception not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + private InternalMessage createTestMessage(final AMQMessageHeader header) + { + return createTestMessage(header, null, false); + } + + private InternalMessage createTestMessage(final AMQMessageHeader header, + byte[] content, + final boolean persistent) + { + final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(header); + final int contentSize = content == null ? 0 : content.length; + final InternalMessageMetaData metaData = + new InternalMessageMetaData(persistent, internalMessageHeader, contentSize); + final StoredMessage<InternalMessageMetaData> storedMessage = mock(StoredMessage.class); + + when(storedMessage.getMetaData()).thenReturn(metaData); + when(storedMessage.getContentSize()).thenReturn(contentSize); + return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(storedMessage)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
