Repository: qpid-jms Updated Branches: refs/heads/master 75da189a3 -> f1584d25c
A functional durable subscription removal mechanism. Shows where we need some cleanup work and also appears to expose a problem with handling on the broker side when the durable sub cannot be removed because it is in use. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f1584d25 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f1584d25 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f1584d25 Branch: refs/heads/master Commit: f1584d25c4e5b795435fe00abaffc3e593ab636a Parents: 75da189 Author: Timothy Bish <[email protected]> Authored: Mon Oct 20 18:10:21 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Oct 20 18:10:31 2014 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpAbstractResource.java | 41 +++-- .../provider/amqp/AmqpConnectionSession.java | 169 ++++++++++++++++++- .../qpid/jms/provider/amqp/AmqpResource.java | 12 ++ .../qpid/jms/provider/amqp/AmqpSession.java | 11 ++ .../jms/consumer/JmsDurableSubscriberTest.java | 108 ++++++++++++ 5 files changed, 330 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 83249c2..28e16b6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -122,13 +122,13 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp @Override public void 
closed() { + this.endpoint.close(); + this.endpoint.free(); + if (this.closeRequest != null) { this.closeRequest.onSuccess(); this.closeRequest = null; } - - this.endpoint.close(); - this.endpoint.free(); } @Override @@ -149,6 +149,21 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp } } + @Override + public void remotelyClosed() { + if (isAwaitingOpen()) { + Exception error = getRemoteError(); + if (error == null) { + error = new IOException("Remote has closed without error information"); + } + + openRequest.onFailure(error); + openRequest = null; + } + + // TODO - We need a way to signal that the remote closed unexpectedly. + } + public E getEndpoint() { return this.endpoint; } @@ -172,14 +187,21 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp } @Override + public boolean hasRemoteError() { + return endpoint.getRemoteCondition().getCondition() != null; + } + + @Override public Exception getRemoteError() { String message = getRemoteErrorMessage(); Exception remoteError = null; Symbol error = endpoint.getRemoteCondition().getCondition(); - if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) { - remoteError = new JMSSecurityException(message); - } else { - remoteError = new JMSException(message); + if (error != null) { + if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) { + remoteError = new JMSSecurityException(message); + } else { + remoteError = new JMSException(message); + } } return remoteError; @@ -213,14 +235,13 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp if (isAwaitingClose()) { LOG.debug("{} is now closed: ", this); closed(); - } else if (isAwaitingOpen()) { + } else if (isAwaitingOpen() && hasRemoteError()) { // Error on Open, create exception and signal failure. LOG.warn("Open of {} failed: ", this); Exception remoteError = this.getRemoteError(); failed(remoteError); } else { - // TODO - Handle remote asynchronous close. 
- LOG.warn("{} was closed remotely.", this); + remotelyClosed(); } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java index 341893e..f640e64 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java @@ -16,8 +16,22 @@ */ package org.apache.qpid.jms.provider.amqp; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.WrappedAsyncResult; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Subclass of the standard session object used solely by AmqpConnection to @@ -25,6 +39,10 @@ import org.apache.qpid.jms.provider.AsyncResult; */ public class AmqpConnectionSession extends AmqpSession { + private static final Logger LOG = LoggerFactory.getLogger(AmqpConnectionSession.class); + + private final Map<String, AsyncResult> pendingUnsubs = new HashMap<String, AsyncResult>(); + /** * Create a new instance of a Connection owned Session object. 
* @@ -46,6 +64,155 @@ public class AmqpConnectionSession extends AmqpSession { * the request that awaits the completion of this action. */ public void unsubscribe(String subscriptionName, AsyncResult request) { - request.onSuccess(); + SubscriptionSourceRequestor requestor = new SubscriptionSourceRequestor(getJmsResource(), subscriptionName); + SubscriptionSourceRequest sourceRequest = new SubscriptionSourceRequest(requestor, request); + pendingUnsubs.put(subscriptionName, sourceRequest); + + LOG.debug("Attempting remove of subscription: {}", subscriptionName); + requestor.open(sourceRequest); + } + + private class SubscriptionSourceRequestor extends AmqpAbstractResource<JmsSessionInfo, Receiver> { + + private final String subscriptionName; + + public SubscriptionSourceRequestor(JmsSessionInfo resource, String subscriptionName) { + super(resource); + this.subscriptionName = subscriptionName; + } + + @Override + protected void doOpen() { + endpoint = AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName); + endpoint.setTarget(new Target()); + endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); + endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + } + + @Override + protected void doClose() { + } + + public String getSubscriptionName() { + return subscriptionName; + } + } + + private class SubscriptionSourceRequest extends WrappedAsyncResult { + + private final SubscriptionSourceRequestor requestor; + + public SubscriptionSourceRequest(SubscriptionSourceRequestor requestor, AsyncResult originalRequest) { + super(originalRequest); + this.requestor = requestor; + } + + @Override + public void onSuccess() { + final Source returnedSource = (Source) requestor.getEndpoint().getRemoteSource(); + if (returnedSource == null) { + LOG.trace("No Source returned for subscription: {}", requestor.getSubscriptionName()); + pendingUnsubs.remove(requestor.getSubscriptionName()); + super.onFailure(new IOException("Could not fetch remote subscription 
information")); + } else { + LOG.trace("Source returned for subscription: {} closing first stage", requestor.getSubscriptionName()); + requestor.close(new AsyncResult() { + + @Override + public void onSuccess() { + RemoveDurabilityRequestor removeRequestor = + new RemoveDurabilityRequestor(getJmsResource(), requestor.getSubscriptionName(), returnedSource); + RemoveDurabilityRequest removeRequest = new RemoveDurabilityRequest(removeRequestor, getWrappedRequest()); + pendingUnsubs.put(requestor.getSubscriptionName(), removeRequest); + LOG.trace("Second stage remove started for subscription: {}", requestor.getSubscriptionName()); + removeRequestor.open(removeRequest); + } + + @Override + public void onFailure(Throwable result) { + LOG.trace("Second stage remove failed for subscription: {}", requestor.getSubscriptionName()); + pendingUnsubs.remove(requestor.getSubscriptionName()); + getWrappedRequest().onFailure(result); + } + + @Override + public boolean isComplete() { + return getWrappedRequest().isComplete(); + } + }); + } + } + + @Override + public void onFailure(Throwable result) { + pendingUnsubs.remove(requestor.getSubscriptionName()); + super.onFailure(result); + } + } + + private class RemoveDurabilityRequestor extends AmqpAbstractResource<JmsSessionInfo, Receiver> { + + private final String subscriptionName; + private final Source subscriptionSource; + + public RemoveDurabilityRequestor(JmsSessionInfo resource, String subscriptionName, Source subscriptionSource) { + super(resource); + this.subscriptionSource = subscriptionSource; + this.subscriptionName = subscriptionName; + } + + @Override + protected void doOpen() { + endpoint = AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName); + + subscriptionSource.setDurable(TerminusDurability.NONE); + subscriptionSource.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + endpoint.setSource(subscriptionSource); + endpoint.setTarget(new Target()); + 
endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); + endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + } + + @Override + public void remotelyClosed() { + if (isAwaitingOpen()) { + openRequest.onSuccess(); + } else { + AmqpConnectionSession.this.reportError(new IOException("Durable unsubscribe failed unexpectedly")); + } + } + + @Override + protected void doClose() { + } + + public String getSubscriptionName() { + return subscriptionName; + } + } + + private class RemoveDurabilityRequest extends WrappedAsyncResult { + + private final RemoveDurabilityRequestor requestor; + + public RemoveDurabilityRequest(RemoveDurabilityRequestor requestor, AsyncResult originalRequest) { + super(originalRequest); + this.requestor = requestor; + } + + @Override + public void onSuccess() { + LOG.trace("Second stage remove complete for subscription: {}", requestor.getSubscriptionName()); + pendingUnsubs.remove(requestor.getSubscriptionName()); + requestor.close(getWrappedRequest()); + } + + @Override + public void onFailure(Throwable result) { + LOG.trace("Second stage remove failed for subscription: {}", requestor.getSubscriptionName()); + pendingUnsubs.remove(requestor.getSubscriptionName()); + super.onFailure(result); + } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java index 46f8130..c4d33fe 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java @@ -85,6 +85,13 @@ public interface AmqpResource { void failed(); /** + * Called to indicate that the remote end has become closed but the resource + * 
was not awaiting a close. This could happen during an open request where + * the remote does not set an error condition or during normal operation. + */ + void remotelyClosed(); + + /** * Sets the failed state for this Resource and triggers a failure signal for * any pending ProduverRequest. * @@ -118,6 +125,11 @@ public interface AmqpResource { void processFlowUpdates() throws IOException; /** + * @returns true if the remote end has sent an error + */ + boolean hasRemoteError(); + + /** * @return an Exception derived from the error state of the endpoint's Remote Condition. */ Exception getRemoteError(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index a671ffc..39c9f9d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -247,6 +247,17 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { return result; } + /** + * Call to send an error that occurs outside of the normal asynchronous processing + * of a session resource such as a remote close etc. + * + * @param error + * The error to forward on to the Provider error event handler. 
+ */ + public void reportError(Exception error) { + getConnection().getProvider().fireProviderException(error); + } + public AmqpProvider getProvider() { return this.connection.getProvider(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java index 5994184..74ed558 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java @@ -19,19 +19,24 @@ package org.apache.qpid.jms.consumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; +import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +71,109 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { } @Test(timeout = 60000) + public void testDuableSubscriptionUnsubscribe() throws Exception { + connection 
= createAmqpConnection(); + connection.setClientID("DURABLE-AMQP"); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Topic topic = session.createTopic(name.getMethodName()); + session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber").close(); + + BrokerViewMBean broker = getProxyToBroker(); + assertEquals(1, broker.getInactiveDurableTopicSubscribers().length); + + session.unsubscribe(name.getMethodName() + "-subscriber"); + + assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); + assertEquals(0, broker.getDurableTopicSubscribers().length); + } + + @Test(timeout = 60000) + public void testDuableSubscriptionUnsubscribeNoExistingSubThrowsJMSEx() throws Exception { + connection = createAmqpConnection(); + connection.setClientID("DURABLE-AMQP"); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + + BrokerViewMBean broker = getProxyToBroker(); + assertEquals(0, broker.getDurableTopicSubscribers().length); + assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); + + try { + session.unsubscribe(name.getMethodName() + "-subscriber"); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + } + } + + @Test(timeout = 60000) + public void testDuableSubscriptionUnsubscribeInUseThrowsJMSEx() throws Exception { + connection = createAmqpConnection(); + connection.setClientID("DURABLE-AMQP"); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Topic topic = session.createTopic(name.getMethodName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + assertNotNull(consumer); + + BrokerViewMBean broker = getProxyToBroker(); + assertEquals(1, broker.getDurableTopicSubscribers().length); + assertEquals(0, 
broker.getInactiveDurableTopicSubscribers().length); + + try { + session.unsubscribe(name.getMethodName() + "-subscriber"); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + } + + assertEquals(1, broker.getDurableTopicSubscribers().length); + assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); + } + + @Ignore + @Test(timeout = 60000) + public void testDuableSubscriptionUnsubscribeInUseThrowsAndRecovers() throws Exception { + connection = createAmqpConnection(); + connection.setClientID("DURABLE-AMQP"); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Topic topic = session.createTopic(name.getMethodName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + assertNotNull(consumer); + + BrokerViewMBean broker = getProxyToBroker(); + assertEquals(1, broker.getDurableTopicSubscribers().length); + assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); + + try { + session.unsubscribe(name.getMethodName() + "-subscriber"); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + } + + assertEquals(1, broker.getDurableTopicSubscribers().length); + assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); + + consumer.close(); + + assertEquals(0, broker.getDurableTopicSubscribers().length); + assertEquals(1, broker.getInactiveDurableTopicSubscribers().length); + + session.unsubscribe(name.getMethodName() + "-subscriber"); + + assertEquals(0, broker.getDurableTopicSubscribers().length); + assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); + } + + @Test(timeout = 60000) public void testDurableGoesOfflineAndReturns() throws Exception { connection = createAmqpConnection(); connection.setClientID("DURABLE-AMQP"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For 
additional commands, e-mail: [email protected]
