STORM-1837: Check for null callback before flushing messages, ensure flusher 
thread doesn't swallow exceptions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e740c32a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e740c32a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e740c32a

Branch: refs/heads/master
Commit: e740c32a0e19b71fda5e0a72ef6c55d1685a213b
Parents: db1e914
Author: Stig Rohde Døssing <stigdoess...@gmail.com>
Authored: Fri Aug 26 16:09:39 2016 +0200
Committer: Stig Rohde Døssing <s...@it-minds.dk>
Committed: Mon Aug 29 09:48:18 2016 +0200

----------------------------------------------------------------------
 .../jvm/org/apache/storm/messaging/local/Context.java  | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e740c32a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java 
b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
index fd68733..b53b86b 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.messaging.local;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
 
     private static class LocalServer implements IConnection {
-        IConnectionCallback _cb;
+        volatile IConnectionCallback _cb;
         final ConcurrentHashMap<Integer, Double> _load = new 
ConcurrentHashMap<>();
 
         @Override
@@ -105,8 +106,12 @@ public class Context implements IContext {
             _pendingFlusher.scheduleAtFixedRate(new Runnable(){
                 @Override
                 public void run(){
-                    //Ensure messages are flushed even if no more sends are 
performed
-                    flushPending();
+                    try {
+                        //Ensure messages are flushed even if no more sends 
are performed
+                        flushPending();
+                    } catch (Throwable t) {
+                        LOG.error("Uncaught throwable in pending message 
flusher thread, messages may be lost", t);
+                    }
                 }
             }, 5, 5, TimeUnit.SECONDS);
         }
@@ -117,7 +122,7 @@ public class Context implements IContext {
         }
         
         private void flushPending(){
-            if (!_pendingDueToUnregisteredServer.isEmpty()) {
+            if (_server._cb != null && 
!_pendingDueToUnregisteredServer.isEmpty()) {
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 _pendingDueToUnregisteredServer.drainTo(ret);
                 _server._cb.recv(ret);

Reply via email to