Updated Branches: refs/heads/trunk 190a44bf2 -> a059bf4a9
https://issues.apache.org/jira/browse/AMQ-5051

fix potential NPE

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a059bf4a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a059bf4a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a059bf4a

Branch: refs/heads/trunk
Commit: a059bf4a9013539899bfdf49aebd868b867e2b82
Parents: 190a44b
Author: Timothy Bish <[email protected]>
Authored: Thu Feb 13 16:39:12 2014 -0500
Committer: Timothy Bish <[email protected]>
Committed: Thu Feb 13 16:39:12 2014 -0500

----------------------------------------------------------------------
 .../activemq/transport/mqtt/MQTTInactivityMonitor.java | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/a059bf4a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
index adaf38b..685bb60 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
@@ -63,6 +63,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
     private final Runnable readChecker = new Runnable() {
         long lastReceiveTime = System.currentTimeMillis();
 
+        @Override
         public void run() {
 
             long now = System.currentTimeMillis();
@@ -86,6 +87,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
                     LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
                 }
                 ASYNC_TASKS.execute(new Runnable() {
+                    @Override
                     public void run() {
                         onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress()));
                     }
@@ -102,16 +104,19 @@ public class MQTTInactivityMonitor extends TransportFilter {
         super(next);
     }
 
+    @Override
     public void start() throws Exception {
         next.start();
         startMonitorThread();
     }
 
+    @Override
     public void stop() throws Exception {
         stopMonitorThread();
         next.stop();
     }
 
+    @Override
     public void onCommand(Object command) {
         inReceive.set(true);
         try {
@@ -121,6 +126,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
         }
     }
 
+    @Override
     public void oneway(Object o) throws IOException {
         // To prevent the inactivity monitor from sending a message while we
         // are performing a send we take the lock.
@@ -140,13 +146,13 @@ public class MQTTInactivityMonitor extends TransportFilter {
         next.oneway(command);
     }
 
+    @Override
     public void onException(IOException error) {
         if (failed.compareAndSet(false, true)) {
             stopMonitorThread();
             if (protocolConverter != null) {
                 protocolConverter.onTransportError();
             }
-            protocolConverter.onTransportError();
             transportListener.onException(error);
         }
     }
@@ -236,7 +242,8 @@ public class MQTTInactivityMonitor extends TransportFilter {
         }
     }
 
-    private ThreadFactory factory = new ThreadFactory() {
+    private final ThreadFactory factory = new ThreadFactory() {
+        @Override
         public Thread newThread(Runnable runnable) {
             Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
             thread.setDaemon(true);
