QPIDJMS-220, QPIDJMS-207: initial work on support for shared topic subscriptions
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/952de60a Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/952de60a Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/952de60a Branch: refs/heads/master Commit: 952de60aec6abb295d5285c3e4474a04014b1038 Parents: 06a7216 Author: Robert Gemmell <[email protected]> Authored: Fri Nov 11 17:48:41 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Fri Nov 11 17:48:41 2016 +0000 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 10 +- .../org/apache/qpid/jms/JmsMessageConsumer.java | 8 +- .../java/org/apache/qpid/jms/JmsSession.java | 22 +- .../jms/JmsSharedDurableMessageConsumer.java | 36 + .../qpid/jms/JmsSharedMessageConsumer.java | 36 + .../apache/qpid/jms/meta/JmsConnectionInfo.java | 9 +- .../apache/qpid/jms/meta/JmsConsumerInfo.java | 36 +- .../qpid/jms/provider/amqp/AmqpConnection.java | 20 +- .../provider/amqp/AmqpConnectionProperties.java | 23 + .../provider/amqp/AmqpConnectionSession.java | 33 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 9 + .../qpid/jms/provider/amqp/AmqpProvider.java | 56 +- .../qpid/jms/provider/amqp/AmqpSession.java | 19 - .../provider/amqp/AmqpSubscriptionTracker.java | 285 ++++ .../qpid/jms/provider/amqp/AmqpSupport.java | 6 + .../amqp/builders/AmqpConsumerBuilder.java | 83 +- .../amqp/builders/AmqpResourceBuilder.java | 15 + .../org/apache/qpid/jms/JmsSessionTest.java | 51 +- .../jms/integration/IntegrationTestFixture.java | 4 + .../SubscriptionsIntegrationTest.java | 1480 ++++++++++++++++++ .../qpid/jms/meta/JmsConnectionInfoTest.java | 11 +- .../qpid/jms/meta/JmsConsumerInfoTest.java | 37 +- .../amqp/AmqpSubscriptionTrackerTest.java | 285 ++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 85 +- 24 files changed, 2555 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 33cb169..82bdeff 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -101,7 +101,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection private final ThreadPoolExecutor executor; private volatile IOException firstFailureError; - private boolean clientIdSet; private ExceptionListener exceptionListener; private JmsMessageFactory messageFactory; private Provider provider; @@ -295,7 +294,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection public synchronized void setClientID(String clientID) throws JMSException { checkClosedOrFailed(); - if (clientIdSet) { + if (connectionInfo.isExplicitClientID()) { throw new IllegalStateException("The clientID has already been set"); } if (clientID == null || clientID.isEmpty()) { @@ -305,8 +304,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection throw new IllegalStateException("Cannot set the client id once connected."); } - this.connectionInfo.setClientId(clientID); - this.clientIdSet = true; + this.connectionInfo.setClientId(clientID, true); // We weren't connected if we got this far, we should now connect to ensure the // configured clientID is valid. @@ -478,7 +476,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) { - connectionInfo.setClientId(clientIdGenerator.generateId()); + connectionInfo.setClientId(clientIdGenerator.generateId(), false); } createResource(connectionInfo); @@ -560,7 +558,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } protected synchronized boolean isExplicitClientID() { - return clientIdSet; + return connectionInfo.isExplicitClientID(); } //----- Provider interface methods ---------------------------------------// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index b224574..3d2fba9 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -93,9 +93,11 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy(); consumerInfo = new JmsConsumerInfo(consumerId); - consumerInfo.setClientId(connection.getClientID()); + consumerInfo.setExplicitClientID(connection.isExplicitClientID()); consumerInfo.setSelector(selector); + consumerInfo.setDurable(isDurableSubscription()); consumerInfo.setSubscriptionName(name); + consumerInfo.setShared(isSharedSubscription()); consumerInfo.setDestination(destination); consumerInfo.setAcknowledgementMode(acknowledgementMode); consumerInfo.setNoLocal(noLocal); @@ -639,6 +641,10 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe return false; } + public boolean isSharedSubscription() { + return false; + } + public boolean isBrowser() { return false; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index e6c85cc..a3af20d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -510,8 +510,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public MessageConsumer createSharedConsumer(Topic topic, String name) throws JMSException { checkClosed(); - // TODO Auto-generated method stub - throw new JMSException("Not yet implemented"); + return createSharedConsumer(topic, name, null); } /** @@ -520,8 +519,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public MessageConsumer createSharedConsumer(Topic topic, String name, String selector) throws JMSException { checkClosed(); - // TODO Auto-generated method stub - throw new JMSException("Not yet implemented"); + checkDestination(topic); + selector = checkSelector(selector); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic); + JmsMessageConsumer result = new JmsSharedMessageConsumer(getNextConsumerId(), this, dest, name, selector); + result.init(); + return result; } /** @@ -530,8 +533,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException { checkClosed(); - // TODO Auto-generated method stub - throw new JMSException("Not yet implemented"); + return createSharedDurableConsumer(topic, name, null); } /** @@ -540,8 +542,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String selector) throws JMSException { checkClosed(); - // TODO Auto-generated method stub - throw new JMSException("Not yet implemented"); + checkDestination(topic); + selector = checkSelector(selector); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic); + JmsMessageConsumer result = new JmsSharedDurableMessageConsumer(getNextConsumerId(), this, dest, name, selector); + result.init(); + return result; } ////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java new file mode 100644 index 0000000..8229ef5 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.meta.JmsConsumerId; + +/** + * Implementation of a JmsMessageConsumer that is part of a Shared Durable Subscription + */ +public class JmsSharedDurableMessageConsumer extends JmsSharedMessageConsumer implements AutoCloseable { + + public JmsSharedDurableMessageConsumer(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, String selector) throws JMSException { + super(id, s, destination, name, selector); + } + + @Override + public boolean isDurableSubscription() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java new file mode 100644 index 0000000..7aa09db --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.meta.JmsConsumerId; + +/** + * Implementation of a JmsMessageConsumer that is part of a Shared Subscription + */ +public class JmsSharedMessageConsumer extends JmsMessageConsumer implements AutoCloseable { + + public JmsSharedMessageConsumer(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, String selector) throws JMSException { + super(id, s, destination, name, selector, false); + } + + @Override + public boolean isSharedSubscription() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java index bc723c0..9768132 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java @@ -47,6 +47,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne private URI configuredURI; private URI connectedURI; private String clientId; + private boolean explicitClientID; private String username; private String password; private boolean forceAsyncSend; @@ -89,6 +90,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne private void copy(JmsConnectionInfo copy) { copy.clientId = clientId; + copy.explicitClientID = explicitClientID; copy.username = username; copy.password = password; copy.forceAsyncSend = forceAsyncSend; @@ -148,8 +150,13 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne return clientId; } - public void setClientId(String clientId) { + public void setClientId(String clientId, boolean explicitClientID) { this.clientId = clientId; + this.explicitClientID = explicitClientID; + } + + public boolean isExplicitClientID() { + return explicitClientID; } public String getUsername() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java index b22b3b5..e8f793f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java @@ -29,8 +29,10 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume private int prefetchSize; private boolean browser; private String selector; - private String clientId; + private boolean explicitClientID; private String subscriptionName; + private boolean durable; + private boolean shared; private boolean noLocal; private int acknowledgementMode; private boolean localMessageExpiry; @@ -69,9 +71,11 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume info.prefetchSize = prefetchSize; info.browser = browser; info.selector = selector; - info.clientId = clientId; + info.explicitClientID = explicitClientID; + info.durable = durable; info.subscriptionName = subscriptionName; info.noLocal = noLocal; + info.shared = shared; info.acknowledgementMode = acknowledgementMode; info.lastDeliveredSequenceId = lastDeliveredSequenceId; info.redeliveryPolicy = getRedeliveryPolicy().copy(); @@ -79,10 +83,6 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume info.listener = listener; } - public boolean isDurable() { - return subscriptionName != null; - } - @Override public JmsConsumerId getId() { return consumerId; @@ -128,12 +128,20 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume this.selector = selector; } - public String getClientId() { - return clientId; + public boolean isExplicitClientID() { + return explicitClientID; + } + + public void setExplicitClientID(boolean explicitClientID) { + this.explicitClientID = explicitClientID; + } + + public boolean isDurable() { + return durable; } - public void setClientId(String clientId) { - this.clientId = clientId; + public void setDurable(boolean durable) { + this.durable = durable; } public String getSubscriptionName() { @@ -144,6 +152,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume this.subscriptionName = durableSubscriptionId; } + public boolean isShared() { + return shared; + } + + public void setShared(boolean shared) { + this.shared = shared; + } + public boolean isNoLocal() { return noLocal; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index cf4e441..e7c9b62 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -41,6 +41,8 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); + private AmqpSubscriptionTracker subTracker = new AmqpSubscriptionTracker(); + private final AmqpJmsMessageFactory amqpMessageFactory; private final URI remoteURI; @@ -80,15 +82,15 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn } public void unsubscribe(String subscriptionName, AsyncResult request) { - - for (AmqpSession session : sessions.values()) { - if (session.containsSubscription(subscriptionName)) { - request.onFailure(new JMSException("Cannot remove an active durable subscription")); - return; - } + // Check if there is an active (i.e open subscriber) shared or exclusive durable subscription using this name + if(subTracker.isActiveDurableSub(subscriptionName)) { + request.onFailure(new JMSException("Can't remove an active durable subscription: " + subscriptionName)); + return; } - connectionSession.unsubscribe(subscriptionName, request); + boolean hasClientID = getResourceInfo().isExplicitClientID(); + + connectionSession.unsubscribe(subscriptionName, hasClientID, request); } @Override @@ -221,6 +223,10 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn return properties; } + public AmqpSubscriptionTracker getSubTracker() { + return subTracker; + } + /** * Allows a connection resource to schedule a task for future execution. * http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java index c090853..815104a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java @@ -20,6 +20,7 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX; import java.util.Arrays; @@ -44,6 +45,7 @@ public class AmqpConnectionProperties { private boolean delayedDeliverySupported = false; private boolean anonymousRelaySupported = false; + private boolean sharedSubsSupported = false; private boolean connectionOpenFailed = false; /** @@ -84,6 +86,10 @@ public class AmqpConnectionProperties { if (list.contains(DELAYED_DELIVERY)) { delayedDeliverySupported = true; } + + if (list.contains(SHARED_SUBS)) { + sharedSubsSupported = true; + } } protected void processProperties(Map<Symbol, Object> properties) { @@ -110,6 +116,23 @@ public class AmqpConnectionProperties { } /** + * @return true if the connection supports shared subscriptions features. + */ + public boolean isSharedSubsSupported() { + return sharedSubsSupported; + } + + /** + * Sets if the connection supports shared subscriptions features. + * + * @param sharedSubsSupported + * true if the shared subscriptions features are supported. + */ + public void setSharedSubsSupported(boolean sharedSubsSupported) { + this.sharedSubsSupported = sharedSubsSupported; + } + + /** * @return true if the connection supports sending message with delivery delays. */ public boolean isDelayedDeliverySupported() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java index 0a7c689..7ca27aa 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java @@ -63,12 +63,17 @@ public class AmqpConnectionSession extends AmqpSession { * * @param subscriptionName * the subscription name that is to be removed. + * @param hasClientID + * whether the connection has a clientID set. * @param request * the request that awaits the completion of this action. */ - public void unsubscribe(String subscriptionName, AsyncResult request) { - DurableSubscriptionReattachBuilder builder = new DurableSubscriptionReattachBuilder(this, getResourceInfo(), subscriptionName); - DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(builder, request); + public void unsubscribe(String subscriptionName, boolean hasClientID, AsyncResult request) { + AmqpSubscriptionTracker subTracker = getConnection().getSubTracker(); + String linkName = subTracker.getFirstDurableSubscriptionLinkName(subscriptionName, hasClientID); + + DurableSubscriptionReattachBuilder builder = new DurableSubscriptionReattachBuilder(this, getResourceInfo(), linkName); + DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(subscriptionName, builder, request); pendingUnsubs.put(subscriptionName, subscribeRequest); LOG.debug("Attempting remove of subscription: {}", subscriptionName); @@ -81,24 +86,24 @@ public class AmqpConnectionSession extends AmqpSession { super(resource, receiver, parent); } - public String getSubscriptionName() { + public String getLinkName() { return getEndpoint().getName(); } } private final class DurableSubscriptionReattachBuilder extends AmqpResourceBuilder<DurableSubscriptionReattach, AmqpSession, JmsSessionInfo, Receiver> { - private final String subscriptionName; + private final String linkName; - public DurableSubscriptionReattachBuilder(AmqpSession parent, JmsSessionInfo resourceInfo, String subscriptionName) { + public DurableSubscriptionReattachBuilder(AmqpSession parent, JmsSessionInfo resourceInfo, String linkName) { super(parent, resourceInfo); - this.subscriptionName = subscriptionName; + this.linkName = linkName; } @Override protected Receiver createEndpoint(JmsSessionInfo resourceInfo) { - Receiver receiver = getParent().getEndpoint().receiver(subscriptionName); + Receiver receiver = getParent().getEndpoint().receiver(linkName); receiver.setTarget(new Target()); receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); @@ -121,18 +126,20 @@ public class AmqpConnectionSession extends AmqpSession { private final class DurableSubscriptionReattachRequest extends WrappedAsyncResult { + private final String subscriptionName; private final DurableSubscriptionReattachBuilder subscriberBuilder; - public DurableSubscriptionReattachRequest(DurableSubscriptionReattachBuilder subscriberBuilder, AsyncResult originalRequest) { + public DurableSubscriptionReattachRequest(String subscriptionName, DurableSubscriptionReattachBuilder subscriberBuilder, AsyncResult originalRequest) { super(originalRequest); + this.subscriptionName = subscriptionName; this.subscriberBuilder = subscriberBuilder; } @Override public void onSuccess() { DurableSubscriptionReattach subscriber = subscriberBuilder.getResource(); - LOG.trace("Reattached to subscription: {}", subscriber.getSubscriptionName()); - pendingUnsubs.remove(subscriber.getSubscriptionName()); + LOG.trace("Reattached to subscription '{}' using link name '{}'", subscriptionName, subscriber.getLinkName()); + pendingUnsubs.remove(subscriptionName); if (subscriber.getEndpoint().getRemoteSource() != null) { subscriber.close(getWrappedRequest()); } else { @@ -144,8 +151,8 @@ public class AmqpConnectionSession extends AmqpSession { @Override public void onFailure(Throwable result) { DurableSubscriptionReattach subscriber = subscriberBuilder.getResource(); - LOG.trace("Failed to reattach to subscription: {}", subscriber.getSubscriptionName()); - pendingUnsubs.remove(subscriber.getSubscriptionName()); + LOG.trace("Failed to reattach to subscription '{}' using link name '{}'", subscriptionName, subscriber.getLinkName()); + pendingUnsubs.remove(subscriptionName); subscriber.resourceClosed(); super.onFailure(result); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index b60bb83..30e1cef 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -565,6 +565,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver public void postRollback() { } + @Override + public void handleResourceClosure(AmqpProvider provider, Exception error) { + AmqpConnection connection = session.getConnection(); + AmqpSubscriptionTracker subTracker = connection.getSubTracker(); + JmsConsumerInfo consumerInfo = getResourceInfo(); + + subTracker.consumerRemoved(consumerInfo); + } + //----- Inner classes used in message pull operations --------------------// protected static final class ScheduledRequest implements AsyncResult { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index ca8a0e1..76dbc90 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -415,8 +415,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP @Override public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception { - AmqpSession session = connection.getSession(sessionInfo.getId()); - session.close(request); + final AmqpSession session = connection.getSession(sessionInfo.getId()); + session.close(new AsyncResult() { + // TODO: bit of a hack, but works. Similarly below for locally initiated consumer close. + @Override + public void onSuccess() { + onComplete(); + request.onSuccess(); + } + + @Override + public void onFailure(Throwable result) { + onComplete(); + request.onFailure(result); + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } + + void onComplete() { + // Mark the sessions resources closed, which in turn calls + // the subscription cleanup. + session.handleResourceClosure(AmqpProvider.this, null); + } + }); } @Override @@ -427,10 +451,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } @Override - public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception { + public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception { AmqpSession session = connection.getSession(consumerInfo.getParentId()); - AmqpConsumer consumer = session.getConsumer(consumerInfo); - consumer.close(request); + final AmqpConsumer consumer = session.getConsumer(consumerInfo); + consumer.close(new AsyncResult() { + // TODO: bit of a hack, but works. Similarly above for locally initiated session close. + @Override + public void onSuccess() { + onComplete(); + request.onSuccess(); + } + + @Override + public void onFailure(Throwable result) { + onComplete(); + request.onFailure(result); + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } + + void onComplete() { + connection.getSubTracker().consumerRemoved(consumerInfo); + } + }); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index efe52d6..5f441e9 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -241,25 +241,6 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i } /** - * Query the Session to see if there are any registered consumer instances that have - * a durable subscription with the given subscription name. - * - * @param subscriptionName - * the name of the subscription being searched for. - * - * @return true if there is a consumer that has the given subscription. - */ - public boolean containsSubscription(String subscriptionName) { - for (AmqpConsumer consumer : consumers.values()) { - if (subscriptionName.equals(consumer.getResourceInfo().getSubscriptionName())) { - return true; - } - } - - return false; - } - - /** * Call to send an error that occurs outside of the normal asynchronous processing * of a session resource such as a remote close etc. * http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java new file mode 100644 index 0000000..da6af22 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import javax.jms.JMSRuntimeException; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.meta.JmsConsumerInfo; + +/** + * Class used to track named subscriptions on a connection to allow verifying + * current usage and assigning appropriate link names. + */ +public class AmqpSubscriptionTracker { + + Set<String> exclusiveDurableSubs = new HashSet<>(); + Map<String, SubDetails> sharedDurableSubs = new HashMap<>(); + Map<String, SubDetails> sharedVolatileSubs = new HashMap<>(); + + public String reserveNextSubscriptionLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) { + validateSubscriptionName(subscriptionName); + + if(consumerInfo == null) { + throw new IllegalArgumentException("Consumer info must not be null."); + } + + if (consumerInfo.isShared()) { + if (consumerInfo.isDurable()) { + return getSharedDurableSubLinkName(subscriptionName, consumerInfo); + } else { + return getSharedVolatileSubLinkName(subscriptionName, consumerInfo); + } + } else if (consumerInfo.isDurable()) { + registerExclusiveDurableSub(subscriptionName); + return subscriptionName; + } else { + throw new IllegalStateException("Non-shared non-durable sub link naming is not handled by the tracker."); + } + } + + private void validateSubscriptionName(String subscriptionName) { + if(subscriptionName == null) { + throw new IllegalArgumentException("Subscription name must not be null."); + } + + if(subscriptionName.isEmpty()) { + throw new IllegalArgumentException("Subscription name must not be empty."); + } + + if(subscriptionName.contains(SUB_NAME_DELIMITER)) { + throw new IllegalArgumentException("Subscription name must not contain '" + SUB_NAME_DELIMITER +"' character."); + } + } + + private String getSharedDurableSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) { + JmsDestination topic = consumerInfo.getDestination(); + String selector = consumerInfo.getSelector(); + + SubDetails subDetails = null; + if(sharedDurableSubs.containsKey(subscriptionName)) { + subDetails = sharedDurableSubs.get(subscriptionName); + + if(subDetails.matches(topic, selector)){ + subDetails.addSubscriber(consumerInfo); + } else { + throw new JMSRuntimeException("Subscription details dont match existing subscriber."); + } + } else { + subDetails = new SubDetails(topic, selector, consumerInfo); + } + + sharedDurableSubs.put(subscriptionName, subDetails); + + int count = subDetails.totalSubscriberCount(); + + return getDurableSubscriptionLinkName(subscriptionName, consumerInfo.isExplicitClientID(), count); + } + + private String getDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID, int count) { + String linkName = getFirstDurableSubscriptionLinkName(subscriptionName, hasClientID); + if(count > 1) { + if(hasClientID) { + linkName += SUB_NAME_DELIMITER + count; + } else { + linkName += count; + } + } + + return linkName; + } + + public String getFirstDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID) { + validateSubscriptionName(subscriptionName); + + String receiverLinkName = subscriptionName; + if(!hasClientID) { + receiverLinkName += SUB_NAME_DELIMITER + "global"; + } + + return receiverLinkName; + } + + private String getSharedVolatileSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) { + JmsDestination topic = consumerInfo.getDestination(); + String selector = consumerInfo.getSelector(); + + SubDetails subDetails = null; + if(sharedVolatileSubs.containsKey(subscriptionName)) { + subDetails = sharedVolatileSubs.get(subscriptionName); + + if(subDetails.matches(topic, selector)){ + subDetails.addSubscriber(consumerInfo); + } else { + throw new JMSRuntimeException("Subscription details dont match existing subscriber"); + } + } else { + subDetails = new SubDetails(topic, selector, consumerInfo); + } + + sharedVolatileSubs.put(subscriptionName, subDetails); + + String receiverLinkName = subscriptionName + SUB_NAME_DELIMITER; + int count = subDetails.totalSubscriberCount(); + + if (consumerInfo.isExplicitClientID()) { + receiverLinkName += "volatile" + count; + } else { + receiverLinkName += "global-volatile" + count; + } + + return receiverLinkName; + } + + private void registerExclusiveDurableSub(String subscriptionName) { + exclusiveDurableSubs.add(subscriptionName); + } + + /** + * Checks if there is an exclusive durable subscription already + * recorded as active with the given subscription name. + * + * @param subscriptionName name of subscription to check + * @return true if there is an exclusive durable sub with this name already active + */ + public boolean isActiveExclusiveDurableSub(String subscriptionName) { + return exclusiveDurableSubs.contains(subscriptionName); + } + + /** + * Checks if there is a shared durable subscription already + * recorded as active with the given subscription name. + * + * @param subscriptionName name of subscription to check + * @return true if there is a shared durable sub with this name already active + */ + public boolean isActiveSharedDurableSub(String subscriptionName) { + return sharedDurableSubs.containsKey(subscriptionName); + } + + /** + * Checks if there is either a shared or exclusive durable subscription + * already recorded as active with the given subscription name. + * + * @param subscriptionName name of subscription to check + * @return true if there is a durable sub with this name already active + */ + public boolean isActiveDurableSub(String subscriptionName) { + return isActiveExclusiveDurableSub(subscriptionName) || isActiveSharedDurableSub(subscriptionName); + } + + /** + * Checks if there is an shared volatile subscription already + * recorded as active with the given subscription name. + * + * @param subscriptionName name of subscription to check + * @return true if there is a shared volatile sub with this name already active + */ + public boolean isActiveSharedVolatileSub(String subscriptionName) { + return sharedVolatileSubs.containsKey(subscriptionName); + } + + public void consumerRemoved(JmsConsumerInfo consumerInfo) { + String subscriptionName = consumerInfo.getSubscriptionName(); + + if (subscriptionName != null && !subscriptionName.isEmpty()) { + if (consumerInfo.isShared()) { + if (consumerInfo.isDurable()) { + if(sharedDurableSubs.containsKey(subscriptionName)) { + SubDetails subDetails = sharedDurableSubs.get(subscriptionName); + subDetails.removeSubscriber(consumerInfo); + + int count = subDetails.activeSubscribers(); + if(count < 1) { + sharedDurableSubs.remove(subscriptionName); + } + } + } else { + if(sharedVolatileSubs.containsKey(subscriptionName)) { + SubDetails subDetails = sharedVolatileSubs.get(subscriptionName); + subDetails.removeSubscriber(consumerInfo); + + int count = subDetails.activeSubscribers(); + if(count < 1) { + sharedVolatileSubs.remove(subscriptionName); + } + } + } + } else if (consumerInfo.isDurable()) { + exclusiveDurableSubs.remove(subscriptionName); + } + } + } + + private static class SubDetails { + private JmsDestination topic = null; + private String selector = null; + private Set<JmsConsumerInfo> subscribers = new HashSet<>(); + private int totalSubscriberCount; + + public SubDetails(JmsDestination topic, String selector, JmsConsumerInfo info) { + if(topic == null) { + throw new IllegalArgumentException("Topic destination must not be null"); + } + + this.topic = topic; + this.selector = selector; + addSubscriber(info); + } + + public void addSubscriber(JmsConsumerInfo info) { + if(info == null) { + throw new IllegalArgumentException("Consumer info must not be null"); + } + + totalSubscriberCount++; + subscribers.add(info); + } + + public void removeSubscriber(JmsConsumerInfo info) { + subscribers.remove(info); + } + + public int activeSubscribers() { + return subscribers.size(); + } + + public int totalSubscriberCount() { + return totalSubscriberCount; + } + + public boolean matches(JmsDestination newTopic, String newSelector) { + if(!topic.equals(newTopic)) { + return false; + } + + if (selector == null) { + return newSelector == null; + } else { + return selector.equals(newSelector); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java index 9738d68..adac112 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java @@ -44,6 +44,7 @@ public class AmqpSupport { public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container"); public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY"); + public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS"); // Symbols used to announce connection error information public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); @@ -69,6 +70,8 @@ public class AmqpSupport { public static final Symbol COPY = Symbol.getSymbol("copy"); public static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local"); public static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector"); + public static final Symbol SHARED = Symbol.valueOf("shared"); + public static final Symbol GLOBAL = Symbol.valueOf("global"); // Delivery states public static final Rejected REJECTED = new Rejected(); @@ -80,6 +83,9 @@ public class AmqpSupport { public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:"; public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:"; + // Subscription Name Delimiter + public static final String SUB_NAME_DELIMITER = "|"; + //----- Static initializer -----------------------------------------------// static { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java index 3bf8836..5cec9af 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java @@ -22,14 +22,19 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_SELECTOR_SYMBOL; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import javax.jms.InvalidDestinationException; +import javax.jms.JMSRuntimeException; import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.meta.JmsConsumerInfo; +import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.jms.provider.amqp.AmqpSession; +import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker; +import org.apache.qpid.jms.provider.amqp.AmqpSupport; import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsNoLocalType; import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsSelectorType; import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; @@ -59,22 +64,51 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS @Override protected Receiver createEndpoint(JmsConsumerInfo resourceInfo) { JmsDestination destination = resourceInfo.getDestination(); - String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection()); + String address = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection()); Source source = new Source(); - source.setAddress(subscription); + source.setAddress(address); Target target = new Target(); configureSource(source); - String receiverName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + subscription; - if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) { - // In the case of Durable Topic Subscriptions the client must use the same - // receiver name which is derived from the subscription name property. - receiverName = resourceInfo.getSubscriptionName(); + String receiverLinkName = null; + String subscriptionName = resourceInfo.getSubscriptionName(); + if (subscriptionName != null && !subscriptionName.isEmpty()) { + AmqpConnection connection = getParent().getConnection(); + + if (resourceInfo.isShared() && !connection.getProperties().isSharedSubsSupported()) { + // Don't allow shared sub if peer hasn't said it can handle them (or we haven't overridden it). + throw new JMSRuntimeException("Remote peer does not support shared subscriptions"); + } + + AmqpSubscriptionTracker subTracker = connection.getSubTracker(); + + // Validate subscriber type allowed given existing active subscriber types. + if (resourceInfo.isShared() && resourceInfo.isDurable()) { + if(subTracker.isActiveExclusiveDurableSub(subscriptionName)) { + // Don't allow shared sub if there is already an active exclusive durable sub + throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'"); + } + } else if (!resourceInfo.isShared() && resourceInfo.isDurable()) { + if (subTracker.isActiveExclusiveDurableSub(subscriptionName)) { + // Exclusive durable sub is already active + throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'"); + } else if (subTracker.isActiveSharedDurableSub(subscriptionName)) { + // Don't allow exclusive durable sub if there is already an active shared durable sub + throw new JMSRuntimeException("A shared durable subscription is already active with name '" + subscriptionName + "'"); + } + } + + // Get the link name for the subscription. Throws if certain further validations fail. + receiverLinkName = subTracker.reserveNextSubscriptionLinkName(subscriptionName, resourceInfo); } - Receiver receiver = getParent().getEndpoint().receiver(receiverName); + if(receiverLinkName == null) { + receiverLinkName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + address; + } + + Receiver receiver = getParent().getEndpoint().receiver(receiverLinkName); receiver.setSource(source); receiver.setTarget(target); if (resourceInfo.isBrowser() || resourceInfo.isPresettle()) { @@ -88,6 +122,15 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS } @Override + protected void afterClosed(AmqpConsumer resource, JmsConsumerInfo info) { + // If the resource being built is closed during the creation process + // then this is a failure, we need to ensure we don't track it. + AmqpConnection connection = getParent().getConnection(); + AmqpSubscriptionTracker subTracker = connection.getSubTracker(); + subTracker.consumerRemoved(info); + } + + @Override protected AmqpConsumer createResource(AmqpSession parent, JmsConsumerInfo resourceInfo, Receiver endpoint) { return new AmqpConsumer(parent, resourceInfo, endpoint); } @@ -118,7 +161,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL }; - if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) { + if (resourceInfo.isDurable()) { source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setDistributionMode(COPY); @@ -131,14 +174,32 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS source.setDistributionMode(COPY); } + // Capabilities + LinkedList<Symbol> capabilities = new LinkedList<>(); + Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(resourceInfo.getDestination()); - if(typeCapability != null) { - source.setCapabilities(typeCapability); + if(typeCapability != null){ + capabilities.add(typeCapability); + } + + if(resourceInfo.isShared()) { + capabilities.add(AmqpSupport.SHARED); + } + + if(!resourceInfo.isExplicitClientID()) { + capabilities.add(AmqpSupport.GLOBAL); + } + + if(!capabilities.isEmpty()) { + Symbol[] capArray = capabilities.toArray(new Symbol[capabilities.size()]); + source.setCapabilities(capArray); } + //Outcomes source.setOutcomes(outcomes); source.setDefaultOutcome(MODIFIED_FAILED); + // Filters if (resourceInfo.isNoLocal()) { filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index 5912f7f..a1a9aac 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -164,6 +164,9 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex // If the resource being built is closed during the creation process // then this is always an error. + // Perform any post processing relating to closure during creation attempt + afterClosed(getResource(), getResourceInfo()); + Throwable openError; if (hasRemoteError()) { openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()); @@ -245,6 +248,18 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex // Nothing to do here. } + /** + * Called if endpoint opening process fails in order to give the subclasses a + * place to perform any follow-on processing or teardown steps before the operation + * is deemed to have been completed and failure is signalled. + * + * @param resource the resource + * @param resourceInfo the resourceInfo + */ + protected void afterClosed(TARGET resource, INFO resourceInfo) { + // Nothing to do here. + } + protected boolean hasRemoteError() { return getEndpoint().getRemoteCondition().getCondition() != null; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java index be3e668..27fc511 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java @@ -35,7 +35,10 @@ import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.qpid.jms.JmsConnectionTestSupport; +import org.apache.qpid.jms.JmsMessageConsumer; import org.apache.qpid.jms.JmsSession; import org.apache.qpid.jms.JmsTemporaryQueue; import org.junit.Before; @@ -332,41 +335,53 @@ public class JmsSessionTest extends JmsConnectionTestSupport { } catch (RuntimeException re) {} } - //----- Not yet implemented, should all be cleared on implementation -----// - @Test(timeout = 10000) public void testCreateSharedConsumer() throws Exception { JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - session.createSharedConsumer(session.createTopic("test"), "subscription"); - fail("Should fail until implemented."); - } catch (JMSException ex) {} + + Topic topic = session.createTopic("test"); + JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedConsumer(topic, "subscription"); + + assertNotNull(consumer); + assertNull("unexpected selector", consumer.getMessageSelector()); + assertEquals("unexpected topic", topic, consumer.getDestination()); } @Test(timeout = 10000) public void testCreateSharedConsumerWithSelector() throws Exception { JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - session.createSharedConsumer(session.createTopic("test"), "subscription", "a = b"); - fail("Should fail until implemented."); - } catch (JMSException ex) {} + + String selector = "a = b"; + Topic topic = session.createTopic("test"); + JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedConsumer(topic, "subscription", selector); + + assertNotNull(consumer); + assertEquals("unexpected selector", selector, consumer.getMessageSelector()); + assertEquals("unexpected topic", topic, consumer.getDestination()); } @Test(timeout = 10000) public void testCreateSharedDurableConsumer() throws Exception { JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - session.createSharedDurableConsumer(session.createTopic("test"), "subscription"); - fail("Should fail until implemented."); - } catch (JMSException ex) {} + + Topic topic = session.createTopic("test"); + JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedDurableConsumer(topic, "subscription"); + + assertNotNull(consumer); + assertNull("unexpected selector", consumer.getMessageSelector()); + assertEquals("unexpected topic", topic, consumer.getDestination()); } @Test(timeout = 10000) public void testCreateSharedDurableConsumerWithSelector() throws Exception { JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - session.createSharedDurableConsumer(session.createTopic("test"), "subscription", "a = b"); - fail("Should fail until implemented."); - } catch (JMSException ex) {} + + String selector = "a = b"; + Topic topic = session.createTopic("test"); + JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedDurableConsumer(topic, "subscription", selector); + + assertNotNull(consumer); + assertEquals("unexpected selector", selector, consumer.getMessageSelector()); + assertEquals("unexpected topic", topic, consumer.getDestination()); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java index 7c33fc1..2ddf739 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java @@ -54,6 +54,10 @@ public class IntegrationTestFixture { return establishConnecton(testPeer, false, optionsString, serverCapabilities, serverProperties, true); } + Connection establishConnectonWithoutClientID(TestAmqpPeer testPeer, Symbol[] serverCapabilities) throws JMSException { + return establishConnecton(testPeer, false, null, serverCapabilities, null, false); + } + Connection establishConnecton(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId) throws JMSException { Symbol[] desiredCapabilities = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
