Author: chirino
Date: Mon Apr 17 10:04:59 2006
New Revision: 394729
URL: http://svn.apache.org/viewcvs?rev=394729&view=rev
Log:
An async error could cause a deadlock when using the VM transport, since all
of its operations are synchronous. The error handling is now done in a separate
asynchronous thread to avoid the deadlock.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=394729&r1=394728&r2=394729&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Mon Apr 17 10:04:59 2006
@@ -88,12 +88,19 @@
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingDeque;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class ActiveMQConnection implements Connection, TopicConnection,
QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
public static final TaskRunnerFactory SESSION_TASK_RUNNER = new
TaskRunnerFactory("session
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
+ private final Executor asyncConnectionThread;
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
private static final IdGenerator connectionIdGenerator = new IdGenerator();
@@ -165,6 +172,14 @@
*/
protected ActiveMQConnection(Transport transport, JMSStatsImpl
factoryStats)
throws Exception {
+
+ // Configure a single threaded executor who's core thread can timeout
if idle
+ asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS,
new LinkedBlockingQueue(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "Connection task");
+ }});
+ asyncConnectionThread.allowCoreThreadTimeOut(true);
+
this.info = new ConnectionInfo(new
ConnectionId(connectionIdGenerator.generateId()));
this.info.setManageable(true);
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
@@ -1388,7 +1403,7 @@
/**
* @param command - the command to consume
*/
- public void onCommand(Command command) {
+ public void onCommand(final Command command) {
if (!closed.get() && command != null) {
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
@@ -1416,7 +1431,13 @@
onControlCommand((ControlCommand) command);
}
else if (command.getDataStructureType() ==
ConnectionError.DATA_STRUCTURE_TYPE) {
- onAsyncException(((ConnectionError)command).getException());
+ asyncConnectionThread.execute(new Runnable(){
+ public void run() {
+
onAsyncException(((ConnectionError)command).getException());
+ }
+ });
+ new Thread("Async error worker") {
+ }.start();
}else if (command instanceof ConnectionControl){
onConnectionControl((ConnectionControl) command);
}else if (command instanceof ConsumerControl){
@@ -1437,25 +1458,37 @@
public void onAsyncException(Throwable error) {
if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) {
+
if (!(error instanceof JMSException))
error = JMSExceptionSupport.create(error);
- this.exceptionListener.onException((JMSException) error);
+ final JMSException e = (JMSException) error;
+
+ asyncConnectionThread.execute(new Runnable(){
+ public void run() {
+
ActiveMQConnection.this.exceptionListener.onException(e);
+ }
+ });
+
} else {
log.warn("Async exception with no exception listener: " +
error, error);
}
}
}
- public void onException(IOException error) {
+ public void onException(final IOException error) {
onAsyncException(error);
- transportFailed(error);
- ServiceSupport.dispose(this.transport);
- brokerInfoReceived.countDown();
-
- for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
- TransportListener listener = (TransportListener) iter.next();
- listener.onException(error);
- }
+ asyncConnectionThread.execute(new Runnable(){
+ public void run() {
+ transportFailed(error);
+ ServiceSupport.dispose(ActiveMQConnection.this.transport);
+ brokerInfoReceived.countDown();
+
+ for (Iterator iter = transportListeners.iterator();
iter.hasNext();) {
+ TransportListener listener = (TransportListener)
iter.next();
+ listener.onException(error);
+ }
+ }
+ });
}
public void transportInterupted() {
@@ -1781,4 +1814,4 @@
-}
\ No newline at end of file
+}