Repository: qpid-jms
Updated Branches:
  refs/heads/master 470c87aeb -> 7188d9c76


Implement proper durable topic unsubscribe.  Also ensures that for a
durable subscriber we detach and not close when the MessageConsumer is
closed.  Tests will continue to work with ActiveMQ as it removes
subscriptions on the first reattach attempt and does not currently do a
remove on close.  Some of the AmqpResource code is a bit messy now and
needs to be refactored a bit after this change.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7188d9c7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7188d9c7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7188d9c7

Branch: refs/heads/master
Commit: 7188d9c76d12f99e0a49ffa56d13e6cbcbc9c131
Parents: 470c87a
Author: Timothy Bish <[email protected]>
Authored: Wed Oct 29 16:51:31 2014 -0400
Committer: Timothy Bish <[email protected]>
Committed: Wed Oct 29 16:51:31 2014 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpAbstractResource.java |  21 ++-
 .../provider/amqp/AmqpAnonymousProducer.java    |   8 -
 .../qpid/jms/provider/amqp/AmqpConnection.java  |  14 +-
 .../provider/amqp/AmqpConnectionSession.java    | 147 +++----------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |   6 +
 .../jms/provider/amqp/AmqpFixedProducer.java    |   5 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    |   8 +
 .../qpid/jms/provider/amqp/AmqpSession.java     |  21 +++
 .../provider/amqp/AmqpTemporaryDestination.java |   4 +
 .../provider/amqp/AmqpTransactionContext.java   |   5 +-
 .../transactions/JmsTransactedConsumerTest.java |  24 +++
 11 files changed, 115 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 28e16b6..6def062 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -76,7 +76,6 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
         this.openRequest = request;
         doOpen();
         this.endpoint.setContext(this);
-        this.endpoint.open();
     }
 
     @Override
@@ -107,7 +106,6 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
 
         this.closeRequest = request;
         doClose();
-        this.endpoint.close();
     }
 
     @Override
@@ -254,8 +252,21 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
     public void processFlowUpdates() throws IOException {
     }
 
-    protected abstract void doOpen();
-
-    protected abstract void doClose();
+    /**
+     * Perform the open operation on the managed endpoint.  A subclass may
+     * override this method to provide additional open actions or configuration
+     * updates.
+     */
+    protected void doOpen() {
+        endpoint.open();
+    }
 
+    /**
+     * Perform the close operation on the managed endpoint.  A subclass may
+     * override this method to provide additional close actions or alter the
+     * standard close path such as endpoint detach etc.
+     */
+    protected void doClose() {
+        endpoint.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
index e7dceb0..69bfdf7 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
@@ -115,14 +115,6 @@ public class AmqpAnonymousProducer extends AmqpProducer {
     }
 
     @Override
-    protected void doOpen() {
-    }
-
-    @Override
-    protected void doClose() {
-    }
-
-    @Override
     public boolean isAnonymous() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 448556a..8d5a458 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
 import javax.jms.Session;
 
@@ -95,10 +96,7 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
     protected void doOpen() {
         this.endpoint.setContainer(resource.getClientId());
         this.endpoint.setHostname(remoteURI.getHost());
-    }
-
-    @Override
-    protected void doClose() {
+        super.doOpen();
     }
 
     public AmqpSession createSession(JmsSessionInfo sessionInfo) {
@@ -112,6 +110,14 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
     }
 
     public void unsubscribe(String subscriptionName, AsyncResult request) {
+
+        for (AmqpSession session : sessions.values()) {
+            if (session.containsSubscription(subscriptionName)) {
+                request.onFailure(new JMSException("Cannot remove an active 
durable subscription"));
+                return;
+            }
+        }
+
         connectionSession.unsubscribe(subscriptionName, request);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index eaddc17..3cfc0c6 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -16,17 +16,16 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.jms.JMSException;
+
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.NoOpAsyncResult;
 import org.apache.qpid.jms.provider.WrappedAsyncResult;
-import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
-import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Receiver;
@@ -64,33 +63,28 @@ public class AmqpConnectionSession extends AmqpSession {
      *        the request that awaits the completion of this action.
      */
     public void unsubscribe(String subscriptionName, AsyncResult request) {
-        SubscriptionSourceRequestor requestor = new 
SubscriptionSourceRequestor(getJmsResource(), subscriptionName);
-        SubscriptionSourceRequest sourceRequest = new 
SubscriptionSourceRequest(requestor, request);
-        pendingUnsubs.put(subscriptionName, sourceRequest);
+        DurableSubscriptionReattach subscriber = new 
DurableSubscriptionReattach(getJmsResource(), subscriptionName);
+        DurableSubscriptionReattachRequest subscribeRequest = new 
DurableSubscriptionReattachRequest(subscriber, request);
+        pendingUnsubs.put(subscriptionName, subscribeRequest);
 
         LOG.debug("Attempting remove of subscription: {}", subscriptionName);
-        requestor.open(sourceRequest);
+        subscriber.open(subscribeRequest);
     }
 
-    private class SubscriptionSourceRequestor extends 
AmqpAbstractResource<JmsSessionInfo, Receiver> {
-
+    private class DurableSubscriptionReattach extends 
AmqpAbstractResource<JmsSessionInfo, Receiver> {
         private final String subscriptionName;
 
-        public SubscriptionSourceRequestor(JmsSessionInfo resource, String 
subscriptionName) {
-            super(resource);
+        public DurableSubscriptionReattach(JmsSessionInfo resource, String 
subscriptionName) {
+            super(resource, 
AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName));
             this.subscriptionName = subscriptionName;
         }
 
         @Override
         protected void doOpen() {
-            endpoint = 
AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName);
             endpoint.setTarget(new Target());
             endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
             endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-        }
-
-        @Override
-        protected void doClose() {
+            super.doOpen();
         }
 
         public String getSubscriptionName() {
@@ -98,125 +92,32 @@ public class AmqpConnectionSession extends AmqpSession {
         }
     }
 
-    private class SubscriptionSourceRequest extends WrappedAsyncResult {
+    private class DurableSubscriptionReattachRequest extends 
WrappedAsyncResult {
 
-        private final SubscriptionSourceRequestor requestor;
+        private final DurableSubscriptionReattach subscriber;
 
-        public SubscriptionSourceRequest(SubscriptionSourceRequestor 
requestor, AsyncResult originalRequest) {
+        public DurableSubscriptionReattachRequest(DurableSubscriptionReattach 
subscriber, AsyncResult originalRequest) {
             super(originalRequest);
-            this.requestor = requestor;
+            this.subscriber = subscriber;
         }
 
         @Override
         public void onSuccess() {
-            Object result = requestor.getEndpoint().getRemoteSource();
-            if (result == null || !(result instanceof Source)) {
-                LOG.trace("No Source returned for subscription: {}", 
requestor.getSubscriptionName());
-                pendingUnsubs.remove(requestor.getSubscriptionName());
-                requestor.closed();
-                super.onFailure(new IOException("Could not fetch remote 
subscription information"));
-            } else {
-                final Source remoteSource = (Source) result;
-                LOG.trace("Source returned for subscription: {} closing first 
stage", requestor.getSubscriptionName());
-                requestor.close(new AsyncResult() {
-
-                    @Override
-                    public void onSuccess() {
-                        RemoveDurabilityRequestor removeRequestor =
-                            new RemoveDurabilityRequestor(getJmsResource(), 
requestor.getSubscriptionName(), remoteSource);
-                        RemoveDurabilityRequest removeRequest = new 
RemoveDurabilityRequest(removeRequestor, getWrappedRequest());
-                        pendingUnsubs.put(requestor.getSubscriptionName(), 
removeRequest);
-                        LOG.trace("Second stage remove started for 
subscription: {}", requestor.getSubscriptionName());
-                        removeRequestor.open(removeRequest);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable result) {
-                        LOG.trace("Second stage remove failed for 
subscription: {}", requestor.getSubscriptionName());
-                        pendingUnsubs.remove(requestor.getSubscriptionName());
-                        getWrappedRequest().onFailure(result);
-                    }
-
-                    @Override
-                    public boolean isComplete() {
-                        return getWrappedRequest().isComplete();
-                    }
-                });
-            }
-        }
-
-        @Override
-        public void onFailure(Throwable result) {
-            pendingUnsubs.remove(requestor.getSubscriptionName());
-            requestor.closed();
-            super.onFailure(result);
-        }
-    }
-
-    private class RemoveDurabilityRequestor extends 
AmqpAbstractResource<JmsSessionInfo, Receiver> {
-
-        private final String subscriptionName;
-        private final Source subscriptionSource;
-
-        public RemoveDurabilityRequestor(JmsSessionInfo resource, String 
subscriptionName, Source subscriptionSource) {
-            super(resource);
-            this.subscriptionSource = subscriptionSource;
-            this.subscriptionName = subscriptionName;
-        }
-
-        @Override
-        protected void doOpen() {
-            endpoint = 
AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName);
-
-            subscriptionSource.setDurable(TerminusDurability.NONE);
-            
subscriptionSource.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-
-            endpoint.setSource(subscriptionSource);
-            endpoint.setTarget(new Target());
-            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-            endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-        }
-
-        @Override
-        public void remotelyClosed() {
-            if (isAwaitingOpen()) {
-                openRequest.onSuccess();
+            LOG.trace("Reattached to subscription: {}", 
subscriber.getSubscriptionName());
+            pendingUnsubs.remove(subscriber.getSubscriptionName());
+            if (subscriber.getEndpoint().getRemoteSource() != null) {
+                subscriber.close(getWrappedRequest());
             } else {
-                closed();
-                AmqpConnectionSession.this.reportError(new 
IOException("Durable unsubscribe failed unexpectedly"));
+                subscriber.close(NoOpAsyncResult.INSTANCE);
+                getWrappedRequest().onFailure(new JMSException("Cannot remove 
a subscription that does not exist"));
             }
         }
 
         @Override
-        protected void doClose() {
-        }
-
-        public String getSubscriptionName() {
-            return subscriptionName;
-        }
-    }
-
-    private class RemoveDurabilityRequest extends WrappedAsyncResult {
-
-        private final RemoveDurabilityRequestor requestor;
-
-        public RemoveDurabilityRequest(RemoveDurabilityRequestor requestor, 
AsyncResult originalRequest) {
-            super(originalRequest);
-            this.requestor = requestor;
-        }
-
-        @Override
-        public void onSuccess() {
-            LOG.trace("Second stage remove complete for subscription: {}", 
requestor.getSubscriptionName());
-            pendingUnsubs.remove(requestor.getSubscriptionName());
-            requestor.close(getWrappedRequest());
-        }
-
-        @Override
         public void onFailure(Throwable result) {
-            LOG.trace("Second stage remove failed for subscription: {}", 
requestor.getSubscriptionName());
-            pendingUnsubs.remove(requestor.getSubscriptionName());
-            requestor.closed();
+            LOG.trace("Failed to reattach to subscription: {}", 
subscriber.getSubscriptionName());
+            pendingUnsubs.remove(subscriber.getSubscriptionName());
+            subscriber.closed();
             super.onFailure(result);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 3caec3c..cb766bf 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -119,6 +119,7 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
         }
         endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        super.doOpen();
     }
 
     @Override
@@ -346,6 +347,11 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     @Override
     protected void doClose() {
+        if (resource.isDurable()) {
+            this.endpoint.detach();
+        } else {
+            this.endpoint.close();
+        }
     }
 
     public AmqpConnection getConnection() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 56ed79f..8174817 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -258,10 +258,7 @@ public class AmqpFixedProducer extends AmqpProducer {
             endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
         }
         endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-    }
-
-    @Override
-    protected void doClose() {
+        super.doOpen();
     }
 
     public AmqpSession getSession() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 200a367..c90c773 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -698,7 +698,15 @@ public class AmqpProvider extends AbstractProvider 
implements TransportListener
                         session.processStateChange();
                         break;
                     case LINK_REMOTE_CLOSE:
+                        LOG.info("Link closed: {}", 
protonEvent.getLink().getContext());
+                        AmqpResource cloedResource = (AmqpResource) 
protonEvent.getLink().getContext();
+                        cloedResource.processStateChange();
+                        break;
                     case LINK_REMOTE_DETACH:
+                        LOG.info("Link detach: {}", 
protonEvent.getLink().getContext());
+                        AmqpResource detachedResource = (AmqpResource) 
protonEvent.getLink().getContext();
+                        detachedResource.processStateChange();
+                        break;
                     case LINK_REMOTE_OPEN:
                         AmqpResource resource = (AmqpResource) 
protonEvent.getLink().getContext();
                         resource.processStateChange();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 39c9f9d..ca28165 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -69,11 +69,13 @@ public class AmqpSession extends 
AmqpAbstractResource<JmsSessionInfo, Session> {
     protected void doOpen() {
         this.endpoint.setIncomingCapacity(Integer.MAX_VALUE);
         this.connection.addSession(this);
+        super.doOpen();
     }
 
     @Override
     protected void doClose() {
         this.connection.removeSession(this);
+        super.doClose();
     }
 
     /**
@@ -248,6 +250,25 @@ public class AmqpSession extends 
AmqpAbstractResource<JmsSessionInfo, Session> {
     }
 
     /**
+     * Query the Session to see if there are any registered consumer instances 
that have
+     * a durable subscription with the given subscription name.
+     *
+     * @param subscriptionName
+     *        the name of the subscription being searched for.
+     *
+     * @return true if there is a consumer that has the given subscription.
+     */
+    public boolean containsSubscription(String subscriptionName) {
+        for (AmqpConsumer consumer : consumers.values()) {
+            if 
(subscriptionName.equals(consumer.getJmsResource().getSubscriptionName())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
      * Call to send an error that occurs outside of the normal asynchronous 
processing
      * of a session resource such as a remote close etc.
      *

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 3c6893d..2cf1cb9 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -107,11 +107,15 @@ public class AmqpTemporaryDestination extends 
AmqpAbstractResource<JmsDestinatio
         endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
         this.connection.addTemporaryDestination(this);
+
+        super.doOpen();
     }
 
     @Override
     protected void doClose() {
         this.connection.removeTemporaryDestination(this);
+
+        super.doClose();
     }
 
     public AmqpConnection getConnection() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 6bfbaa5..5492976 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -140,10 +140,7 @@ public class AmqpTransactionContext extends 
AmqpAbstractResource<JmsSessionInfo,
         endpoint.setTarget(coordinator);
         endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
         endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-    }
-
-    @Override
-    protected void doClose() {
+        super.doOpen();
     }
 
     public void begin(JmsTransactionId txId, AsyncResult request) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index 49121ad..cfe6328 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -71,6 +72,29 @@ public class JmsTransactedConsumerTest extends 
AmqpTestSupport {
         assertEquals(0, proxy.getQueueSize());
     }
 
+    @Test(timeout=30000)
+    public void testRollbackRececeivedMessageAndClose() throws Exception {
+
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("TestMessage-0"));
+        producer.close();
+        session.commit();
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        Message msg = consumer.receive(2000);
+        assertNotNull(msg);
+
+        session.rollback();
+
+        connection.close();
+    }
+
     @Test(timeout = 60000)
     public void testReceiveAndRollback() throws Exception {
         connection = createAmqpConnection();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to