QPID-7434: Add unit tests for property conversion of AMQP 0-8..0-9-1 messages to internal messages
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/392187c1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/392187c1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/392187c1 Branch: refs/heads/master Commit: 392187c183c5b4c8e897aced80ef5e3933be39f5 Parents: 04b19e9 Author: Alex Rudyy <[email protected]> Authored: Tue Aug 1 16:13:46 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Tue Aug 1 16:13:46 2017 +0100 ---------------------------------------------------------------------- .../v0_8/MessageConverter_v0_8_to_Internal.java | 6 +- .../MessageConverter_v0_8_to_InternalTest.java | 322 +++++++++++++++++++ 2 files changed, 325 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/392187c1/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java index d16a239..19a937d 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java @@ -244,19 +244,19 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe AMQBindingURL burl = new AMQBindingURL(origReplyToString); ReplyToComponents replyTo = new ReplyToComponents(); String routingKey = burl.getRoutingKey(); - if(routingKey != null) + if(routingKey != null && !"".equals(routingKey)) { replyTo.setRoutingKey(routingKey); } String exchangeName = burl.getExchangeName(); - if(exchangeName != null) + if(exchangeName != null && !"".equals(exchangeName)) { replyTo.setExchange(exchangeName); } String queueName = burl.getQueueName(); - if(queueName != null) + if(queueName != null && !"".equals(queueName)) { replyTo.setQueue(queueName); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/392187c1/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_InternalTest.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_InternalTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_InternalTest.java new file mode 100644 index 0000000..cd22fd6 --- /dev/null +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_InternalTest.java @@ -0,0 +1,322 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.protocol.v0_8; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.test.utils.QpidTestCase; + +public class MessageConverter_v0_8_to_InternalTest extends QpidTestCase +{ + private MessageConverter_v0_8_to_Internal _messageConverter; + private NamedAddressSpace _addressSpace; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _messageConverter = new MessageConverter_v0_8_to_Internal(); + _addressSpace = mock(NamedAddressSpace.class); + } + + public void testDeliveryModePersistentConversion() + { + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertTrue("Unexpected persistence of message", convertedMessage.isPersistent()); + assertTrue("Unexpected persistence of meta data", + convertedMessage.getStoredMessage().getMetaData().isPersistent()); + } + + public void testDeliveryModeNonPersistentConversion() + { + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setDeliveryMode(BasicContentHeaderProperties.NON_PERSISTENT); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertFalse("Unexpected persistence of message", convertedMessage.isPersistent()); + assertFalse("Unexpected persistence of meta data", + convertedMessage.getStoredMessage().getMetaData().isPersistent()); + } + + public void testPriorityConversion() + { + byte priority = (byte) 7; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setPriority(priority); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected priority", priority, convertedMessage.getMessageHeader().getPriority()); + } + + public void testExpirationConversion() + { + long ttl = 10000; + long arrivalTime = System.currentTimeMillis(); + long expiryTime = arrivalTime + ttl; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setExpiration(expiryTime); + final AMQMessage originalMessage = createTestMessage(header, arrivalTime); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected expiration", expiryTime, convertedMessage.getMessageHeader().getExpiration()); + } + + public void testContentEncodingConversion() + { + String contentEncoding = "my-test-encoding"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setEncoding(contentEncoding); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected content encoding", contentEncoding, convertedMessage.getMessageHeader().getEncoding()); + } + + public void testMessageIdConversion() + { + final String messageId = "testMessageId"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setMessageId(messageId); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected messageId", + messageId, + convertedMessage.getMessageHeader().getMessageId()); + } + + public void testCorrelationIdStringConversion() + { + final String correlationId = "testMessageCorrelationId"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setCorrelationId(correlationId); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected correlationId", + correlationId, + convertedMessage.getMessageHeader().getCorrelationId()); + } + + public void testUserIdConversion() + { + final String userId = "testUserId"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setUserId(userId); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected userId", userId, convertedMessage.getMessageHeader().getUserId()); + } + + public void testReplyToConversionForDirectExchangeAndRoutingKey() + { + String exchangeName = "amq.direct"; + String routingKey = "testRoutingKey"; + final String replyTo = String.format("%s://%s//?routingkey='%s'", "direct", exchangeName, routingKey); + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setReplyTo(replyTo); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected replyTo", + exchangeName + "/" + routingKey, + convertedMessage.getMessageHeader().getReplyTo()); + } + + public void testReplyToConversionForFanoutExchange() + { + String exchangeName = "amq.fanout"; + final String replyTo = String.format("%s://%s//", "fanout", exchangeName); + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setReplyTo(replyTo); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected replyTo", exchangeName, convertedMessage.getMessageHeader().getReplyTo()); + } + + public void testReplyToConversionForDefaultDestination() + { + String exchangeName = ""; + String routingKey = "testRoutingKey"; + final String replyTo = String.format("%s://%s//?routingkey='%s'", "direct", exchangeName, routingKey); + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setReplyTo(replyTo); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected replyTo", routingKey, convertedMessage.getMessageHeader().getReplyTo()); + } + + public void testReplyToNonBurl() + { + final String replyTo = "test/routing"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setReplyTo(replyTo); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected replyTo", replyTo, convertedMessage.getMessageHeader().getReplyTo()); + } + + public void testTimestampConversion() + { + final long creationTime = System.currentTimeMillis(); + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setTimestamp(creationTime); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected timestamp", creationTime, convertedMessage.getMessageHeader().getTimestamp()); + } + + public void testHeadersConversion() + { + Map<String, Object> properties = new HashMap<>(); + properties.put("testProperty1", "testProperty1Value"); + properties.put("intProperty", 1); + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setHeaders(FieldTable.convertToFieldTable(properties)); + final AMQMessage originalMessage = createTestMessage(header); + + final InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + final Map<String, Object> headers = convertedMessage.getMessageHeader().getHeaderMap(); + assertEquals("Unexpected headers", properties, new HashMap<>(headers)); + } + + public void testContentTypeConversion() + { + final String contentType = "text/json"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setContentType(contentType); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected content type", contentType, convertedMessage.getMessageHeader().getMimeType()); + } + + public void testTypeConversion() + { + final String type = "JMSType"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setType(type); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected type", type, convertedMessage.getMessageHeader().getType()); + } + + public void testApplicationIdConversion() + { + final String applicationId = "appId"; + BasicContentHeaderProperties header = new BasicContentHeaderProperties(); + header.setAppId(applicationId); + final AMQMessage originalMessage = createTestMessage(header); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected applicationId", applicationId, convertedMessage.getMessageHeader().getAppId()); + } + + public void testBasicPublishConversion() + { + final String exchangeName = "amq.direct"; + final String testRoutingKey = "test-routing-key"; + + final AMQMessage originalMessage = createTestMessage(new BasicContentHeaderProperties()); + originalMessage.getMessagePublishInfo().setRoutingKey(AMQShortString.valueOf(testRoutingKey)); + originalMessage.getMessagePublishInfo().setExchange(AMQShortString.valueOf(exchangeName)); + + InternalMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected to", exchangeName, convertedMessage.getTo()); + + // TODO: QPID-7868 : add test for initialRoutingAddress + } + + private AMQMessage createTestMessage(final BasicContentHeaderProperties basicContentHeaderProperties) + { + return createTestMessage(basicContentHeaderProperties, null, 0); + } + + private AMQMessage createTestMessage(final BasicContentHeaderProperties basicContentHeaderProperties, + long arrivalTime) + { + return createTestMessage(basicContentHeaderProperties, null, arrivalTime); + } + + private AMQMessage createTestMessage(final BasicContentHeaderProperties basicContentHeaderProperties, + final byte[] content, final long arrivalTime) + { + final ContentHeaderBody contentHeaderBody = mock(ContentHeaderBody.class); + when(contentHeaderBody.getProperties()).thenReturn(basicContentHeaderProperties); + + final StoredMessage<MessageMetaData> storedMessage = mock(StoredMessage.class); + when(storedMessage.getMetaData()).thenReturn(new MessageMetaData(new MessagePublishInfo(), + contentHeaderBody, + arrivalTime)); + + if (content != null) + { + when(storedMessage.getContentSize()).thenReturn(content.length); + when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap( + content))); + } + + return new AMQMessage(storedMessage); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
