Repository: activemq Updated Branches: refs/heads/master 1a0f73ed1 -> ca456c460
https://issues.apache.org/jira/browse/AMQ-5559 Fix and tests for filter handling on attach. We only support JMS selector and NoLocal type filters for receivers, so we only report those back; all others are dropped to indicate that we will not honor them. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ca456c46 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ca456c46 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ca456c46 Branch: refs/heads/master Commit: ca456c4601c5e659f9864041af87f489a0e63e4b Parents: 1a0f73e Author: Timothy Bish <[email protected]> Authored: Tue Mar 17 18:44:24 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Mar 17 18:44:24 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 26 +++++--- .../activemq/transport/amqp/AmqpSupport.java | 11 ++-- .../transport/amqp/client/AmqpClient.java | 2 +- .../amqp/client/AmqpJmsSelectorFilter.java | 48 +++++++++++++++ .../amqp/client/AmqpJmsSelectorType.java | 47 --------------- .../amqp/client/AmqpNoLocalFilter.java | 45 ++++++++++++++ .../transport/amqp/client/AmqpNoLocalType.java | 44 -------------- .../transport/amqp/client/AmqpReceiver.java | 36 +++++++++-- .../transport/amqp/client/AmqpSession.java | 35 +++++++++++ .../amqp/client/AmqpUnknownFilterType.java | 7 ++- .../amqp/client/util/UnmodifiableLink.java | 4 +- .../amqp/interop/AmqpConnectionsTest.java | 2 +- .../amqp/interop/AmqpReceiverTest.java | 63 ++++++++++++++++++-- 13 files changed, 249 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 3661f3d..5a73a25 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -1417,15 +1417,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource(); try { + final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>(); final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++); final ConsumerContext consumerContext = new ConsumerContext(id, sender); sender.setContext(consumerContext); + boolean noLocal = false; String selector = null; + if (source != null) { - DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); + Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); if (filter != null) { - selector = filter.getDescribed().toString(); + selector = filter.getValue().getDescribed().toString(); // Validate the Selector. 
try { SelectorParser.parse(selector); @@ -1436,6 +1439,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerContext.closed = true; return; } + + supportedFilters.put(filter.getKey(), filter.getValue()); + } + + filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); + if (filter != null) { + noLocal = true; + supportedFilters.put(filter.getKey(), filter.getValue()); } } @@ -1449,7 +1460,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source.setAddress(destination.getQualifiedName()); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); - sender.setSource(source); } else { consumerContext.closed = true; sender.setSource(null); @@ -1465,7 +1475,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source = new org.apache.qpid.proton.amqp.messaging.Source(); source.setAddress(destination.getQualifiedName()); source.setDynamic(true); - sender.setSource(source); consumerContext.addCloseAction(new Runnable() { @Override @@ -1477,6 +1486,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { destination = createDestination(source); } + source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); + sender.setSource(source); + int senderCredit = sender.getRemoteCredit(); subscriptionsByConsumerId.put(id, consumerContext); @@ -1486,6 +1498,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(senderCredit >= 0 ? 
senderCredit : 0); consumerInfo.setDispatchAsync(true); + consumerInfo.setNoLocal(noLocal); if (source.getDistributionMode() == COPY && destination.isQueue()) { consumerInfo.setBrowser(true); @@ -1495,11 +1508,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSubscriptionName(sender.getName()); } - DescribedType filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); - if (filter != null) { - consumerInfo.setNoLocal(true); - } - consumerContext.info = consumerInfo; consumerContext.setDestination(destination); consumerContext.credit = senderCredit; http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index c0cfb94..7af4c2c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.Map; import org.apache.qpid.proton.amqp.Binary; @@ -86,7 +87,7 @@ public class AmqpSupport { * * @return the filter if found in the mapping or null if not found. 
*/ - public static DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) { + public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) { if (filterIds == null || filterIds.length == 0) { throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds); @@ -96,14 +97,14 @@ public class AmqpSupport { return null; } - for (Object value : filters.values()) { - if (value instanceof DescribedType) { - DescribedType describedType = ((DescribedType) value); + for (Map.Entry<Symbol, Object> filter : filters.entrySet()) { + if (filter.getValue() instanceof DescribedType) { + DescribedType describedType = ((DescribedType) filter.getValue()); Object descriptor = describedType.getDescriptor(); for (Object filterId : filterIds) { if (descriptor.equals(filterId)) { - return describedType; + return new AbstractMap.SimpleImmutableEntry<Symbol, DescribedType>(filter.getKey(), describedType); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index e7d3eaf..2762732 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -187,7 +187,7 @@ public class AmqpClient { * @param stateInspector * the new state inspector to use. 
*/ - public void setStateInspector(AmqpValidator stateInspector) { + public void setValidator(AmqpValidator stateInspector) { if (stateInspector == null) { stateInspector = new AmqpValidator(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java new file mode 100644 index 0000000..9fad2ef --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_CODE; + +import org.apache.qpid.proton.amqp.DescribedType; + +/** + * A Described Type wrapper for JMS selector values. 
+ */ +public class AmqpJmsSelectorFilter implements DescribedType { + + private final String selector; + + public AmqpJmsSelectorFilter(String selector) { + this.selector = selector; + } + + @Override + public Object getDescriptor() { + return JMS_SELECTOR_CODE; + } + + @Override + public Object getDescribed() { + return this.selector; + } + + @Override + public String toString() { + return "AmqpJmsSelectorType{" + selector + "}"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java deleted file mode 100644 index d93e052..0000000 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.transport.amqp.client; - -import org.apache.qpid.proton.amqp.DescribedType; -import org.apache.qpid.proton.amqp.UnsignedLong; - -/** - * A Described Type wrapper for JMS selector values. - */ -public class AmqpJmsSelectorType implements DescribedType { - - private final String selector; - - public AmqpJmsSelectorType(String selector) { - this.selector = selector; - } - - @Override - public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000004L); - } - - @Override - public Object getDescribed() { - return this.selector; - } - - @Override - public String toString() { - return "AmqpJmsSelectorType{" + selector + "}"; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java new file mode 100644 index 0000000..0bdd71e --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE; + +import org.apache.qpid.proton.amqp.DescribedType; + +/** + * A Described Type wrapper for JMS no local option for MessageConsumer. + */ +public class AmqpNoLocalFilter implements DescribedType { + + public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter(); + + private final String noLocal; + + public AmqpNoLocalFilter() { + this.noLocal = "NoLocalFilter{}"; + } + + @Override + public Object getDescriptor() { + return NO_LOCAL_CODE; + } + + @Override + public Object getDescribed() { + return this.noLocal; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java deleted file mode 100644 index 2d61b83..0000000 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client; - -import org.apache.qpid.proton.amqp.DescribedType; -import org.apache.qpid.proton.amqp.UnsignedLong; - -/** - * A Described Type wrapper for JMS no local option for MessageConsumer. - */ -public class AmqpNoLocalType implements DescribedType { - - public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType(); - - private final String noLocal; - - public AmqpNoLocalType() { - this.noLocal = "NoLocalFilter{}"; - } - - @Override - public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000003L); - } - - @Override - public Object getDescribed() { - return this.noLocal; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index ff530b9..1290d27 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -76,6 +76,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { private String selector; private boolean presettle; private boolean noLocal; + private Source userSpecifiedSource; /** * Create a new receiver instance. 
@@ -94,6 +95,28 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { } /** + * Create a new receiver instance. + * + * @param session + * The parent session that created the receiver. + * @param source + * The Source instance to use instead of creating and configuring one. + * @param receiverId + * The unique ID assigned to this receiver. + */ + public AmqpReceiver(AmqpSession session, Source source, String receiverId) { + + if (source == null) { + throw new IllegalArgumentException("User specified Source cannot be null"); + } + + this.session = session; + this.userSpecifiedSource = source; + this.address = source.getAddress(); + this.receiverId = receiverId; + } + + /** * Close the receiver, a closed receiver will throw exceptions if any further send * calls are made. * @@ -423,11 +446,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { @Override protected void doOpen() { - Source source = new Source(); - source.setAddress(address); + Source source = userSpecifiedSource; Target target = new Target(); - configureSource(source); + if (userSpecifiedSource == null) { + source = new Source(); + source.setAddress(address); + configureSource(source); + } String receiverName = receiverId + ":" + address; @@ -523,11 +549,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { source.setDefaultOutcome(modified); if (isNoLocal()) { - filters.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL); + filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); } if (getSelector() != null && !getSelector().trim().equals("")) { - filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector())); + filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector())); } if (!filters.isEmpty()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index c747dc6..8b039b6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -152,6 +153,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> { } /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param source + * the caller created and configured Source used to create the receiver link. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(Source source) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return receiver; + } + + /** * Create a receiver instance using the given address that creates a durable subscription. 
* * @param address http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java index c86a2c9..9f3c840 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.client; import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedLong; /** @@ -26,6 +27,10 @@ public class AmqpUnknownFilterType implements DescribedType { public static final AmqpUnknownFilterType UNKOWN_FILTER = new AmqpUnknownFilterType(); + public static final UnsignedLong UNKNOWN_FILTER_CODE = UnsignedLong.valueOf(0x0000468C00000099L); + public static final Symbol UNKNOWN_FILTER_NAME = Symbol.valueOf("apache.org:unkown-filter:string"); + public static final Object[] UNKNOWN_FILTER_IDS = new Object[] { UNKNOWN_FILTER_CODE, UNKNOWN_FILTER_NAME }; + private final String payload; public AmqpUnknownFilterType() { @@ -34,7 +39,7 @@ public class AmqpUnknownFilterType implements DescribedType { @Override public Object getDescriptor() { - return UnsignedLong.valueOf(0x0000468C00000099L); + return UNKNOWN_FILTER_CODE; } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java ---------------------------------------------------------------------- diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java index c7a99d3..fd44dcd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java @@ -147,13 +147,13 @@ public class UnmodifiableLink implements Link { @Override public Source getRemoteSource() { // TODO Figure out a simple way to wrap the odd Source types in Proton-J - return link.getSource(); + return link.getRemoteSource(); } @Override public Target getRemoteTarget() { // TODO Figure out a simple way to wrap the odd Target types in Proton-J - return link.getTarget(); + return link.getRemoteTarget(); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java index 2f9935f..dfe3a4b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -62,7 +62,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); assertNotNull(client); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @Override public void inspectOpenedResource(Connection connection) { 
http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index cdecab0..13b5904 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -33,10 +34,14 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Receiver; import org.junit.Ignore; import org.junit.Test; @@ -74,18 +79,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { public void testCreateQueueReceiverWithJMSSelector() throws Exception { AmqpClient client = createAmqpClient(); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new 
AmqpValidator() { @SuppressWarnings("unchecked") @Override public void inspectOpenedResource(Receiver receiver) { LOG.info("Receiver opened: {}", receiver); - if (receiver.getSource() == null) { + if (receiver.getRemoteSource() == null) { markAsInvalid("Link opened with null source."); } - Source source = (Source) receiver.getSource(); + Source source = (Source) receiver.getRemoteSource(); Map<Symbol, Object> filters = source.getFilter(); if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) { @@ -111,18 +116,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { public void testCreateQueueReceiverWithNoLocalSet() throws Exception { AmqpClient client = createAmqpClient(); - client.setStateInspector(new AmqpValidator() { + client.setValidator(new AmqpValidator() { @SuppressWarnings("unchecked") @Override public void inspectOpenedResource(Receiver receiver) { LOG.info("Receiver opened: {}", receiver); - if (receiver.getSource() == null) { + if (receiver.getRemoteSource() == null) { markAsInvalid("Link opened with null source."); } - Source source = (Source) receiver.getSource(); + Source source = (Source) receiver.getRemoteSource(); Map<Symbol, Object> filters = source.getFilter(); if (findFilter(filters, NO_LOCAL_FILTER_IDS) == null) { @@ -363,4 +368,50 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + LOG.info("Receiver opened: {}", receiver); + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map<Symbol, Object> filters = source.getFilter(); + + if (findFilter(filters, 
AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) { + markAsInvalid("Broker should not return unsupported filter on attach."); + } + } + }); + + Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>(); + filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER); + + Source source = new Source(); + source.setAddress("queue://" + getTestName()); + source.setFilter(filters); + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + session.createReceiver(source); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + connection.getStateInspector().assertValid(); + connection.close(); + } }
