Author: rajdavies
Date: Wed Mar 19 09:07:54 2008
New Revision: 638910
URL: http://svn.apache.org/viewvc?rev=638910&view=rev
Log:
Added a separately configurable initial delay for the timeout tasks in
InactivityMonitor
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
Wed Mar 19 09:07:54 2008
@@ -259,9 +259,20 @@
return l == null ? 0 : l.longValue();
}
- public void seMaxInactivityDuration(long maxInactivityDuration) throws
IOException {
+ public void setMaxInactivityDuration(long maxInactivityDuration) throws
IOException {
setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
}
+
+ public long getMaxInactivityDurationInitalDelay() throws IOException {
+ Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
+ return l == null ? 0 : l.longValue();
+ }
+
+ public void setMaxInactivityDurationInitalDelay(long
maxInactivityDurationInitalDelay) throws IOException {
+ setProperty("MaxInactivityDurationInitalDelay", new
Long(maxInactivityDurationInitalDelay));
+ }
+
+
/**
* @throws IOException
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
Wed Mar 19 09:07:54 2008
@@ -36,7 +36,8 @@
private boolean cacheEnabled = true;
private boolean tightEncodingEnabled = true;
private boolean sizePrefixDisabled;
- private long maxInactivityDuration = 30 * 1000;
+ private long maxInactivityDuration = 30*1000;
+ private long maxInactivityDurationInitalDelay = 10*1000;
private int cacheSize = 1024;
public WireFormat createWireFormat() {
@@ -49,7 +50,8 @@
info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
info.setTightEncodingEnabled(tightEncodingEnabled);
info.setSizePrefixDisabled(sizePrefixDisabled);
- info.seMaxInactivityDuration(maxInactivityDuration);
+ info.setMaxInactivityDuration(maxInactivityDuration);
+
info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
info.setCacheSize(cacheSize);
} catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Could not
configure WireFormatInfo");
@@ -124,5 +126,14 @@
public void setCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
+ }
+
+ public long getMaxInactivityDurationInitalDelay() {
+ return maxInactivityDurationInitalDelay;
+ }
+
+ public void setMaxInactivityDurationInitalDelay(
+ long maxInactivityDurationInitalDelay) {
+ this.maxInactivityDurationInitalDelay =
maxInactivityDurationInitalDelay;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Wed Mar 19 09:07:54 2008
@@ -23,6 +23,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
@@ -51,6 +52,7 @@
private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
@@ -59,6 +61,7 @@
private long readCheckTime;
private long writeCheckTime;
+ private long initialDelayTime;
private final Runnable readChecker = new Runnable() {
long lastRunTime;
@@ -107,7 +110,7 @@
}
public void stop() throws Exception {
- stopMonitorThreads();
+ closeDown();
next.stop();
}
@@ -125,12 +128,15 @@
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
- try {
- KeepAliveInfo info = new KeepAliveInfo();
- info.setResponseRequired(true);
- oneway(info);
- } catch (IOException e) {
- onException(e);
+ if (stopped.get() == false) {
+ try {
+
+ KeepAliveInfo info = new KeepAliveInfo();
+ info.setResponseRequired(true);
+ oneway(info);
+ } catch (IOException e) {
+ onException(e);
+ }
}
};
});
@@ -155,9 +161,10 @@
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for "
+ toString() + "! Throwing InactivityIOException.");
}
+ closeDown();
ASYNC_TASKS.execute(new Runnable() {
public void run() {
- handleException(new InactivityIOException("Channel was
inactive for too long: "+next.getRemoteAddress()));
+ onException(new InactivityIOException("Channel was
inactive for too long: "+next.getRemoteAddress()));
};
});
@@ -218,15 +225,17 @@
synchronized(inSend) {
inSend.set(true);
try {
+
+ if( failed.get() ) {
+ closeDown();
+ throw new InactivityIOException("Channel was inactive for
too long: "+next.getRemoteAddress());
+ }
if (o.getClass() == WireFormatInfo.class) {
synchronized (this) {
localWireFormatInfo = (WireFormatInfo)o;
startMonitorThreads();
}
}
- if( failed.get() ) {
- throw new InactivityIOException("Channel was inactive for
too long: "+next.getRemoteAddress());
- }
next.oneway(o);
} finally {
commandSent.set(true);
@@ -236,17 +245,18 @@
}
public void onException(IOException error) {
- if( !failed.getAndSet(true) ) {
- handleException(error);
- }
+ closeDown();
+ if (!failed.getAndSet(true)) {
+ transportListener.onException(error);
+ }
+ }
+
+ private void closeDown() {
+ stopped.set(true);
+ if (monitorStarted.get()) {
+ stopMonitorThreads();
+ }
}
-
- private void handleException(IOException error) {
- if (monitorStarted.get()) {
- stopMonitorThreads();
- }
- transportListener.onException(error);
- }
private synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {
@@ -260,6 +270,7 @@
}
readCheckTime =
Math.min(localWireFormatInfo.getMaxInactivityDuration(),
remoteWireFormatInfo.getMaxInactivityDuration());
+ initialDelayTime =
Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(),
remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
if (readCheckTime > 0) {
monitorStarted.set(true);
writeCheckerTask = new SchedulerTimerTask(writeChecker);
@@ -271,8 +282,8 @@
WRITE_CHECK_TIMER = new Timer("InactivityMonitor
WriteCheck");
}
CHECKER_COUNTER++;
- WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask,
writeCheckTime,writeCheckTime);
- READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask,
readCheckTime,readCheckTime);
+ WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask,
initialDelayTime,writeCheckTime);
+ READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask,
initialDelayTime,readCheckTime);
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
Wed Mar 19 09:07:54 2008
@@ -45,6 +45,7 @@
}
public void setTransport(Transport transport) {
this.transport = transport;
+ this.transport.setTransportListener(this);
}
public URI getUri() {
return uri;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Wed Mar 19 09:07:54 2008
@@ -107,9 +107,12 @@
public boolean iterate() {
boolean result=false;
boolean buildBackup=true;
- if (connectedTransport.get()==null && !disposed) {
- result=doReconnect();
- buildBackup=false;
+ boolean doReconnect = !disposed;
+ synchronized(backupMutex) {
+ if (connectedTransport.get()==null && !disposed) {
+ result=doReconnect();
+ buildBackup=false;
+ }
}
if(buildBackup) {
buildBackups();
@@ -253,6 +256,10 @@
started = false;
disposed = true;
connected = false;
+ for (BackupTransport t:backups) {
+ t.setDisposed(true);
+ }
+ backups.clear();
if (connectedTransport.get() != null) {
transportToStop = connectedTransport.getAndSet(null);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
Wed Mar 19 09:07:54 2008
@@ -186,7 +186,7 @@
});
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
- info.seMaxInactivityDuration(1000);
+ info.setMaxInactivityDuration(1000);
clientTransport.oneway(info);
assertEquals(0, serverErrorCount.get());