Repository: qpid-broker-j Updated Branches: refs/heads/master dc2e9d425 -> db971ea2b
QPID-8139: [Broker-J][AMQP 1.0] Make sure that selector filter can handle JMSMessageID and JMSCorrelationID values with prefixes as defined in AMQP JMS mapping specification Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f1ce8666 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f1ce8666 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f1ce8666 Branch: refs/heads/master Commit: f1ce8666b80aa1ddd30445a6b2ce47db6b6943e9 Parents: dc2e9d4 Author: Alex Rudyy <[email protected]> Authored: Fri Apr 20 17:46:46 2018 +0100 Committer: Alex Rudyy <[email protected]> Committed: Thu May 24 17:15:19 2018 +0100 ---------------------------------------------------------------------- .../apache/qpid/server/filter/Filterable.java | 5 +- .../qpid/server/filter/FilterableMessage.java | 4 +- .../filter/JMSMessagePropertyExpression.java | 8 +- .../qpid/server/filter/JMSSelectorFilter.java | 11 +- .../qpid/server/message/AMQMessageHeader.java | 4 +- .../qpid/server/message/MessageInfoImpl.java | 5 +- .../message/internal/InternalMessageHeader.java | 8 +- .../server/exchange/HeadersBindingTest.java | 4 +- .../protocol/v0_10/MessageConverter_v0_10.java | 5 +- .../MessageConverter_v0_10_to_Internal.java | 4 +- .../protocol/v0_10/MessageTransferHeader.java | 2 +- .../v0_8/MessageConverter_v0_8_to_Internal.java | 4 +- .../v1_0/MessageConverter_v1_0_to_Internal.java | 2 - .../protocol/v1_0/MessageMetaData_1_0.java | 8 +- .../protocol/v1_0/SendingLinkEndpoint.java | 5 +- .../v1_0/selector/AmqpMessageIdHelper.java | 391 +++++++++++++++++++ .../selector/JMSMessagePropertyExpression.java | 119 ++++++ .../JMSMessagePropertyExpressionTest.java | 345 ++++++++++++++++ .../qpid/tests/protocol/v1_0/Interaction.java | 8 + .../tests/protocol/v1_0/MessageDecoder.java | 11 + .../JMSMessagePropertiesFilterTest.java | 356 +++++++++++++++++ 21 files changed, 1273 insertions(+), 36 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java index a15bdc1..589238c 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java +++ b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.filter; -import org.apache.qpid.server.filter.FilterableMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; @@ -104,7 +103,7 @@ public interface Filterable extends FilterableMessage } @Override - public String getMessageId() + public Object getMessageId() { return message.getMessageHeader().getMessageId(); } @@ -116,7 +115,7 @@ public interface Filterable extends FilterableMessage } @Override - public String getCorrelationId() + public Object getCorrelationId() { return message.getMessageHeader().getCorrelationId(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java index d41c33b..05ac9ee 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java +++ b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java @@ -31,11 +31,11 @@ public interface FilterableMessage byte getPriority(); - String 
getMessageId(); + Object getMessageId(); long getTimestamp(); - String getCorrelationId(); + Object getCorrelationId(); long getExpiration(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java index c94dd0d..b5cf4eb 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java +++ b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java @@ -213,9 +213,9 @@ public class JMSMessagePropertyExpression implements PropertyExpression<Filterab public Object evaluate(FilterableMessage message) { - String messageId = message.getMessageId(); + Object messageId = message.getMessageId(); - return messageId; + return messageId == null ? null : String.valueOf(messageId); } } @@ -236,9 +236,9 @@ public class JMSMessagePropertyExpression implements PropertyExpression<Filterab public Object evaluate(FilterableMessage message) { - String correlationId = message.getCorrelationId(); + Object correlationId = message.getCorrelationId(); - return correlationId; + return correlationId == null ? 
null : String.valueOf(correlationId); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 3e85861..5ba3da8 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -39,9 +39,16 @@ public class JMSSelectorFilter implements MessageFilter public JMSSelectorFilter(String selector) throws ParseException, TokenMgrError, SelectorParsingException { + this(selector, JMSMessagePropertyExpression.FACTORY); + } + + public JMSSelectorFilter(String selector, + PropertyExpressionFactory<? extends FilterableMessage> propertyExpressionFactory) + throws ParseException, TokenMgrError, SelectorParsingException + { _selector = selector; - SelectorParser<FilterableMessage> selectorParser = new SelectorParser<>(); - selectorParser.setPropertyExpressionFactory(JMSMessagePropertyExpression.FACTORY); + SelectorParser selectorParser = new SelectorParser<>(); + selectorParser.setPropertyExpressionFactory(propertyExpressionFactory); _matcher = selectorParser.parse(selector); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java index c7d28d1..4b159a7 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java +++ 
b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java @@ -25,7 +25,7 @@ import java.util.Set; public interface AMQMessageHeader { - String getCorrelationId(); + Object getCorrelationId(); long getExpiration(); @@ -35,7 +35,7 @@ public interface AMQMessageHeader String getGroupId(); - String getMessageId(); + Object getMessageId(); String getMimeType(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java index 7626dc6..cda04d8 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java @@ -62,10 +62,11 @@ public class MessageInfoImpl implements MessageInfo _arrivalTime = message.getArrivalTime() == 0L ? null : new Date(message.getArrivalTime()); _messageType = message.getMessageType(); _persistent = message.isPersistent(); - _messageId = messageHeader.getMessageId(); + _messageId = messageHeader.getMessageId() == null ? null : String.valueOf(messageHeader.getMessageId()); _expirationTime = messageHeader.getExpiration() == 0L ? null : new Date(messageHeader.getExpiration()); _applicationId = messageHeader.getAppId(); - _correlationId = messageHeader.getCorrelationId(); + _correlationId = + messageHeader.getCorrelationId() == null ? 
null : String.valueOf(messageHeader.getCorrelationId()); _encoding = messageHeader.getEncoding(); _mimeType = messageHeader.getMimeType(); _priority = messageHeader.getPriority(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java index 46f4a2e..0a85e3b 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java @@ -49,11 +49,11 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab private long _arrivalTime; public InternalMessageHeader(final Map<String, Object> headers, - final String correlationId, + final Object correlationId, final long expiration, final String userId, final String appId, - final String messageId, + final Object messageId, final String mimeType, final String encoding, final byte priority, @@ -65,11 +65,11 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab { _headers = headers == null ? new LinkedHashMap<>() : new LinkedHashMap<>(headers); - _correlationId = correlationId; + _correlationId = correlationId == null ? null : String.valueOf(correlationId); _expiration = expiration; _userId = userId; _appId = appId; - _messageId = messageId; + _messageId = messageId == null ? 
null : String.valueOf(messageId); _mimeType = mimeType; _encoding = encoding; _priority = priority; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 126c07d..05796df 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -56,7 +56,7 @@ public class HeadersBindingTest extends UnitTestBase private final Map<String, Object> _headers = new HashMap<String, Object>(); @Override - public String getCorrelationId() + public Object getCorrelationId() { return null; } @@ -86,7 +86,7 @@ public class HeadersBindingTest extends UnitTestBase } @Override - public String getMessageId() + public Object getMessageId() { return null; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 8c9db87..42afc4f 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -139,9 +139,10 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M 
messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); messageProps.setContentLength(size); messageProps.setContentType(serverMsg.getMessageHeader().getMimeType()); - if(serverMsg.getMessageHeader().getCorrelationId() != null) + if (serverMsg.getMessageHeader().getCorrelationId() != null) { - messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes(UTF_8)); + messageProps.setCorrelationId(String.valueOf(serverMsg.getMessageHeader().getCorrelationId()) + .getBytes(UTF_8)); } Header header = new Header(deliveryProps, messageProps, null); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java index 8121dd6..ce8bf62 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java @@ -157,7 +157,7 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess } @Override - public String getCorrelationId() + public Object getCorrelationId() { return _delegate.getCorrelationId(); } @@ -187,7 +187,7 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess } @Override - public String getMessageId() + public Object getMessageId() { return _delegate.getMessageId(); } 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java index 6156781..ff92fd9 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java @@ -110,7 +110,7 @@ class MessageTransferHeader implements AMQMessageHeader { UUID id = _messageProps == null ? null : _messageProps.getMessageId(); - return id == null ? null : "ID:"+String.valueOf(id); + return id == null ? null : "ID:" + String.valueOf(id); } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java index 28efa43..5ad2ebe 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java @@ -203,7 +203,7 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe } @Override - public String getCorrelationId() + public 
Object getCorrelationId() { return _delegate.getCorrelationId(); } @@ -233,7 +233,7 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe } @Override - public String getMessageId() + public Object getMessageId() { return _delegate.getMessageId(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java index 15039d6..c486358 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java @@ -33,8 +33,6 @@ import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection; -import org.apache.qpid.server.util.ServerScopedRuntimeException; @PluggableService public class MessageConverter_v1_0_to_Internal implements MessageConverter<Message_1_0, InternalMessage> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java ---------------------------------------------------------------------- diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index d1ea39a..9f3d83d 100755 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -485,7 +485,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData private final AtomicReference<String> _decodedUserId = new AtomicReference<>(); @Override - public String getCorrelationId() + public Object getCorrelationId() { if (_propertiesSection == null || _propertiesSection.getValue().getCorrelationId() == null) { @@ -493,7 +493,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData } else { - return _propertiesSection.getValue().getCorrelationId().toString(); + return _propertiesSection.getValue().getCorrelationId(); } } @@ -505,7 +505,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData } @Override - public String getMessageId() + public Object getMessageId() { if (_propertiesSection == null || _propertiesSection.getValue().getMessageId() == null) { @@ -513,7 +513,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData } else { - return _propertiesSection.getValue().getMessageId().toString(); + return _propertiesSection.getValue().getMessageId(); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java index a69b96a..0bd9b4b 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java @@ -51,6 +51,7 @@ import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.NotFoundException; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.LinkModel; +import org.apache.qpid.server.protocol.v1_0.selector.JMSMessagePropertyExpression; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.BaseSource; import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; @@ -170,8 +171,8 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target> org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue(); try { - messageFilter = new JMSSelectorFilter(selectorFilter.getValue()); - + messageFilter = new JMSSelectorFilter(selectorFilter.getValue(), + JMSMessagePropertyExpression.FACTORY); actualFilters.put(entry.getKey(), entry.getValue()); } catch (ParseException | SelectorParsingException | TokenMgrError e) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java new file mode 100644 index 0000000..75f05ad 
--- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java @@ -0,0 +1,391 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0.selector; + +// +// Based on like named file from 3c8d67a6eee38934d233f39b2d614ac4c28b39be of the Apache Qpid JMS <https://git-wip-us.apache.org/repos/asf/qpid-jms.git> +// + +import java.nio.ByteBuffer; +import java.util.UUID; + +import org.apache.qpid.server.protocol.converter.MessageConversionException; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; + + +/** + * Helper class for identifying and converting message-id and correlation-id values between + * the AMQP types and the Strings values used by JMS. + * <p> + * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary, + * message-id-uuid, or message-id-ulong. 
In order to accept or return a string representation of these + * for interoperability with other AMQP clients, the following encoding can be used after removing or + * before adding the "ID:" prefix used for a JMSMessageID value:<br> + * <p> + * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br> + * {@literal "AMQP_UUID:<string representation of uuid>"}<br> + * {@literal "AMQP_ULONG:<string representation of ulong>"}<br> + * {@literal "AMQP_STRING:<string>"}<br> + * <p> + * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin + * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise. + * <p> + * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or + * ulong but can't be converted into the indicated format, an exception will be thrown. + */ +public class AmqpMessageIdHelper +{ + public static final AmqpMessageIdHelper INSTANCE = new AmqpMessageIdHelper(); + + public static final String AMQP_STRING_PREFIX = "AMQP_STRING:"; + public static final String AMQP_UUID_PREFIX = "AMQP_UUID:"; + public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:"; + public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:"; + public static final String AMQP_NO_PREFIX = "AMQP_NO_PREFIX:"; + public static final String JMS_ID_PREFIX = "ID:"; + + private static final String AMQP_PREFIX = "AMQP_"; + private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length(); + private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length(); + private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length(); + private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length(); + private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length(); + private static final int AMQP_NO_PREFIX_LENGTH = AMQP_NO_PREFIX.length(); + private static final char[] HEX_CHARS = 
"0123456789ABCDEF".toCharArray(); + + /** + * Checks whether the given string begins with "ID:" prefix used to denote a JMSMessageID + * + * @param string the string to check + * @return true if and only id the string begins with "ID:" + */ + public boolean hasMessageIdPrefix(String string) + { + if (string == null) + { + return false; + } + + return string.startsWith(JMS_ID_PREFIX); + } + + public String toMessageIdString(Object idObject) + { + if (idObject instanceof String) + { + final String stringId = (String) idObject; + + boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId); + if (!hasMessageIdPrefix) + { + // For JMSMessageID, has no "ID:" prefix, we need to record + // that for later use as a JMSCorrelationID. + return JMS_ID_PREFIX + AMQP_NO_PREFIX + stringId; + } + else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH)) + { + // We are for a JMSMessageID value, but have 'ID:' followed by + // one of the encoding prefixes. Need to escape the entire string + // to preserve for later re-use as a JMSCorrelationID. + return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId; + } + else + { + // It has "ID:" prefix and doesn't have encoding prefix, use it as-is. + return stringId; + } + } + else + { + // Not a string, convert it + return convertToIdString(idObject); + } + } + + public String toCorrelationIdString(Object idObject) + { + + if (idObject instanceof String) + { + final String stringId = (String) idObject; + + boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId); + if (!hasMessageIdPrefix) + { + // For JMSCorrelationID, has no "ID:" prefix, use it as-is. + return stringId; + } + else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH)) + { + // We are for a JMSCorrelationID value, but have 'ID:' followed by + // one of the encoding prefixes. Need to escape the entire string + // to preserve for later re-use as a JMSCorrelationID. 
+ return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId; + } + else + { + // It has "ID:" prefix and doesn't have encoding prefix, use it as-is. + return stringId; + } + } + else + { + // Not a string, convert it + return convertToIdString(idObject); + } + } + + /** + * Takes the provided non-String AMQP message-id/correlation-id object, and + * converts it to a String usable as either a JMSMessageID or JMSCorrelationID + * value, encoding the type information as a prefix to convey for later use + * in reversing the process if used to set JMSCorrelationID on a message. + * + * @param idObject the object to process + * @return string to be used for the actual JMS ID. + */ + private String convertToIdString(Object idObject) + { + if (idObject == null) + { + return null; + } + + if (idObject instanceof UUID) + { + return JMS_ID_PREFIX + AMQP_UUID_PREFIX + idObject.toString(); + } + else if (idObject instanceof UnsignedLong) + { + return JMS_ID_PREFIX + AMQP_ULONG_PREFIX + idObject.toString(); + } + else if (idObject instanceof Binary) + { + ByteBuffer dup = ((Binary) idObject).asByteBuffer(); + + byte[] bytes = new byte[dup.remaining()]; + dup.get(bytes); + + String hex = convertBinaryToHexString(bytes); + + return JMS_ID_PREFIX + AMQP_BINARY_PREFIX + hex; + } + else + { + throw new IllegalArgumentException("Unsupported type provided: " + idObject.getClass()); + } + } + + private boolean hasTypeEncodingPrefix(String stringId, int offset) + { + if (!stringId.startsWith(AMQP_PREFIX, offset)) + { + return false; + } + + return hasAmqpBinaryPrefix(stringId, offset) || + hasAmqpUuidPrefix(stringId, offset) || + hasAmqpUlongPrefix(stringId, offset) || + hasAmqpStringPrefix(stringId, offset) || + hasAmqpNoPrefix(stringId, offset); + } + + private boolean hasAmqpStringPrefix(String stringId, int offset) + { + return stringId.startsWith(AMQP_STRING_PREFIX, offset); + } + + private boolean hasAmqpUlongPrefix(String stringId, int offset) + { + return 
stringId.startsWith(AMQP_ULONG_PREFIX, offset); + } + + private boolean hasAmqpUuidPrefix(String stringId, int offset) + { + return stringId.startsWith(AMQP_UUID_PREFIX, offset); + } + + private boolean hasAmqpBinaryPrefix(String stringId, int offset) + { + return stringId.startsWith(AMQP_BINARY_PREFIX, offset); + } + + private boolean hasAmqpNoPrefix(String stringId, int offset) + { + return stringId.startsWith(AMQP_NO_PREFIX, offset); + } + + /** + * Takes the provided id string and return the appropriate amqp messageId style object. + * Converts the type based on any relevant encoding information found as a prefix. + * + * @param origId the object to be converted + * @return the amqp messageId style object + * @throws MessageConversionException + * if the provided origId String indicates an encoded type but can't be converted to that type. + */ + public Object toIdObject(final String origId) throws MessageConversionException + { + if (origId == null) + { + return null; + } + + if (!AmqpMessageIdHelper.INSTANCE.hasMessageIdPrefix(origId)) + { + // We have a string without any "ID:" prefix, it is an + // application-specific String, use it as-is. 
+ return origId; + } + + try + { + if (hasAmqpNoPrefix(origId, JMS_ID_PREFIX_LENGTH)) + { + // Prefix telling us there was originally no "ID:" prefix, + // strip it and return the remainder + return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_NO_PREFIX_LENGTH); + } + else if (hasAmqpUuidPrefix(origId, JMS_ID_PREFIX_LENGTH)) + { + String uuidString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_UUID_PREFIX_LENGTH); + return UUID.fromString(uuidString); + } + else if (hasAmqpUlongPrefix(origId, JMS_ID_PREFIX_LENGTH)) + { + String ulongString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_ULONG_PREFIX_LENGTH); + return UnsignedLong.valueOf(ulongString); + } + else if (hasAmqpStringPrefix(origId, JMS_ID_PREFIX_LENGTH)) + { + return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_STRING_PREFIX_LENGTH); + } + else if (hasAmqpBinaryPrefix(origId, JMS_ID_PREFIX_LENGTH)) + { + String hexString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_BINARY_PREFIX_LENGTH); + byte[] bytes = convertHexStringToBinary(hexString); + return new Binary(bytes); + } + else + { + // We have a string without any encoding prefix needing processed, + // so transmit it as-is, including the "ID:" + return origId; + } + } + catch (IllegalArgumentException e) + { + throw new MessageConversionException("Unable to convert ID value", e); + } + } + + /** + * Convert the provided hex-string into a binary representation where each byte represents + * two characters of the hex string. + * <p> + * The hex characters may be upper or lower case. + * + * @param hexString string to convert + * @return a byte array containing the binary representation + * @throws IllegalArgumentException if the provided String is a non-even length or contains non-hex characters + */ + public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException + { + int length = hexString.length(); + + // As each byte needs two characters in the hex encoding, the string must be an even length. 
+ if (length % 2 != 0) + { + throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + + length + + ": " + + hexString); + } + + byte[] binary = new byte[length / 2]; + + for (int i = 0; i < length; i += 2) + { + char highBitsChar = hexString.charAt(i); + char lowBitsChar = hexString.charAt(i + 1); + + int highBits = hexCharToInt(highBitsChar, hexString) << 4; + int lowBits = hexCharToInt(lowBitsChar, hexString); + + binary[i / 2] = (byte) (highBits + lowBits); + } + + return binary; + } + + private int hexCharToInt(char ch, String orig) throws IllegalArgumentException + { + if (ch >= '0' && ch <= '9') + { + // subtract '0' to get difference in position as an int + return ch - '0'; + } + else if (ch >= 'A' && ch <= 'F') + { + // subtract 'A' to get difference in position as an int + // and then add 10 for the offset of 'A' + return ch - 'A' + 10; + } + else if (ch >= 'a' && ch <= 'f') + { + // subtract 'a' to get difference in position as an int + // and then add 10 for the offset of 'a' + return ch - 'a' + 10; + } + + throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig); + } + + /** + * Convert the provided binary into a hex-string representation where each character + * represents 4 bits of the provided binary, i.e each byte requires two characters. + * <p> + * The returned hex characters are upper-case. 
+ * + * @param bytes binary to convert + * @return a String containing a hex representation of the bytes + */ + public String convertBinaryToHexString(byte[] bytes) + { + // Each byte is represented as 2 chars + StringBuilder builder = new StringBuilder(bytes.length * 2); + + for (byte b : bytes) + { + // The byte will be expanded to int before shifting, replicating the + // sign bit, so mask everything beyond the first 4 bits afterwards + int highBitsInt = (b >> 4) & 0xF; + // We only want the first 4 bits + int lowBitsInt = b & 0xF; + + builder.append(HEX_CHARS[highBitsInt]); + builder.append(HEX_CHARS[lowBitsInt]); + } + + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java new file mode 100644 index 0000000..46c0ca0 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0.selector; + +import java.util.HashMap; + +import org.apache.qpid.server.filter.Expression; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.PropertyExpression; +import org.apache.qpid.server.filter.PropertyExpressionFactory; + +public class JMSMessagePropertyExpression implements PropertyExpression<Filterable> +{ + public static final PropertyExpressionFactory<Filterable> FACTORY = JMSMessagePropertyExpression::new; + + static final String JMS_MESSAGE_ID = "JMSMessageID"; + static final String JMS_CORRELATION_ID = "JMSCorrelationID"; + static final String JMS_REPLY_TO = "JMSReplyTo"; + static final String JMS_TYPE = "JMSType"; + static final String JMS_DELIVERY_MODE = "JMSDeliveryMode"; + static final String JMS_PRIORITY = "JMSPriority"; + static final String JMS_TIMESTAMP = "JMSTimestamp"; + static final String JMS_EXPIRATION = "JMSExpiration"; + static final String JMS_REDELIVERED = "JMSRedelivered"; + + private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<>(); + + static + { + JMS_PROPERTY_EXPRESSIONS.put(JMS_REPLY_TO, (Expression<Filterable>) Filterable::getReplyTo); + JMS_PROPERTY_EXPRESSIONS.put(JMS_TYPE, (Expression<Filterable>) Filterable::getType); + JMS_PROPERTY_EXPRESSIONS.put(JMS_DELIVERY_MODE, + (Expression<Filterable>) message -> (message.isPersistent() + ? 
JMSDeliveryMode.PERSISTENT + : JMSDeliveryMode.NON_PERSISTENT).toString()); + JMS_PROPERTY_EXPRESSIONS.put(JMS_PRIORITY, (Expression<Filterable>) message -> (int) message.getPriority()); + JMS_PROPERTY_EXPRESSIONS.put(JMS_MESSAGE_ID, + (Expression<Filterable>) message -> AmqpMessageIdHelper.INSTANCE.toMessageIdString( + message.getMessageId())); + JMS_PROPERTY_EXPRESSIONS.put(JMS_TIMESTAMP, (Expression<Filterable>) Filterable::getTimestamp); + JMS_PROPERTY_EXPRESSIONS.put(JMS_CORRELATION_ID, + (Expression<Filterable>) message -> AmqpMessageIdHelper.INSTANCE.toCorrelationIdString( + message.getCorrelationId())); + JMS_PROPERTY_EXPRESSIONS.put(JMS_EXPIRATION, (Expression<Filterable>) Filterable::getExpiration); + JMS_PROPERTY_EXPRESSIONS.put(JMS_REDELIVERED, (Expression<Filterable>) Filterable::isRedelivered); + } + + private final String name; + private final Expression jmsPropertyExpression; + + private JMSMessagePropertyExpression(String name) + { + this.name = name; + jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name); + } + + @Override + @SuppressWarnings("unchecked") + public Object evaluate(Filterable message) + { + if (jmsPropertyExpression != null) + { + return jmsPropertyExpression.evaluate(message); + } + else + { + return message.getHeader(name); + } + } + + public String getName() + { + return name; + } + + @Override + public String toString() + { + return name; + } + + @Override + public int hashCode() + { + return name.hashCode(); + } + + @Override + public boolean equals(Object o) + { + return (o != null) + && this.getClass().equals(o.getClass()) + && name.equals(((JMSMessagePropertyExpression) o).name); + } + + // Constants - defined the same as JMS + enum JMSDeliveryMode + { + NON_PERSISTENT, PERSISTENT + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java 
---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java new file mode 100644 index 0000000..2eb578f --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java @@ -0,0 +1,345 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.server.protocol.v1_0.selector; + +import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_BINARY_PREFIX; +import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_NO_PREFIX; +import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_STRING_PREFIX; +import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_ULONG_PREFIX; +import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_UUID_PREFIX; +import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.JMS_ID_PREFIX; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.UUID; + +import org.junit.Test; + +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.PropertyExpression; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; + +public class JMSMessagePropertyExpressionTest +{ + + @Test + public void evaluateJMSMessageIDForStringWithIDPrefix() + { + final String id = "ID:testID"; + final Filterable message = createFilterableWithMessageId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(id))); + } + + @Test + public void evaluateJMSMessageIDForStringWithoutIDPrefix() + { + final String id = "testID"; + final Filterable message = createFilterableWithMessageId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID); + Object value = 
expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_NO_PREFIX + id))); + } + + @Test + public void evaluateJMSMessageIDForUUID() + { + final UUID id = UUID.randomUUID(); + final Filterable message = createFilterableWithMessageId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_UUID_PREFIX + id))); + } + + @Test + public void evaluateJMSMessageIDForUnsignedLong() + { + final UnsignedLong id = UnsignedLong.valueOf(42); + final Filterable message = createFilterableWithMessageId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_ULONG_PREFIX + id))); + } + + @Test + public void evaluateJMSMessageIDForBinary() + { + byte[] data = {1, 2, 3}; + final Binary id = new Binary(data); + final Filterable message = createFilterableWithMessageId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_BINARY_PREFIX + AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)))); + } + + @Test + public void evaluateJMSMessageIDForStringWithAMQPBindingPrefixes() + { + final String id = "ID:AMQP_ULONG:string-id"; + final Filterable message = createFilterableWithMessageId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + 
AMQP_STRING_PREFIX + id))); + } + + @Test + public void evaluateJMSCorrelationIDForStringWithIDPrefix() + { + final String id = "ID:testID"; + final Filterable message = createFilterableWithCorrelationId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(id))); + } + + @Test + public void evaluateJMSCorrelationIDForStringWithoutIDPrefix() + { + final String id = "testID"; + final Filterable message = createFilterableWithCorrelationId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(id))); + } + + @Test + public void evaluateJMSCorrelationIDForUUID() + { + final UUID id = UUID.randomUUID(); + final Filterable message = createFilterableWithCorrelationId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_UUID_PREFIX + id))); + } + + @Test + public void evaluateJMSCorrelationIDForUnsignedLong() + { + final UnsignedLong id = UnsignedLong.valueOf(42); + final Filterable message = createFilterableWithCorrelationId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_ULONG_PREFIX + id))); + } + + @Test + public void evaluateJMSCorrelationIDForBinary() + { + byte[] data = {1, 2, 3}; + final Binary id = new Binary(data); + final Filterable message = 
createFilterableWithCorrelationId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID); + Object value = expression.evaluate(message); + + assertThat(value, + is(equalTo(JMS_ID_PREFIX + + AMQP_BINARY_PREFIX + + AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)))); + } + + @Test + public void evaluateJMSCorrelationIDForStringWithAMQPBindingPrefixes() + { + final String id = "ID:AMQP_ULONG:string-id"; + final Filterable message = createFilterableWithCorrelationId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_STRING_PREFIX + id))); + } + + @Test + public void evaluateJMSCorrelationIDForStringWithAMQPTypePrefixes() + { + final String id = "AMQP_ULONG:foo"; + final Filterable message = createFilterableWithCorrelationId(id); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(id))); + } + + @Test + public void evaluateJMSReplyTo() + { + final String replyTo = "testReplyTo"; + final Filterable message = mock(Filterable.class); + when(message.getReplyTo()).thenReturn(replyTo); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_REPLY_TO); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(replyTo))); + } + + @Test + public void evaluateJMSType() + { + final String type = "testType"; + final Filterable message = mock(Filterable.class); + when(message.getType()).thenReturn(type); + + PropertyExpression<Filterable> expression = + 
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_TYPE); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(type))); + } + + @Test + public void evaluateJMSDeliveryModeForPersistentMessage() + { + final Filterable message = mock(Filterable.class); + when(message.isPersistent()).thenReturn(true); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_DELIVERY_MODE); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMSMessagePropertyExpression.JMSDeliveryMode.PERSISTENT.toString()))); + } + + @Test + public void evaluateJMSDeliveryModeForNonPersistentMessage() + { + final Filterable message = mock(Filterable.class); + when(message.isPersistent()).thenReturn(false); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_DELIVERY_MODE); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(JMSMessagePropertyExpression.JMSDeliveryMode.NON_PERSISTENT.toString()))); + } + + @Test + public void evaluateJMSPriority() + { + int priority = 5; + final Filterable message = mock(Filterable.class); + when(message.getPriority()).thenReturn((byte) priority); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_PRIORITY); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(priority))); + } + + @Test + public void evaluateJMSTimestamp() + { + long timestamp = System.currentTimeMillis(); + final Filterable message = mock(Filterable.class); + when(message.getTimestamp()).thenReturn(timestamp); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_TIMESTAMP); + Object value 
= expression.evaluate(message); + + assertThat(value, is(equalTo(timestamp))); + } + + @Test + public void evaluateJMSExpiration() + { + long expiration = System.currentTimeMillis() + 10000; + final Filterable message = mock(Filterable.class); + when(message.getExpiration()).thenReturn(expiration); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_EXPIRATION); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(expiration))); + } + + @Test + public void evaluateJMSRedelivered() + { + boolean redelivered = true; + final Filterable message = mock(Filterable.class); + when(message.isRedelivered()).thenReturn(redelivered); + + PropertyExpression<Filterable> expression = + JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_REDELIVERED); + Object value = expression.evaluate(message); + + assertThat(value, is(equalTo(redelivered))); + } + + private Filterable createFilterableWithCorrelationId(final Object id) + { + final Filterable message = mock(Filterable.class); + when(message.getCorrelationId()).thenReturn(id); + return message; + } + + private Filterable createFilterableWithMessageId(final Object id) + { + final Filterable message = mock(Filterable.class); + when(message.getMessageId()).thenReturn(id); + return message; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index 77f20d0..0919edc 100644 --- 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -58,6 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; @@ -113,6 +114,7 @@ public class Interaction extends AbstractInteraction<Interaction> private Object _decodedLatestDelivery; private UnsignedInteger _latestDeliveryId; private Map<String, Object> _latestDeliveryApplicationProperties; + private Properties _latestDeliveryProperties; Interaction(final FrameTransport frameTransport) { @@ -1083,6 +1085,7 @@ public class Interaction extends AbstractInteraction<Interaction> }); _decodedLatestDelivery = messageDecoder.getData(); _latestDeliveryApplicationProperties = messageDecoder.getApplicationProperties(); + _latestDeliveryProperties = messageDecoder.getProperties(); _latestDelivery = null; return this; } @@ -1102,6 +1105,11 @@ public class Interaction extends AbstractInteraction<Interaction> return _latestDeliveryApplicationProperties; } + public Properties getLatestDeliveryProperties() + { + return _latestDeliveryProperties; + } + private List<Transfer> receiveAllTransfers(final Class<?>... 
ignore) throws Exception { List<Transfer> transfers = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java index 0df1abd..8af6f3e 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSect import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; @@ -213,4 +214,14 @@ public class MessageDecoder } return Collections.emptyMap(); } + + public Properties getProperties() throws AmqpErrorException + { + parse(); + if (_propertiesSection != null) + { + return _propertiesSection.getValue(); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java ---------------------------------------------------------------------- diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java new file mode 100644 index 0000000..49a2e37 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java @@ -0,0 +1,356 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms; + +import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assume.assumeThat; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.UUID; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.protocol.v1_0.FrameTransport; +import org.apache.qpid.tests.protocol.v1_0.Interaction; +import org.apache.qpid.tests.protocol.v1_0.MessageEncoder; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; +import 
org.apache.qpid.tests.utils.BrokerSpecific;

/**
 * Protocol tests checking that a JMS selector filter attached to a receiving link can match
 * messages on {@code JMSMessageID} and {@code JMSCorrelationID} when the selector uses the
 * string representations named in each test's {@link SpecificationTest} description
 * (AMQP JMS Mapping, section 3.2.1.1).
 *
 * <p>Every test follows the same scenario, implemented by
 * {@link #perform(Properties, String, String, Object)}: messages carrying the given
 * properties section are published to the test queue, a receiver attaches with the selector
 * as source filter, and the first delivery is checked for the expected content and
 * property value.</p>
 */
@BrokerSpecific(kind = KIND_BROKER_J)
public class JMSMessagePropertiesFilterTest extends BrokerAdminUsingTestBase
{
    private static final String TEST_MESSAGE_CONTENT = "testContent";

    /** Number of messages published by each test before the filtered receiver is attached. */
    private static final int NUMBER_OF_MESSAGES = 2;

    /**
     * Preamble shared by every {@link SpecificationTest} description in this class.
     * It is a compile-time constant so that it can be used inside annotation values and so
     * that all descriptions keep identical line breaks.
     */
    private static final String DESCRIPTION_PREAMBLE =
            "A source can restrict the messages transferred from a source by specifying a filter.\n"
            + "AMQP JMS Mapping\n"
            + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n";

    private InetSocketAddress _brokerAddress;

    /** Creates the test queue and resolves the address of the anonymous AMQP port. */
    @Before
    public void setUp()
    {
        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSMessageID representation for UUID values : "
                          + "'ID:AMQP_UUID:<string representation of uuid>'")
    public void selectorWithJMSMessageIDAsUUID() throws Exception
    {
        final UUID messageId = UUID.randomUUID();
        final Properties properties = new Properties();
        properties.setMessageId(messageId);
        perform(properties, String.format("JMSMessageID='ID:AMQP_UUID:%s'", messageId), "messageId", messageId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSMessageID representation for unsigned long values : "
                          + "'ID:AMQP_ULONG:<string representation of ulong>'")
    public void selectorWithJMSMessageIDAsUnsignedLong() throws Exception
    {
        final UnsignedLong messageId = UnsignedLong.ONE;
        final Properties properties = new Properties();
        properties.setMessageId(messageId);
        perform(properties,
                String.format("JMSMessageID='ID:AMQP_ULONG:%d'", messageId.longValue()),
                "messageId",
                messageId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSMessageID representation for Binary values : "
                          + "'ID:AMQP_BINARY:<hex representation of bytes>'")
    public void selectorWithJMSMessageIDAsBinary() throws Exception
    {
        byte[] data = {1, 2, 3};
        final Binary messageId = new Binary(data);
        final Properties properties = new Properties();
        properties.setMessageId(messageId);
        perform(properties,
                String.format("JMSMessageID='ID:AMQP_BINARY:%s'",
                              AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)),
                "messageId",
                messageId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSMessageID representation for String values without prefix 'ID:' : "
                          + "'ID:AMQP_NO_PREFIX:<original-string>'")
    public void selectorWithJMSMessageIDAsStringWithoutPrefix() throws Exception
    {
        final String messageId = "testId";
        final Properties properties = new Properties();
        properties.setMessageId(messageId);
        perform(properties, String.format("JMSMessageID='ID:AMQP_NO_PREFIX:%s'", messageId), "messageId", messageId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSMessageID representation for String values with prefix 'ID:' : "
                          + "'ID:<original-string>'")
    public void selectorWithJMSMessageIDAsStringWithPrefix() throws Exception
    {
        final String messageId = "ID:testId";
        final Properties properties = new Properties();
        properties.setMessageId(messageId);
        perform(properties, String.format("JMSMessageID='%s'", messageId), "messageId", messageId);
    }


    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSCorrelationID representation for UUID values : "
                          + "'ID:AMQP_UUID:<string representation of uuid>'")
    public void selectorWithJMSCorrelationIDAsUUID() throws Exception
    {
        final UUID correlationId = UUID.randomUUID();
        final Properties properties = new Properties();
        properties.setCorrelationId(correlationId);
        perform(properties,
                String.format("JMSCorrelationID='ID:AMQP_UUID:%s'", correlationId),
                "correlationId",
                correlationId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSCorrelationID representation for unsigned long values : "
                          + "'ID:AMQP_ULONG:<string representation of ulong>'")
    public void selectorWithJMSCorrelationIDAsUnsignedLong() throws Exception
    {
        final UnsignedLong correlationId = UnsignedLong.ONE;
        final Properties properties = new Properties();
        properties.setCorrelationId(correlationId);
        perform(properties,
                String.format("JMSCorrelationID='ID:AMQP_ULONG:%d'", correlationId.longValue()),
                "correlationId",
                correlationId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSCorrelationID representation for Binary values : "
                          + "'ID:AMQP_BINARY:<hex representation of bytes>'")
    public void selectorWithJMSCorrelationIDAsBinary() throws Exception
    {
        byte[] data = {1, 2, 3};
        final Binary correlationId = new Binary(data);
        final Properties properties = new Properties();
        properties.setCorrelationId(correlationId);
        perform(properties,
                String.format("JMSCorrelationID='ID:AMQP_BINARY:%s'",
                              AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)),
                "correlationId",
                correlationId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSCorrelationID representation for String values without prefix 'ID:' : "
                          + "'<original-string>'")
    public void selectorWithJMSCorrelationIDAsStringWithoutPrefix() throws Exception
    {
        final String correlationId = "testId";
        final Properties properties = new Properties();
        properties.setCorrelationId(correlationId);
        perform(properties,
                String.format("JMSCorrelationID='%s'", correlationId),
                "correlationId",
                correlationId);
    }

    @Test
    @BrokerSpecific(kind = KIND_BROKER_J)
    @SpecificationTest(section = "3.5.1",
            description = DESCRIPTION_PREAMBLE
                          + "JMSCorrelationID representation for String values with prefix 'ID:' : "
                          + "'ID:<original-string>'")
    public void selectorWithJMSCorrelationIDAsStringWithPrefix() throws Exception
    {
        final String correlationId = "ID:testId";
        final Properties properties = new Properties();
        properties.setCorrelationId(correlationId);
        perform(properties, String.format("JMSCorrelationID='%s'", correlationId), "correlationId", correlationId);
    }

    /**
     * Runs the scenario shared by all tests: publish messages carrying {@code properties},
     * then consume through a link filtered by {@code selector} and verify the delivery.
     *
     * @param properties    properties section put on every published message
     * @param selector      JMS selector expression used as the receiving link's source filter
     * @param propertyName  bean-style name of the {@link Properties} attribute to verify,
     *                      e.g. {@code "messageId"}
     * @param propertyValue value the received message is expected to carry for that attribute
     * @throws Exception if the interaction with the broker or the reflective lookup fails
     */
    private void perform(final Properties properties,
                         final String selector,
                         final String propertyName,
                         final Object propertyValue) throws Exception
    {
        publishMessages(properties);
        receiveAndVerify(selector, propertyName, propertyValue);
    }

    /**
     * Publishes {@link #NUMBER_OF_MESSAGES} pre-settled messages with the given properties to
     * the test queue over a fresh connection.
     */
    private void publishMessages(final Properties properties) throws Exception
    {
        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
        {
            final Interaction interaction = transport.newInteraction();
            interaction.negotiateProtocol().consumeResponse()
                       .open().consumeResponse(Open.class)
                       .begin().consumeResponse(Begin.class)
                       .attachRole(Role.SENDER)
                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                       .attach().consumeResponse(Attach.class)
                       .consumeResponse(Flow.class);
            Flow flow = interaction.getLatestResponse(Flow.class);
            // The test is skipped, not failed, when the granted credit does not cover every transfer.
            assumeThat("insufficient credit for the test",
                       flow.getLinkCredit().intValue(),
                       is(greaterThan(NUMBER_OF_MESSAGES - 1)));

            for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
            {
                QpidByteBuffer payload =
                        generateMessagePayloadWithMessageProperties(properties,
                                                                    TEST_MESSAGE_CONTENT);
                interaction.transferPayload(payload)
                           .transferSettled(true)
                           .transfer();
            }
            interaction.detachClose(true).detach().close().sync();
        }
    }

    /**
     * Attaches a receiving link whose source carries {@code selector} as a
     * {@code selector-filter}, grants a single credit and verifies that the received delivery
     * has the expected content and the expected value of the named property. The delivery is
     * then accepted and settled.
     */
    private void receiveAndVerify(final String selector,
                                  final String propertyName,
                                  final Object propertyValue) throws Exception
    {
        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
        {
            final Interaction interaction = transport.newInteraction();

            interaction.negotiateProtocol().consumeResponse()
                       .open().consumeResponse(Open.class)
                       .begin().consumeResponse(Begin.class)
                       .attachRole(Role.RECEIVER)
                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                       .attachSourceFilter(Collections.singletonMap(Symbol.valueOf("selector-filter"),
                                                                    new JMSSelectorFilter(selector)))
                       .attach().consumeResponse()
                       .flowIncomingWindow(UnsignedInteger.ONE)
                       .flowNextIncomingId(UnsignedInteger.ZERO)
                       .flowOutgoingWindow(UnsignedInteger.ZERO)
                       .flowNextOutgoingId(UnsignedInteger.ZERO)
                       .flowLinkCredit(UnsignedInteger.ONE)
                       .flowHandleFromLinkHandle()
                       .flow();

            Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));

            Properties deliveryProperties = interaction.getLatestDeliveryProperties();
            assertThat(deliveryProperties, is(notNullValue()));
            assertThat(readProperty(deliveryProperties, propertyName), is(equalTo(propertyValue)));

            interaction.dispositionSettled(true)
                       .dispositionRole(Role.RECEIVER)
                       .dispositionState(new Accepted())
                       .disposition();
            interaction.close().sync();
        }
    }

    /**
     * Reads an attribute of the properties section through its public getter, whose name is
     * derived from {@code propertyName} ({@code "messageId"} resolves to {@code getMessageId}).
     */
    private Object readProperty(final Properties deliveryProperties, final String propertyName) throws Exception
    {
        Method getter = Properties.class.getMethod("get"
                                                   + Character.toUpperCase(propertyName.charAt(0))
                                                   + propertyName.substring(1));
        return getter.invoke(deliveryProperties);
    }

    /**
     * Encodes a message consisting of the given properties section and a data body holding
     * {@code content}.
     */
    private QpidByteBuffer generateMessagePayloadWithMessageProperties(final Properties properties, String content)
    {
        MessageEncoder messageEncoder = new MessageEncoder();
        messageEncoder.setProperties(properties);
        messageEncoder.addData(content);
        return messageEncoder.getPayload();
    }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
