Author: gtully
Date: Fri Aug 7 16:49:07 2009
New Revision: 802086
URL: http://svn.apache.org/viewvc?rev=802086&view=rev
Log:
make keepAliveResponseRequired optional in the inactivity monitor -
https://issues.apache.org/activemq/browse/AMQ-2351
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=802086&r1=802085&r2=802086&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Fri Aug 7 16:49:07 2009
@@ -62,7 +62,7 @@
private long readCheckTime;
private long writeCheckTime;
private long initialDelayTime;
-
+ private boolean keepAliveResponseRequired;
private WireFormat wireFormat;
private final Runnable readChecker = new Runnable() {
@@ -126,7 +126,7 @@
}
if (!commandSent.get()) {
- if(LOG.isTraceEnabled()) {
+ if (LOG.isTraceEnabled()) {
LOG.trace("No message sent since last write check, sending a
KeepAliveInfo");
}
ASYNC_TASKS.execute(new Runnable() {
@@ -135,7 +135,7 @@
try {
KeepAliveInfo info = new KeepAliveInfo();
- info.setResponseRequired(true);
+
info.setResponseRequired(keepAliveResponseRequired);
oneway(info);
} catch (IOException e) {
onException(e);
@@ -247,7 +247,11 @@
stopMonitorThreads();
transportListener.onException(error);
}
- }
+ }
+
+ public void setKeepAliveResponseRequired(boolean val) {
+ keepAliveResponseRequired = val;
+ }
private synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {
@@ -266,7 +270,7 @@
monitorStarted.set(true);
writeCheckerTask = new SchedulerTimerTask(writeChecker);
readCheckerTask = new SchedulerTimerTask(readChecker);
- writeCheckTime = readCheckTime/3;
+ writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
synchronized( InactivityMonitor.class ) {
if( CHECKER_COUNTER == 0 ) {
READ_CHECK_TIMER = new Timer("InactivityMonitor
ReadCheck",true);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=802086&r1=802085&r2=802086&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
Fri Aug 7 16:49:07 2009
@@ -99,7 +99,9 @@
boolean useInactivityMonitor = "true".equals(getOption(options,
"useInactivityMonitor", "true"));
if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
transport = new InactivityMonitor(transport, format);
+ IntrospectionSupport.setProperties(transport, options);
}
+
// Only need the WireFormatNegotiator if using openwire
if (format instanceof OpenWireFormat) {