Repository: qpid-jms
Updated Branches:
  refs/heads/master 75da189a3 -> f1584d25c


A functional durable subscription removal mechanism.  Show where we need
some cleanup work and also looks to show a problem with handling on the
broker side when the durable sub is not removable due to in use.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f1584d25
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f1584d25
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f1584d25

Branch: refs/heads/master
Commit: f1584d25c4e5b795435fe00abaffc3e593ab636a
Parents: 75da189
Author: Timothy Bish <[email protected]>
Authored: Mon Oct 20 18:10:21 2014 -0400
Committer: Timothy Bish <[email protected]>
Committed: Mon Oct 20 18:10:31 2014 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpAbstractResource.java |  41 +++--
 .../provider/amqp/AmqpConnectionSession.java    | 169 ++++++++++++++++++-
 .../qpid/jms/provider/amqp/AmqpResource.java    |  12 ++
 .../qpid/jms/provider/amqp/AmqpSession.java     |  11 ++
 .../jms/consumer/JmsDurableSubscriberTest.java  | 108 ++++++++++++
 5 files changed, 330 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 83249c2..28e16b6 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -122,13 +122,13 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
 
     @Override
     public void closed() {
+        this.endpoint.close();
+        this.endpoint.free();
+
         if (this.closeRequest != null) {
             this.closeRequest.onSuccess();
             this.closeRequest = null;
         }
-
-        this.endpoint.close();
-        this.endpoint.free();
     }
 
     @Override
@@ -149,6 +149,21 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
         }
     }
 
+    @Override
+    public void remotelyClosed() {
+        if (isAwaitingOpen()) {
+            Exception error = getRemoteError();
+            if (error == null) {
+                error = new IOException("Remote has closed without error 
information");
+            }
+
+            openRequest.onFailure(error);
+            openRequest = null;
+        }
+
+        // TODO - We need a way to signal that the remote closed unexpectedly.
+    }
+
     public E getEndpoint() {
         return this.endpoint;
     }
@@ -172,14 +187,21 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
     }
 
     @Override
+    public boolean hasRemoteError() {
+        return endpoint.getRemoteCondition().getCondition() != null;
+    }
+
+    @Override
     public Exception getRemoteError() {
         String message = getRemoteErrorMessage();
         Exception remoteError = null;
         Symbol error = endpoint.getRemoteCondition().getCondition();
-        if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
-            remoteError = new JMSSecurityException(message);
-        } else {
-            remoteError = new JMSException(message);
+        if (error != null) {
+            if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+                remoteError = new JMSSecurityException(message);
+            } else {
+                remoteError = new JMSException(message);
+            }
         }
 
         return remoteError;
