Author: jstrachan
Date: Thu Aug 3 04:16:18 2006
New Revision: 428339
URL: http://svn.apache.org/viewvc?rev=428339&view=rev
Log:
Added a modified version of the patch donated in LINGO-22 (submitted by Jim
Beattie), which tests that we leave no threads around after shutting down the
JMS client and broker, together with making the Session Executor /
TaskRunnerFactory part of the connection. We could still make it a singleton if
required and just use reference counting to ensure it is shut down properly
after all connections are closed.
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Aug 3 04:16:18 2006
@@ -97,7 +97,7 @@
public class ActiveMQConnection implements Connection, TopicConnection,
QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
- public static final TaskRunnerFactory SESSION_TASK_RUNNER = new
TaskRunnerFactory("ActiveMQ Session
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
+ private TaskRunnerFactory sessionTaskRunner = new
TaskRunnerFactory("ActiveMQ Session
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
private final ThreadPoolExecutor asyncConnectionThread;
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
@@ -572,9 +572,10 @@
started.set(false);
- // TODO : ActiveMQConnectionFactory.onConnectionClose() not
- // yet implemented.
+ // TODO if we move the TaskRunnerFactory to the connection
factory
+ // then we may need to call
// factory.onConnectionClose(this);
+ sessionTaskRunner.shutdown();
closed.set(true);
closing.set(false);
@@ -856,6 +857,15 @@
public void removeTransportListener(TransportListener transportListener) {
transportListeners.remove(transportListener);
}
+
+ public TaskRunnerFactory getSessionTaskRunner() {
+ return sessionTaskRunner;
+ }
+
+ public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
+ this.sessionTaskRunner = sessionTaskRunner;
+ }
+
// Implementation methods
//
-------------------------------------------------------------------------
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Thu Aug 3 04:16:18 2006
@@ -102,7 +102,7 @@
if( !messageQueue.isRunning() ) {
messageQueue.start();
if( session.isSessionAsyncDispatch() || dispatchedBySessionPool ) {
- taskRunner =
ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ
Session: "+session.getSessionId());
+ taskRunner =
session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ
Session: "+session.getSessionId());
}
wakeup();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=428339&r1=428338&r2=428339&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Thu Aug 3 04:16:18 2006
@@ -18,6 +18,7 @@
package org.apache.activemq.thread;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
@@ -37,7 +38,7 @@
*/
public class TaskRunnerFactory {
- private Executor executor;
+ private ExecutorService executor;
private int maxIterationsPerRun;
private String name;
private int priority;
@@ -61,9 +62,14 @@
} else {
executor = createDefaultExecutor();
}
-
}
+ public void shutdown() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+
public TaskRunner createTaskRunner(Task task, String name) {
if( executor!=null ) {
return new PooledTaskRunner(executor, task, maxIterationsPerRun);
@@ -72,8 +78,7 @@
}
}
- protected Executor createDefaultExecutor() {
-
+ protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, name);
@@ -84,7 +89,6 @@
});
rc.allowCoreThreadTimeOut(true);
return rc;
-
}
}
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java?rev=428339&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
Thu Aug 3 04:16:18 2006
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.spring.ConsumerBean;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+/**
+ *
+ * @version $Revision: $
+ */
+public class StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest extends
TestCase {
+
+ public static interface Task {
+ public void execute() throws Exception;
+ }
+
+ public void setUp() throws Exception {
+ }
+
+ public void testStartAndStopClientAndBrokerAndCheckNoThreadsAreLeft()
throws Exception {
+ runTest(new Task() {
+
+ public void execute() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.start();
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://localhost");
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getName());
+
+ // consumer
+ MessageConsumer consumer = session.createConsumer(destination);
+ ConsumerBean listener = new ConsumerBean();
+ consumer.setMessageListener(listener);
+
+ // producer
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage("Hello
World!");
+ producer.send(message);
+ producer.close();
+
+ listener.assertMessagesArrived(1);
+
+ consumer.close();
+ session.close();
+ connection.close();
+
+ broker.stop();
+ }
+ });
+ }
+
+ public void runTest(Task task) throws Exception {
+ int numThreads = Thread.currentThread().getThreadGroup().activeCount();
+ Thread.currentThread().getThreadGroup().list();
+
+ task.execute();
+
+ Thread.yield();
+ Thread.sleep(2000); // Wait for the threads to exit on their own
+
+ Thread.currentThread().getThreadGroup().list();
+ assertEquals(numThreads,
Thread.currentThread().getThreadGroup().activeCount());
+ }
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java
------------------------------------------------------------------------------
svn:eol-style = native