STORM-1837: Check for null callback before flushing messages, ensure flusher thread doesn't swallow exceptions
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e740c32a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e740c32a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e740c32a

Branch: refs/heads/master
Commit: e740c32a0e19b71fda5e0a72ef6c55d1685a213b
Parents: db1e914
Author: Stig Rohde Døssing <stigdoess...@gmail.com>
Authored: Fri Aug 26 16:09:39 2016 +0200
Committer: Stig Rohde Døssing <s...@it-minds.dk>
Committed: Mon Aug 29 09:48:18 2016 +0200

----------------------------------------------------------------------
 .../jvm/org/apache/storm/messaging/local/Context.java | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/e740c32a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
index fd68733..b53b86b 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.messaging.local;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
 
     private static class LocalServer implements IConnection {
-        IConnectionCallback _cb;
+        volatile IConnectionCallback _cb;
         final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
 
         @Override
@@ -105,8 +106,12 @@ public class Context implements IContext {
             _pendingFlusher.scheduleAtFixedRate(new Runnable(){
                 @Override
                 public void run(){
-                    //Ensure messages are flushed even if no more sends are performed
-                    flushPending();
+                    try {
+                        //Ensure messages are flushed even if no more sends are performed
+                        flushPending();
+                    } catch (Throwable t) {
+                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
+                    }
                 }
             }, 5, 5, TimeUnit.SECONDS);
         }
@@ -117,7 +122,7 @@ public class Context implements IContext {
         }
 
         private void flushPending(){
-            if (!_pendingDueToUnregisteredServer.isEmpty()) {
+            if (_server._cb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 _pendingDueToUnregisteredServer.drainTo(ret);
                 _server._cb.recv(ret);