Repository: qpid-jms
Updated Branches:
  refs/heads/master ccb53fd8e -> 18f7a894c
QPIDJMS-175 Set the resource's failure cause early to better reflect state prior to the async task closing the resource and other cleanup work.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/18f7a894
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/18f7a894
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/18f7a894

Branch: refs/heads/master
Commit: 18f7a894c343ea7bd53d979603ef3892bfc4eb9a
Parents: ccb53fd
Author: Timothy Bish <[email protected]>
Authored: Mon May 9 19:08:28 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Mon May 9 19:08:28 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java | 31 ++++++++++++++++++++
 .../org/apache/qpid/jms/JmsMessageProducer.java | 21 ++++++++++---
 .../java/org/apache/qpid/jms/JmsSession.java    |  8 +++++
 3 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index ab9fb7a..5f757c7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -61,6 +61,7 @@ import org.apache.qpid.jms.meta.JmsConnectionId;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.meta.JmsSessionId;
@@ 
-1199,7 +1200,37 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection // Run on the connection executor to free the provider to go do more work and avoid // any chance of a deadlock if the code ever looped back to the provider. + if (!closing.get() && !closed.get()) { + + // Set the failure cause indicator now to more quickly reflect the correct + // state in the resource. The actual shutdown and clean will be done on the + // connection executor thread to avoid looping or stalling the provider thread. + if (resource instanceof JmsSessionInfo) { + JmsSession session = sessions.get(resource.getId()); + if (session != null) { + session.setFailureCause(cause); + } + } else if (resource instanceof JmsProducerInfo) { + JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId(); + JmsSession session = sessions.get(parentId); + if (session != null) { + JmsMessageProducer producer = session.lookup((JmsProducerId) resource.getId()); + if (producer != null) { + producer.setFailureCause(cause); + } + } + } else if (resource instanceof JmsConsumerInfo) { + JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId(); + JmsSession session = sessions.get(parentId); + if (session != null) { + JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId()); + if (consumer != null) { + consumer.setFailureCause(cause); + } + } + } + executor.execute(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java index 0cb1646..f8d5684 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -48,7 +49,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { protected boolean disableMessageId; protected boolean disableTimestamp; protected final AtomicLong messageSequence = new AtomicLong(); - protected Exception failureCause; + protected final AtomicReference<Exception> failureCause = new AtomicReference<>(); protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException { this.session = session; @@ -92,7 +93,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { protected void shutdown(Exception cause) throws JMSException { if (closed.compareAndSet(false, true)) { - failureCause = cause; + failureCause.set(cause); session.remove(this); } } @@ -221,11 +222,11 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { if (closed.get()) { IllegalStateException jmsEx = null; - if (failureCause == null) { + if (getFailureCause() == null) { jmsEx = new IllegalStateException("The MessageProducer is closed"); } else { jmsEx = new IllegalStateException("The MessageProducer was closed due to an unrecoverable error."); - jmsEx.initCause(failureCause); + jmsEx.initCause(failureCause.get()); } throw jmsEx; @@ -240,6 +241,18 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { return anonymousProducer; } + void setFailureCause(Exception failureCause) { + this.failureCause.set(failureCause); + } + + Exception getFailureCause() { + if (failureCause.get() == null) { + return session.getFailureCause(); + } + + return failureCause.get(); + } + 
//////////////////////////////////////////////////////////////////////////// // Connection interruption handlers. //////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/18f7a894/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 3569fc7..1c8f2d8 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -596,6 +596,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe consumers.remove(consumer.getConsumerId()); } + protected JmsMessageConsumer lookup(JmsConsumerId consumerId) { + return consumers.get(consumerId); + } + protected void add(JmsMessageProducer producer) { producers.put(producer.getProducerId(), producer); } @@ -604,6 +608,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe producers.remove(producer.getProducerId()); } + protected JmsMessageProducer lookup(JmsProducerId producerId) { + return producers.get(producerId); + } + protected void onException(Exception ex) { connection.onException(ex); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
