Author: chirino
Date: Wed Mar 19 10:19:32 2008
New Revision: 638942
URL: http://svn.apache.org/viewvc?rev=638942&view=rev
Log:
Fix for inactivity exceptions not getting generated due to previous change.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=638942&r1=638941&r2=638942&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Wed Mar 19 10:19:32 2008
@@ -52,7 +52,6 @@
private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);
- private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
@@ -110,7 +109,7 @@
}
public void stop() throws Exception {
- closeDown();
+ stopMonitorThreads();
next.stop();
}
@@ -128,7 +127,7 @@
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
- if (stopped.get() == false) {
+ if (monitorStarted.get()) {
try {
KeepAliveInfo info = new KeepAliveInfo();
@@ -157,19 +156,15 @@
return;
}
if (!commandReceived.get()) {
- if( !failed.getAndSet(true) ) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
- }
- closeDown();
- ASYNC_TASKS.execute(new Runnable() {
- public void run() {
- onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
- };
-
- });
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
}
-
+ ASYNC_TASKS.execute(new Runnable() {
+ public void run() {
+ onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+ };
+
+ });
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
@@ -227,7 +222,6 @@
try {
if( failed.get() ) {
- closeDown();
throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
}
if (o.getClass() == WireFormatInfo.class) {
@@ -245,18 +239,11 @@
}
public void onException(IOException error) {
- closeDown();
if (!failed.compareAndSet(false,true)) {
- transportListener.onException(error);
- }
- }
-
- private void closeDown() {
- stopped.set(true);
- if (monitorStarted.get()) {
- stopMonitorThreads();
+ stopMonitorThreads();
+ transportListener.onException(error);
}
- }
+ }
private synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {