Repository: qpid-jms
Updated Branches:
  refs/heads/master 91e1b02ad -> e7692d9d0


QPIDJMS-242 Creating a JMS resource can hang during failover reconnect

During failover the recovery attempts to recreate JMS resources that
were previously opened but recovers some resources that are still in the
process of being created lead to a double create effect on recover that
leads to the initial create call to hang.  Need to track state of the
JMS resources so that the failover recover can skip any tracked
instances that are either not yet opened or have already been closed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e7692d9d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e7692d9d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e7692d9d

Branch: refs/heads/master
Commit: e7692d9d06d24e9660bcf4f90d8175fdd136ad66
Parents: 91e1b02
Author: Timothy Bish <[email protected]>
Authored: Wed Jan 4 18:34:16 2017 -0500
Committer: Timothy Bish <[email protected]>
Committed: Wed Jan 4 18:34:16 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   4 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  18 +-
 .../org/apache/qpid/jms/JmsMessageProducer.java |  10 +-
 .../java/org/apache/qpid/jms/JmsSession.java    |  23 +-
 .../qpid/jms/JmsTemporaryDestination.java       |  11 +
 .../qpid/jms/meta/JmsAbstractResource.java      |  39 ++++
 .../apache/qpid/jms/meta/JmsConnectionInfo.java |   2 +-
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |   2 +-
 .../apache/qpid/jms/meta/JmsProducerInfo.java   |   2 +-
 .../org/apache/qpid/jms/meta/JmsResource.java   |  19 ++
 .../apache/qpid/jms/meta/JmsSessionInfo.java    |   2 +-
 .../qpid/jms/meta/JmsTransactionInfo.java       |   2 +-
 .../jms/provider/amqp/AmqpAbstractResource.java |   5 +
 .../amqp/builders/AmqpResourceBuilder.java      |  10 +-
 .../jms/provider/amqp/AmqpProviderTest.java     |   3 +-
 .../failover/FailoverIntegrationTest.java       | 212 +++++++++++++++++++
 16 files changed, 333 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index cb6ac50..3c0f918 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -62,6 +62,7 @@ import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
@@ -262,9 +263,10 @@ public class JmsConnection implements AutoCloseable, 
Connection, TopicConnection
      * Called to free all Connection resources.
      */
     protected void shutdown(Exception cause) throws JMSException {
-
         // NOTE - Once ConnectionConsumer is added we must shutdown those as 
well.
 
+        connectionInfo.setState(ResourceState.CLOSED);
+
         for (JmsSession session : sessions.values()) {
             session.shutdown(cause);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index a76398f..6c4c06b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -35,6 +35,7 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
@@ -185,6 +186,7 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
 
     protected void shutdown(Throwable cause) throws JMSException {
         if (closed.compareAndSet(false, true)) {
+            consumerInfo.setState(ResourceState.CLOSED);
             setFailureCause(cause);
             session.remove(this);
             stop(true);
@@ -668,15 +670,19 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
     }
 
     protected void onConnectionRecovery(Provider provider) throws Exception {
-        ProviderFuture request = new ProviderFuture();
-        provider.create(consumerInfo, request);
-        request.sync();
+        if (consumerInfo.isOpen()) {
+            ProviderFuture request = new ProviderFuture();
+            provider.create(consumerInfo, request);
+            request.sync();
+        }
     }
 
     protected void onConnectionRecovered(Provider provider) throws Exception {
-        ProviderFuture request = new ProviderFuture();
-        provider.start(consumerInfo, request);
-        request.sync();
+        if (consumerInfo.isOpen()) {
+            ProviderFuture request = new ProviderFuture();
+            provider.start(consumerInfo, request);
+            request.sync();
+        }
     }
 
     protected void onConnectionRestored() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index c61dad0..df01058 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -32,6 +32,7 @@ import javax.jms.MessageProducer;
 import org.apache.qpid.jms.message.JmsMessageIDBuilder;
 import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderFuture;
 
@@ -107,6 +108,7 @@ public class JmsMessageProducer implements AutoCloseable, 
MessageProducer {
 
     protected void shutdown(Throwable cause) throws JMSException {
         if (closed.compareAndSet(false, true)) {
+            producerInfo.setState(ResourceState.CLOSED);
             failureCause.set(cause);
             session.remove(this);
         }
@@ -349,9 +351,11 @@ public class JmsMessageProducer implements AutoCloseable, 
MessageProducer {
     }
 
     protected void onConnectionRecovery(Provider provider) throws Exception {
-        ProviderFuture request = new ProviderFuture();
-        provider.create(producerInfo, request);
-        request.sync();
+        if (producerInfo.isOpen()) {
+            ProviderFuture request = new ProviderFuture();
+            provider.create(producerInfo, request);
+            request.sync();
+        }
     }
 
     protected void onConnectionRecovered(Provider provider) throws Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index deca6b5..49729ec 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -75,6 +75,7 @@ import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
@@ -279,6 +280,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
 
     protected void shutdown(Throwable cause) throws JMSException {
         if (closed.compareAndSet(false, true)) {
+            sessionInfo.setState(ResourceState.CLOSED);
             setFailureCause(cause);
             stop();
             for (JmsMessageConsumer consumer : new 
ArrayList<JmsMessageConsumer>(this.consumers.values())) {
@@ -1227,19 +1229,20 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
     }
 
     protected void onConnectionRecovery(Provider provider) throws Exception {
+        if (sessionInfo.isOpen()) {
+            ProviderFuture request = new ProviderFuture();
+            provider.create(sessionInfo, request);
+            request.sync();
 
-        ProviderFuture request = new ProviderFuture();
-        provider.create(sessionInfo, request);
-        request.sync();
+            transactionContext.onConnectionRecovery(provider);
 
-        transactionContext.onConnectionRecovery(provider);
-
-        for (JmsMessageProducer producer : producers.values()) {
-            producer.onConnectionRecovery(provider);
-        }
+            for (JmsMessageProducer producer : producers.values()) {
+                producer.onConnectionRecovery(provider);
+            }
 
-        for (JmsMessageConsumer consumer : consumers.values()) {
-            consumer.onConnectionRecovery(provider);
+            for (JmsMessageConsumer consumer : consumers.values()) {
+                consumer.onConnectionRecovery(provider);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
index 33cefa2..e299b7f 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
@@ -30,6 +30,7 @@ public abstract class JmsTemporaryDestination extends 
JmsDestination implements
 
     private boolean deleted;
     private JmsTemporaryDestinationId resourceId;
+    private ResourceState state = ResourceState.INITIALIZED;
 
     public JmsTemporaryDestination() {
         this(null, false);
@@ -48,6 +49,16 @@ public abstract class JmsTemporaryDestination extends 
JmsDestination implements
         return resourceId;
     }
 
+    @Override
+       public ResourceState getState() {
+        return state;
+    }
+
+    @Override
+       public void setState(ResourceState state) {
+        this.state = state;
+    }
+
     void setConnection(JmsConnection connection) {
         this.connection = connection;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResource.java
new file mode 100644
index 0000000..548a6cf
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+/**
+ * Abstract resource class for JmsResource instances.
+ */
+public abstract class JmsAbstractResource implements JmsResource {
+
+    private ResourceState state = ResourceState.INITIALIZED;
+
+    @Override
+    public ResourceState getState() {
+        return state;
+    }
+
+    @Override
+    public void setState(ResourceState state) {
+        this.state = state;
+    }
+
+    public boolean isOpen() {
+        return ResourceState.OPEN.equals(state);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index 5af90e6..db475fe 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -36,7 +36,7 @@ import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
  * Meta object that contains the JmsConnection identification and configuration
  * options.  Providers can extend this to add Provider specific data as needed.
  */
-public final class JmsConnectionInfo implements JmsResource, 
Comparable<JmsConnectionInfo> {
+public final class JmsConnectionInfo extends JmsAbstractResource implements 
Comparable<JmsConnectionInfo> {
 
     public static final long INFINITE = -1;
     public static final long DEFAULT_CONNECT_TIMEOUT = 15000;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index e8f793f..61f353d 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -22,7 +22,7 @@ import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 
-public final class JmsConsumerInfo implements JmsResource, 
Comparable<JmsConsumerInfo> {
+public final class JmsConsumerInfo extends JmsAbstractResource implements 
Comparable<JmsConsumerInfo> {
 
     private final JmsConsumerId consumerId;
     private JmsDestination destination;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
index 1b006d0..4df6e89 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
@@ -19,7 +19,7 @@ package org.apache.qpid.jms.meta;
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.message.JmsMessageIDBuilder;
 
-public final class JmsProducerInfo implements JmsResource, 
Comparable<JmsProducerInfo> {
+public final class JmsProducerInfo extends JmsAbstractResource implements 
Comparable<JmsProducerInfo> {
 
     private final JmsProducerId producerId;
     private final JmsMessageIDBuilder messageIDBuilder;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
index 0432ead..7911902 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
@@ -22,6 +22,12 @@ package org.apache.qpid.jms.meta;
  */
 public interface JmsResource {
 
+    enum ResourceState {
+        INITIALIZED,
+        OPEN,
+        CLOSED;
+    }
+
     /**
      * Returns the assigned resource ID for this JmsResource instance.
      *
@@ -39,4 +45,17 @@ public interface JmsResource {
      */
     void visit(JmsResourceVistor visitor) throws Exception;
 
+    /**
+     * @return the current state of this resource.
+     */
+    ResourceState getState();
+
+    /**
+     * Sets or updates the current state of this resource.
+     *
+     * @param state
+     *                 The new state to apply to this resource.
+     */
+    void setState(ResourceState state);
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
index 1b7c3da..d1af5f9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
@@ -29,7 +29,7 @@ import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
 import org.apache.qpid.jms.policy.JmsPresettlePolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 
-public final class JmsSessionInfo implements JmsResource, 
Comparable<JmsSessionInfo> {
+public final class JmsSessionInfo extends JmsAbstractResource implements 
Comparable<JmsSessionInfo> {
 
     private final JmsSessionId sessionId;
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
index 5222eab..0f32b16 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
@@ -16,7 +16,7 @@
  */
 package org.apache.qpid.jms.meta;
 
-public final class JmsTransactionInfo implements JmsResource, 
Comparable<JmsTransactionInfo> {
+public final class JmsTransactionInfo extends JmsAbstractResource implements 
Comparable<JmsTransactionInfo> {
 
     private final JmsSessionId sessionId;
     private final JmsTransactionId transactionId;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 23a5782..3a8cf3d 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledFuture;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Endpoint;
@@ -84,6 +85,8 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
             parent.removeChildResource(this);
         }
 
+        resourceInfo.setState(ResourceState.CLOSED);
+
         // If already closed signal success or else the caller might never get 
notified.
         if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
             getEndpoint().getRemoteState() == EndpointState.CLOSED) {
@@ -140,6 +143,8 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
             parent.removeChildResource(this);
         }
 
+        resourceInfo.setState(ResourceState.CLOSED);
+
         if (getEndpoint() != null) {
             // TODO: if this is a producer/consumer link then we may only be 
detached,
             // rather than fully closed, and should respond appropriately.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index a1a9aac..e984e7b 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledFuture;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.AmqpEventSink;
 import org.apache.qpid.jms.provider.amqp.AmqpExceptionBuilder;
@@ -146,17 +147,14 @@ public abstract class AmqpResourceBuilder<TARGET extends 
AmqpResource, PARENT ex
         }
 
         if (isOpenedEndpointValid()) {
+            resourceInfo.setState(ResourceState.OPEN);
             getEndpoint().setContext(resource);
             getParent().addChildResource(resource);
             getRequest().onSuccess();
         } else {
-            getEndpoint().close();
-            getEndpoint().free();
-            getEndpoint().setContext(null);
-
             // TODO: Perhaps the validate method should thrown an exception so 
that we
             // can return a specific error message to the create initiator.
-            getRequest().onFailure(new IOException("Failed to open requested 
endpoint"));
+            handleClosed(provider, new IOException("Failed to open requested 
endpoint"));
         }
     }
 
@@ -164,6 +162,8 @@ public abstract class AmqpResourceBuilder<TARGET extends 
AmqpResource, PARENT ex
         // If the resource being built is closed during the creation process
         // then this is always an error.
 
+        resourceInfo.setState(ResourceState.CLOSED);
+
         // Perform any post processing relating to closure during creation 
attempt
         afterClosed(getResource(), getResourceInfo());
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 32abb66..63702c9 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -29,6 +29,7 @@ import java.net.URISyntaxException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.qpid.jms.meta.JmsAbstractResource;
 import org.apache.qpid.jms.meta.JmsAbstractResourceId;
 import org.apache.qpid.jms.meta.JmsConnectionId;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
@@ -405,7 +406,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
             provider = new AmqpProvider(getPeerURI(testPeer));
 
             final AtomicBoolean errorThrown = new AtomicBoolean();
-            JmsResource resourceInfo = new JmsResource() {
+            JmsResource resourceInfo = new JmsAbstractResource() {
                 @Override
                 public void visit(JmsResourceVistor visitor) {
                     errorThrown.set(true);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e7692d9d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 9c35006..008670c 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1286,6 +1286,218 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 20000)
+    public void testCreateSessionAfterConnectionDrops() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin(nullValue(), false);
+            originalPeer.dropAfterLastHandler();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
+            ((JmsDefaultPrefetchPolicy) 
connection.getPrefetchPolicy()).setQueuePrefetch(0);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", 
originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectEnd();
+            finalPeer.expectClose();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, 
TimeUnit.SECONDS));
+
+            session.close();
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateConsumerAfterConnectionDrops() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.dropAfterLastHandler();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
+            ((JmsDefaultPrefetchPolicy) 
connection.getPrefetchPolicy()).setQueuePrefetch(0);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", 
originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(1)));
+            finalPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(1)));
+            finalPeer.expectDetach(true, true, true);
+            finalPeer.expectClose();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            assertNull(consumer.receive(500));
+            LOG.info("Receive returned");
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, 
TimeUnit.SECONDS));
+
+            LOG.info("Closing consumer");
+            consumer.close();
+
+            // Shut it down
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateProducerAfterConnectionDrops() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.dropAfterLastHandler();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
+            ((JmsDefaultPrefetchPolicy) 
connection.getPrefetchPolicy()).setQueuePrefetch(0);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", 
originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectDetach(true, true, true);
+            finalPeer.expectClose();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, 
TimeUnit.SECONDS));
+
+            LOG.info("Closing consumer");
+            producer.close();
+
+            // Shut it down
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) 
throws JMSException {
         return establishAnonymousConnecton(null, null, peers);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to