Repository: qpid-jms Updated Branches: refs/heads/master cc00816c4 -> d9807984a
QPIDJMS-300 Fix for race on remote and local close. A possible race between a local session close and a remote producer close can lead to the final task that cleans up async completions not being run, leaving the executor in a failed shutdown state. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d9807984 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d9807984 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d9807984 Branch: refs/heads/master Commit: d9807984ab90605ab1a4cb774439b4f7d4fece11 Parents: cc00816 Author: Timothy Bish <tabish...@gmail.com> Authored: Fri Aug 4 12:38:50 2017 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Fri Aug 4 12:38:50 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsSession.java | 66 ++++++++++++-------- .../qpid/jms/util/QpidJMSThreadFactory.java | 22 ++++++- .../integration/ProducerIntegrationTest.java | 1 + 3 files changed, 61 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index a821c18..bde6bd6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -25,8 +25,10 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import 
java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -109,8 +111,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private final AtomicBoolean started = new AtomicBoolean(); private final JmsSessionInfo sessionInfo; private final ReentrantLock sendLock = new ReentrantLock(); - private volatile ExecutorService deliveryExecutor; - private volatile ExecutorService completionExcecutor; + private volatile ThreadPoolExecutor deliveryExecutor; + private volatile ThreadPoolExecutor completionExcecutor; private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>(); private AtomicReference<Thread> completionThread = new AtomicReference<Thread>(); @@ -155,10 +157,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe throw e; } - - // Start the completion executor now as it's needed throughout the - // lifetime of the Session. - getCompletionExecutor(); } int acknowledgementMode() { @@ -308,6 +306,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe sessionInfo.setState(ResourceState.CLOSED); setFailureCause(cause); stop(); + for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) { consumer.shutdown(cause); } @@ -324,13 +323,14 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe cause = new JMSException("Session closed remotely before message transfer result was notified"); } - completionExcecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause))); - completionExcecutor.shutdown(); - try { - completionExcecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.trace("Session close awaiting send completions was interrupted"); - } + getCompletionExecutor().execute(new 
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause))); + getCompletionExecutor().shutdown(); + } + + try { + getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.trace("Session close awaiting send completions was interrupted"); } } } @@ -1041,7 +1041,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } Executor getDispatcherExecutor() { - ExecutorService exec = deliveryExecutor; + ThreadPoolExecutor exec = deliveryExecutor; if (exec == null) { synchronized (sessionInfo) { if (deliveryExecutor == null) { @@ -1059,18 +1059,24 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe return exec; } - Executor getCompletionExecutor() { - ExecutorService exec = completionExcecutor; + private ExecutorService getCompletionExecutor() { + ThreadPoolExecutor exec = completionExcecutor; if (exec == null) { synchronized (sessionInfo) { - if (completionExcecutor == null) { - if (!closed.get()) { - completionExcecutor = exec = createExecutor("completion dispatcher", completionThread);; - } else { - return NoOpExecutor.INSTANCE; + exec = completionExcecutor; + if (exec == null) { + exec = createExecutor("completion dispatcher", completionThread); + + // Ensure work thread is fully up before allowing other threads + // to attempt to execute on this instance. 
+ Future<?> starter = exec.submit(() -> {}); + try { + starter.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.trace("Completion Executor starter task failed: {}", e.getMessage()); } - } else { - exec = completionExcecutor; + + completionExcecutor = exec; } } } @@ -1078,11 +1084,21 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe return exec; } - private ExecutorService createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) { + private ThreadPoolExecutor createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix, true, threadTracker)); - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy() { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + // Completely ignore the task if the session has closed. 
+ if (!closed.get()) { + LOG.trace("Task {} rejected from executor: {}", r, e); + super.rejectedExecution(r, e); + } + } + }); return executor; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java index 2e0e829..b4e9f06 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java @@ -16,17 +16,23 @@ */ package org.apache.qpid.jms.util; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Simple ThreadFactory object */ public class QpidJMSThreadFactory implements ThreadFactory { - private String threadName; - private boolean daemon; - private AtomicReference<Thread> threadTracker; + private static final Logger LOG = LoggerFactory.getLogger(QpidJMSThreadFactory.class); + + private final String threadName; + private final boolean daemon; + private final AtomicReference<Thread> threadTracker; /** * Creates a new Thread factory that will create threads with the @@ -40,6 +46,7 @@ public class QpidJMSThreadFactory implements ThreadFactory { public QpidJMSThreadFactory(String threadName, boolean daemon) { this.threadName = threadName; this.daemon = daemon; + this.threadTracker = null; } /** @@ -86,6 +93,15 @@ public class QpidJMSThreadFactory implements ThreadFactory { Thread thread = new Thread(runner, threadName); thread.setDaemon(daemon); + thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + + @Override + public void uncaughtException(Thread 
target, Throwable error) { + LOG.warn("Thread: {} failed due to an uncaught exception: {}", target.getName(), error.getMessage()); + LOG.trace("Uncaught Stacktrace: ", error); + } + }); + return thread; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 24f5b8a..7cda887 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -1019,6 +1019,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } } + @Repeat(repetitions = 1) @Test(timeout = 20000) public void testRemotelyCloseProducerWithSendWaitingForCredit() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org