Author: jstrachan
Date: Thu Aug  3 04:16:18 2006
New Revision: 428339

URL: http://svn.apache.org/viewvc?rev=428339&view=rev
Log:
Added a modified version of the patch donated to (LINGO-22) submitted by Jim 
Beattie which tests that we leave no threads around after shutting down the JMS 
client and broker, together with making the Session Executor / 
TaskRunnerFactory part of the connection. We could still make it a singleton if 
required and just use reference counting to ensure its shutdown properly after 
all connections are closed

Added:
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
   (with props)
Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 Thu Aug  3 04:16:18 2006
@@ -97,7 +97,7 @@
 
 public class ActiveMQConnection implements Connection, TopicConnection, 
QueueConnection, StatsCapable, Closeable,  StreamConnection, TransportListener {
 
-    public static final TaskRunnerFactory SESSION_TASK_RUNNER = new 
TaskRunnerFactory("ActiveMQ Session 
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
+    private TaskRunnerFactory sessionTaskRunner = new 
TaskRunnerFactory("ActiveMQ Session 
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
     private final ThreadPoolExecutor asyncConnectionThread;
 
     private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
@@ -572,9 +572,10 @@
 
                     started.set(false);
 
-                    // TODO : ActiveMQConnectionFactory.onConnectionClose() not
-                    // yet implemented.
+                    // TODO if we move the TaskRunnerFactory to the connection 
factory
+                    // then we may need to call
                     // factory.onConnectionClose(this);
+                    sessionTaskRunner.shutdown();
 
                     closed.set(true);
                     closing.set(false);
@@ -856,6 +857,15 @@
     public void removeTransportListener(TransportListener transportListener) {
         transportListeners.remove(transportListener);
     }
+    
+    public TaskRunnerFactory getSessionTaskRunner() {
+        return sessionTaskRunner;
+    }
+
+    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
+        this.sessionTaskRunner = sessionTaskRunner;
+    }
+
     
     // Implementation methods
     // 
-------------------------------------------------------------------------

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
 Thu Aug  3 04:16:18 2006
@@ -102,7 +102,7 @@
         if( !messageQueue.isRunning() ) {
             messageQueue.start();
             if( session.isSessionAsyncDispatch() || dispatchedBySessionPool ) {
-                taskRunner = 
ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ 
Session: "+session.getSessionId());
+                taskRunner = 
session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ 
Session: "+session.getSessionId());
             }
             wakeup();
         }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
 Thu Aug  3 04:16:18 2006
@@ -18,6 +18,7 @@
 package org.apache.activemq.thread;
 
 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
@@ -37,7 +38,7 @@
  */
 public class TaskRunnerFactory {
 
-    private Executor executor;
+    private ExecutorService executor;
     private int maxIterationsPerRun;
     private String name;
     private int priority;
@@ -61,9 +62,14 @@
         } else {
             executor = createDefaultExecutor();
         }
-    
     }
 
+    public void shutdown() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+    
     public TaskRunner createTaskRunner(Task task, String name) {
         if( executor!=null ) {
             return new PooledTaskRunner(executor, task, maxIterationsPerRun);
@@ -72,8 +78,7 @@
         }
     }
     
-    protected Executor createDefaultExecutor() {
-        
+    protected ExecutorService createDefaultExecutor() {
         ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 
10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, name);
@@ -84,7 +89,6 @@
         });
         rc.allowCoreThreadTimeOut(true);
         return rc;
-            
     }
 
 }

Added: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java?rev=428339&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
 Thu Aug  3 04:16:18 2006
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.spring.ConsumerBean;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+/**
+ * 
+ * @version $Revision: $
+ */
+public class StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest extends 
TestCase {
+
+    public static interface Task {
+        public void execute() throws Exception;
+    }
+
+    public void setUp() throws Exception {
+    }
+
+    public void testStartAndStopClientAndBrokerAndCheckNoThreadsAreLeft() 
throws Exception {
+        runTest(new Task() {
+
+            public void execute() throws Exception {
+                BrokerService broker = new BrokerService();
+                broker.setPersistent(false);
+                broker.start();
+
+                ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost");
+                Connection connection = factory.createConnection();
+                connection.start();
+                Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                Queue destination = session.createQueue(getName());
+
+                // consumer
+                MessageConsumer consumer = session.createConsumer(destination);
+                ConsumerBean listener = new ConsumerBean();
+                consumer.setMessageListener(listener);
+
+                // producer
+                MessageProducer producer = session.createProducer(destination);
+                TextMessage message = session.createTextMessage("Hello 
World!");
+                producer.send(message);
+                producer.close();
+
+                listener.assertMessagesArrived(1);
+
+                consumer.close();
+                session.close();
+                connection.close();
+
+                broker.stop();
+            }
+        });
+    }
+
+    public void runTest(Task task) throws Exception {
+        int numThreads = Thread.currentThread().getThreadGroup().activeCount();
+        Thread.currentThread().getThreadGroup().list();
+
+        task.execute();
+
+        Thread.yield();
+        Thread.sleep(2000); // Wait for the threads to exit on their own
+
+        Thread.currentThread().getThreadGroup().list();
+        assertEquals(numThreads, 
Thread.currentThread().getThreadGroup().activeCount());
+    }
+}

Propchange: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to