Author: rajdavies
Date: Mon Mar 17 06:32:31 2008
New Revision: 637881
URL: http://svn.apache.org/viewvc?rev=637881&view=rev
Log:
Interrupt the writing thread on failure
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=637881&r1=637880&r2=637881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Mon Mar 17 06:32:31 2008
@@ -56,7 +56,7 @@
private final AtomicBoolean inReceive = new AtomicBoolean(false);
private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask;
-
+ private Thread writeThread;
private long readCheckTime;
private long writeCheckTime;
@@ -154,13 +154,16 @@
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " +
toString() + "! Throwing InactivityIOException.");
}
-
-
- // TODO: use a thread pool for this..
- ASYNC_TASKS.execute(new Runnable() {
+ ASYNC_TASKS.execute(new Runnable() {
public void run() {
- onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+ Thread t = writeThread;
+ if (t != null) {
+ t.interrupt();
+ }
+ onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+
};
+
});
} else {
@@ -221,9 +224,11 @@
}
}
synchronized (writeChecker) {
+ writeThread=Thread.currentThread();
next.oneway(o);
}
} finally {
+ writeThread=null;
commandSent.set(true);
inSend.set(false);
}