@@ -213,14 +235,13 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
             if (isAwaitingClose()) {
                 LOG.debug("{} is now closed: ", this);
                 closed();
-            } else if (isAwaitingOpen()) {
+            } else if (isAwaitingOpen() && hasRemoteError()) {
                 // Error on Open, create exception and signal failure.
                 LOG.warn("Open of {} failed: ", this);
                 Exception remoteError = this.getRemoteError();
                 failed(remoteError);
             } else {
-                // TODO - Handle remote asynchronous close.
-                LOG.warn("{} was closed remotely.", this);
+                remotelyClosed();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 341893e..f640e64 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -16,8 +16,22 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.WrappedAsyncResult;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Subclass of the standard session object used solely by AmqpConnection to
@@ -25,6 +39,10 @@ import org.apache.qpid.jms.provider.AsyncResult;
  */
 public class AmqpConnectionSession extends AmqpSession {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpConnectionSession.class);
+
+    private final Map<String, AsyncResult> pendingUnsubs = new HashMap<String, 
AsyncResult>();
+
     /**
      * Create a new instance of a Connection owned Session object.
      *
@@ -46,6 +64,155 @@ public class AmqpConnectionSession extends AmqpSession {
      *        the request that awaits the completion of this action.
      */
     public void unsubscribe(String subscriptionName, AsyncResult request) {
-        request.onSuccess();
+        SubscriptionSourceRequestor requestor = new 
SubscriptionSourceRequestor(getJmsResource(), subscriptionName);
+        SubscriptionSourceRequest sourceRequest = new 
SubscriptionSourceRequest(requestor, request);
+        pendingUnsubs.put(subscriptionName, sourceRequest);
+
+        LOG.debug("Attempting remove of subscription: {}", subscriptionName);
+        requestor.open(sourceRequest);
+    }
+
+    private class SubscriptionSourceRequestor extends 
AmqpAbstractResource<JmsSessionInfo, Receiver> {
+
+        private final String subscriptionName;
+
+        public SubscriptionSourceRequestor(JmsSessionInfo resource, String 
subscriptionName) {
+            super(resource);
+            this.subscriptionName = subscriptionName;
+        }
+
+        @Override
+        protected void doOpen() {
+            endpoint = 
AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName);
+            endpoint.setTarget(new Target());
+            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        }
+
+        @Override
+        protected void doClose() {
+        }
+
+        public String getSubscriptionName() {
+            return subscriptionName;
+        }
+    }
+
+    private class SubscriptionSourceRequest extends WrappedAsyncResult {
+
+        private final SubscriptionSourceRequestor requestor;
+
+        public SubscriptionSourceRequest(SubscriptionSourceRequestor 
requestor, AsyncResult originalRequest) {
+            super(originalRequest);
+            this.requestor = requestor;
+        }
+
+        @Override
+        public void onSuccess() {
+            final Source returnedSource = (Source) 
requestor.getEndpoint().getRemoteSource();
+            if (returnedSource == null) {
+                LOG.trace("No Source returned for subscription: {}", 
requestor.getSubscriptionName());
+                pendingUnsubs.remove(requestor.getSubscriptionName());
+                super.onFailure(new IOException("Could not fetch remote 
subscription information"));
+            } else {
+                LOG.trace("Source returned for subscription: {} closing first 
stage", requestor.getSubscriptionName());
+                requestor.close(new AsyncResult() {
+
+                    @Override
+                    public void onSuccess() {
+                        RemoveDurabilityRequestor removeRequestor =
+                            new RemoveDurabilityRequestor(getJmsResource(), 
requestor.getSubscriptionName(), returnedSource);
+                        RemoveDurabilityRequest removeRequest = new 
RemoveDurabilityRequest(removeRequestor, getWrappedRequest());
+                        pendingUnsubs.put(requestor.getSubscriptionName(), 
removeRequest);
+                        LOG.trace("Second stage remove started for 
subscription: {}", requestor.getSubscriptionName());
+                        removeRequestor.open(removeRequest);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable result) {
+                        LOG.trace("Second stage remove failed for 
subscription: {}", requestor.getSubscriptionName());
+                        pendingUnsubs.remove(requestor.getSubscriptionName());
+                        getWrappedRequest().onFailure(result);
+                    }
+
+                    @Override
+                    public boolean isComplete() {
+                        return getWrappedRequest().isComplete();
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable result) {
+            pendingUnsubs.remove(requestor.getSubscriptionName());
+            super.onFailure(result);
+        }
+    }
+
+    private class RemoveDurabilityRequestor extends 
AmqpAbstractResource<JmsSessionInfo, Receiver> {
+
+        private final String subscriptionName;
+        private final Source subscriptionSource;
+
+        public RemoveDurabilityRequestor(JmsSessionInfo resource, String 
subscriptionName, Source subscriptionSource) {
+            super(resource);
+            this.subscriptionSource = subscriptionSource;
+            this.subscriptionName = subscriptionName;
+        }
+
+        @Override
+        protected void doOpen() {
+            endpoint = 
AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName);
+
+            subscriptionSource.setDurable(TerminusDurability.NONE);
+            
subscriptionSource.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+            endpoint.setSource(subscriptionSource);
+            endpoint.setTarget(new Target());
+            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        }
+
+        @Override
+        public void remotelyClosed() {
+            if (isAwaitingOpen()) {
+                openRequest.onSuccess();
+            } else {
+                AmqpConnectionSession.this.reportError(new 
IOException("Durable unsubscribe failed unexpectedly"));
+            }
+        }
+
+        @Override
+        protected void doClose() {
+        }
+
+        public String getSubscriptionName() {
+            return subscriptionName;
+        }
+    }
+
+    private class RemoveDurabilityRequest extends WrappedAsyncResult {
+
+        private final RemoveDurabilityRequestor requestor;
+
+        public RemoveDurabilityRequest(RemoveDurabilityRequestor requestor, 
AsyncResult originalRequest) {
+            super(originalRequest);
+            this.requestor = requestor;
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Second stage remove complete for subscription: {}", 
requestor.getSubscriptionName());
+            pendingUnsubs.remove(requestor.getSubscriptionName());
+            requestor.close(getWrappedRequest());
+        }
+
+        @Override
+        public void onFailure(Throwable result) {
+            LOG.trace("Second stage remove failed for subscription: {}", 
requestor.getSubscriptionName());
+            pendingUnsubs.remove(requestor.getSubscriptionName());
+            super.onFailure(result);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
index 46f8130..c4d33fe 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
@@ -85,6 +85,13 @@ public interface AmqpResource {
     void failed();
 
     /**
+     * Called to indicate that the remote end has become closed but the 
resource
+     * was not awaiting a close.  This could happen during an open request 
where
+     * the remote does not set an error condition or during normal operation.
+     */
+    void remotelyClosed();
+
+    /**
      * Sets the failed state for this Resource and triggers a failure signal 
for
      * any pending ProduverRequest.
      *
@@ -118,6 +125,11 @@ public interface AmqpResource {
     void processFlowUpdates() throws IOException;
 
     /**
+     * @returns true if the remote end has sent an error
+     */
+    boolean hasRemoteError();
+
+    /**
      * @return an Exception derived from the error state of the endpoint's 
Remote Condition.
      */
     Exception getRemoteError();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index a671ffc..39c9f9d 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -247,6 +247,17 @@ public class AmqpSession extends 
AmqpAbstractResource<JmsSessionInfo, Session> {
         return result;
     }
 
+    /**
+     * Call to send an error that occurs outside of the normal asynchronous 
processing
+     * of a session resource such as a remote close etc.
+     *
+     * @param error
+     *        The error to forward on to the Provider error event handler.
+     */
+    public void reportError(Exception error) {
+        getConnection().getProvider().fireProviderException(error);
+    }
+
     public AmqpProvider getProvider() {
         return this.connection.getProvider();
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f1584d25/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
index 5994184..74ed558 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
@@ -19,19 +19,24 @@ package org.apache.qpid.jms.consumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +71,109 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testDuableSubscriptionUnsubscribe() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        session.createDurableSubscriber(topic, name.getMethodName() + 
"-subscriber").close();
+
+        BrokerViewMBean broker = getProxyToBroker();
+        assertEquals(1, broker.getInactiveDurableTopicSubscribers().length);
+
+        session.unsubscribe(name.getMethodName() + "-subscriber");
+
+        assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
+        assertEquals(0, broker.getDurableTopicSubscribers().length);
+    }
+
+    @Test(timeout = 60000)
+    public void testDuableSubscriptionUnsubscribeNoExistingSubThrowsJMSEx() 
throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+
+        BrokerViewMBean broker = getProxyToBroker();
+        assertEquals(0, broker.getDurableTopicSubscribers().length);
+        assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
+
+        try {
+            session.unsubscribe(name.getMethodName() + "-subscriber");
+            fail("Should have thrown a JMSException");
+        } catch (JMSException ex) {
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testDuableSubscriptionUnsubscribeInUseThrowsJMSEx() throws 
Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        assertNotNull(consumer);
+
+        BrokerViewMBean broker = getProxyToBroker();
+        assertEquals(1, broker.getDurableTopicSubscribers().length);
+        assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
+
+        try {
+            session.unsubscribe(name.getMethodName() + "-subscriber");
+            fail("Should have thrown a JMSException");
+        } catch (JMSException ex) {
+        }
+
+        assertEquals(1, broker.getDurableTopicSubscribers().length);
+        assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
+    }
+
+    @Ignore
+    @Test(timeout = 60000)
+    public void testDuableSubscriptionUnsubscribeInUseThrowsAndRecovers() 
throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        assertNotNull(consumer);
+
+        BrokerViewMBean broker = getProxyToBroker();
+        assertEquals(1, broker.getDurableTopicSubscribers().length);
+        assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
+
+        try {
+            session.unsubscribe(name.getMethodName() + "-subscriber");
+            fail("Should have thrown a JMSException");
+        } catch (JMSException ex) {
+        }
+
+        assertEquals(1, broker.getDurableTopicSubscribers().length);
+        assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
+
+        consumer.close();
+
+        assertEquals(0, broker.getDurableTopicSubscribers().length);
+        assertEquals(1, broker.getInactiveDurableTopicSubscribers().length);
+
+        session.unsubscribe(name.getMethodName() + "-subscriber");
+
+        assertEquals(0, broker.getDurableTopicSubscribers().length);
+        assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
+    }
+
+    @Test(timeout = 60000)
     public void testDurableGoesOfflineAndReturns() throws Exception {
         connection = createAmqpConnection();
         connection.setClientID("DURABLE-AMQP");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to