Updated Branches: refs/heads/trunk e2a7d6af5 -> 6e68a3711
More improvements for AMQ-5043. Reworked the MQTT inactivity monitor so that it's more accurate. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6e68a371 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6e68a371 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6e68a371 Branch: refs/heads/trunk Commit: 6e68a3711561ed24bd78603453c81c5bbaf99f24 Parents: e2a7d6a Author: Hiram Chirino <[email protected]> Authored: Wed Feb 12 13:26:16 2014 -0500 Committer: Hiram Chirino <[email protected]> Committed: Wed Feb 12 13:26:16 2014 -0500 ---------------------------------------------------------------------- .../transport/mqtt/MQTTInactivityMonitor.java | 104 +++++++------------ .../transport/mqtt/MQTTProtocolConverter.java | 21 ++-- 2 files changed, 49 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6e68a371/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java index c2f3041..adaf38b 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; -import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.thread.SchedulerTimerTask; import org.apache.activemq.transport.AbstractInactivityMonitor; import org.apache.activemq.transport.InactivityIOException; @@ -50,46 +49,53 @@ public class MQTTInactivityMonitor extends TransportFilter { private final AtomicBoolean monitorStarted = new AtomicBoolean(false); private final AtomicBoolean failed = new AtomicBoolean(false); - private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); private final ReentrantLock sendLock = new ReentrantLock(); private SchedulerTimerTask readCheckerTask; - private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; - private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; + private long readGraceTime = DEFAULT_CHECK_TIME_MILLS; + private long readKeepAliveTime = DEFAULT_CHECK_TIME_MILLS; private boolean keepAliveResponseRequired; private MQTTProtocolConverter protocolConverter; private final Runnable readChecker = new Runnable() { - long lastRunTime; + long lastReceiveTime = System.currentTimeMillis(); public void run() { - long now = System.currentTimeMillis(); - long elapsed = (now - lastRunTime); - - if (lastRunTime != 0 && LOG.isDebugEnabled()) { - LOG.debug("" + elapsed + " ms elapsed since last read check."); - } - // Perhaps the timer executed a read check late.. and then executes - // the next read check on time which causes the time elapsed between - // read checks to be small.. - - // If less than 90% of the read check Time elapsed then abort this readcheck. - if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression. - LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); + long now = System.currentTimeMillis(); + int currentCounter = next.getReceiveCounter(); + int previousCounter = lastReceiveCounter.getAndSet(currentCounter); + + // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that + // should be sufficient to indicate the connection is still alive. If there were random data, or something + // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle + // PINGREQ/RESP explicitly here + if (inReceive.get() || currentCounter != previousCounter) { + if (LOG.isTraceEnabled()) { + LOG.trace("Command received since last read check."); + } + lastReceiveTime = now; return; } - lastRunTime = now; - readCheck(); + if( (now-lastReceiveTime) >= readKeepAliveTime+readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) { + if (LOG.isDebugEnabled()) { + LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException."); + } + ASYNC_TASKS.execute(new Runnable() { + public void run() { + onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress())); + } + }); + } } }; private boolean allowReadCheck(long elapsed) { - return elapsed > (readCheckTime * 9 / 10); + return elapsed > (readGraceTime * 9 / 10); } public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { @@ -106,39 +112,7 @@ public class MQTTInactivityMonitor extends TransportFilter { next.stop(); } - final void readCheck() { - int currentCounter = next.getReceiveCounter(); - int previousCounter = lastReceiveCounter.getAndSet(currentCounter); - - // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that - // should be sufficient to indicate the connection is still alive. If there were random data, or something - // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle - // PINGREQ/RESP explicitly here - if (inReceive.get() || currentCounter != previousCounter) { - if (LOG.isTraceEnabled()) { - LOG.trace("A receive is in progress"); - } - return; - } - if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) { - if (LOG.isDebugEnabled()) { - LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); - } - ASYNC_TASKS.execute(new Runnable() { - public void run() { - onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); - } - }); - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Message received since last read check, resetting flag: "); - } - } - commandReceived.set(false); - } - public void onCommand(Object command) { - commandReceived.set(true); inReceive.set(true); try { transportListener.onCommand(command); @@ -177,20 +151,20 @@ public class MQTTInactivityMonitor extends TransportFilter { } } - public long getReadCheckTime() { - return readCheckTime; + public long getReadGraceTime() { + return readGraceTime; } - public void setReadCheckTime(long readCheckTime) { - this.readCheckTime = readCheckTime; + public void setReadGraceTime(long readGraceTime) { + this.readGraceTime = readGraceTime; } - public long getInitialDelayTime() { - return initialDelayTime; + public long getReadKeepAliveTime() { + return readKeepAliveTime; } - public void setInitialDelayTime(long initialDelayTime) { - this.initialDelayTime = initialDelayTime; + public void setReadKeepAliveTime(long readKeepAliveTime) { + this.readKeepAliveTime = readKeepAliveTime; } public boolean isKeepAliveResponseRequired() { @@ -224,11 +198,11 @@ public class MQTTInactivityMonitor extends TransportFilter { return; } - if (readCheckTime > 0) { + if (readKeepAliveTime > 0) { readCheckerTask = new SchedulerTimerTask(readChecker); } - if (readCheckTime > 0) { + if (readKeepAliveTime > 0) { monitorStarted.set(true); synchronized (AbstractInactivityMonitor.class) { if (CHECKER_COUNTER == 0) { @@ -236,8 +210,8 @@ public class MQTTInactivityMonitor extends TransportFilter { READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true); } CHECKER_COUNTER++; - if (readCheckTime > 0) { - READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); + if (readKeepAliveTime > 0) { + READ_CHECK_TIMER.schedule(readCheckerTask, readKeepAliveTime, readGraceTime); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6e68a371/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index f7c3c1e..d19a0b3 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -30,7 +30,6 @@ import javax.jms.Message; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.*; import org.apache.activemq.store.PersistenceAdapterSupport; -import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; @@ -51,7 +50,7 @@ public class MQTTProtocolConverter { private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); - private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5; + private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5; private static final int DEFAULT_CACHE_SIZE = 5000; private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); @@ -609,24 +608,24 @@ public class MQTTProtocolConverter { } try { - - long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); - // if we have a default keep-alive value, and the client is trying to turn off keep-alive, + // we'll observe the server-side configured default value (note, no grace period) - if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) { - keepAliveMSWithGracePeriod = defaultKeepAlive; + if (keepAliveMS == 0 && defaultKeepAlive > 0) { + keepAliveMS = defaultKeepAlive; } + long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); + monitor.setProtocolConverter(this); - monitor.setReadCheckTime(keepAliveMSWithGracePeriod); - monitor.setInitialDelayTime(keepAliveMS); + monitor.setReadKeepAliveTime(keepAliveMS); + monitor.setReadGraceTime(readGracePeriod); monitor.startMonitorThread(); if (LOG.isDebugEnabled()) { LOG.debug("MQTT Client " + getClientId() + - " established heart beat of " + keepAliveMSWithGracePeriod + - " ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) + + " established heart beat of " + keepAliveMS + + " ms (" + keepAliveMS + "ms + " + readGracePeriod + "ms grace period)"); } } catch (Exception ex) {
