Repository: qpid-jms
Updated Branches:
  refs/heads/master cc00816c4 -> d9807984a


QPIDJMS-300 Fix for race on remote and local close

Possible race on local session close and remote producer close
can lead to final task to cleanup async completions not being
run and the executor in a failed shutdown state.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d9807984
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d9807984
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d9807984

Branch: refs/heads/master
Commit: d9807984ab90605ab1a4cb774439b4f7d4fece11
Parents: cc00816
Author: Timothy Bish <tabish...@gmail.com>
Authored: Fri Aug 4 12:38:50 2017 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Fri Aug 4 12:38:50 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 66 ++++++++++++--------
 .../qpid/jms/util/QpidJMSThreadFactory.java     | 22 ++++++-
 .../integration/ProducerIntegrationTest.java    |  1 +
 3 files changed, 61 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a821c18..bde6bd6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -25,8 +25,10 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -109,8 +111,8 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
     private final AtomicBoolean started = new AtomicBoolean();
     private final JmsSessionInfo sessionInfo;
     private final ReentrantLock sendLock = new ReentrantLock();
-    private volatile ExecutorService deliveryExecutor;
-    private volatile ExecutorService completionExcecutor;
+    private volatile ThreadPoolExecutor deliveryExecutor;
+    private volatile ThreadPoolExecutor completionExcecutor;
     private AtomicReference<Thread> deliveryThread = new 
AtomicReference<Thread>();
     private AtomicReference<Thread> completionThread = new 
AtomicReference<Thread>();
 
@@ -155,10 +157,6 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
 
             throw e;
         }
-
-        // Start the completion executor now as it's needed throughout the
-        // lifetime of the Session.
-        getCompletionExecutor();
     }
 
     int acknowledgementMode() {
@@ -308,6 +306,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
             sessionInfo.setState(ResourceState.CLOSED);
             setFailureCause(cause);
             stop();
+
             for (JmsMessageConsumer consumer : new 
ArrayList<JmsMessageConsumer>(this.consumers.values())) {
                 consumer.shutdown(cause);
             }
@@ -324,13 +323,14 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
                     cause = new JMSException("Session closed remotely before 
message transfer result was notified");
                 }
 
-                completionExcecutor.execute(new 
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
-                completionExcecutor.shutdown();
-                try {
-                    
completionExcecutor.awaitTermination(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    LOG.trace("Session close awaiting send completions was 
interrupted");
-                }
+                getCompletionExecutor().execute(new 
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
+                getCompletionExecutor().shutdown();
+            }
+
+            try {
+                
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.trace("Session close awaiting send completions was 
interrupted");
             }
         }
     }
@@ -1041,7 +1041,7 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
     }
 
     Executor getDispatcherExecutor() {
-        ExecutorService exec = deliveryExecutor;
+        ThreadPoolExecutor exec = deliveryExecutor;
         if (exec == null) {
             synchronized (sessionInfo) {
                 if (deliveryExecutor == null) {
@@ -1059,18 +1059,24 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
         return exec;
     }
 
-    Executor getCompletionExecutor() {
-        ExecutorService exec = completionExcecutor;
+    private ExecutorService getCompletionExecutor() {
+        ThreadPoolExecutor exec = completionExcecutor;
         if (exec == null) {
             synchronized (sessionInfo) {
-                if (completionExcecutor == null) {
-                    if (!closed.get()) {
-                        completionExcecutor = exec = 
createExecutor("completion dispatcher", completionThread);;
-                    } else {
-                        return NoOpExecutor.INSTANCE;
+                exec = completionExcecutor;
+                if (exec == null) {
+                    exec = createExecutor("completion dispatcher", 
completionThread);
+
+                    // Ensure work thread is fully up before allowing other 
threads
+                    // to attempt to execute on this instance.
+                    Future<?> starter = exec.submit(() -> {});
+                    try {
+                        starter.get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        LOG.trace("Completion Executor starter task failed: 
{}", e.getMessage());
                     }
-                } else {
-                    exec = completionExcecutor;
+
+                    completionExcecutor = exec;
                 }
             }
         }
@@ -1078,11 +1084,21 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
         return exec;
     }
 
-    private ExecutorService createExecutor(final String threadNameSuffix, 
AtomicReference<Thread> threadTracker) {
+    private ThreadPoolExecutor createExecutor(final String threadNameSuffix, 
AtomicReference<Thread> threadTracker) {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
             new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] 
" + threadNameSuffix, true, threadTracker));
 
-        executor.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardOldestPolicy());
+        executor.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardOldestPolicy() {
+
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+                // Completely ignore the task if the session has closed.
+                if (!closed.get()) {
+                    LOG.trace("Task {} rejected from executor: {}", r, e);
+                    super.rejectedExecution(r, e);
+                }
+            }
+        });
 
         return executor;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
index 2e0e829..b4e9f06 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
@@ -16,17 +16,23 @@
  */
 package org.apache.qpid.jms.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Simple ThreadFactory object
  */
 public class QpidJMSThreadFactory implements ThreadFactory {
 
-    private String threadName;
-    private boolean daemon;
-    private AtomicReference<Thread> threadTracker;
+    private static final Logger LOG = 
LoggerFactory.getLogger(QpidJMSThreadFactory.class);
+
+    private final String threadName;
+    private final boolean daemon;
+    private final AtomicReference<Thread> threadTracker;
 
     /**
      * Creates a new Thread factory that will create threads with the
@@ -40,6 +46,7 @@ public class QpidJMSThreadFactory implements ThreadFactory {
     public QpidJMSThreadFactory(String threadName, boolean daemon) {
         this.threadName = threadName;
         this.daemon = daemon;
+        this.threadTracker = null;
     }
 
     /**
@@ -86,6 +93,15 @@ public class QpidJMSThreadFactory implements ThreadFactory {
 
         Thread thread = new Thread(runner, threadName);
         thread.setDaemon(daemon);
+        thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread target, Throwable error) {
+                LOG.warn("Thread: {} failed due to an uncaught exception: {}", 
target.getName(), error.getMessage());
+                LOG.trace("Uncaught Stacktrace: ", error);
+            }
+        });
+
         return thread;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 24f5b8a..7cda887 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -1019,6 +1019,7 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void testRemotelyCloseProducerWithSendWaitingForCredit() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to