STORM-1837: Guard against calls to registerRecv with null parameter by caching 
serverCb before using it


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ee37e57a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ee37e57a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ee37e57a

Branch: refs/heads/master
Commit: ee37e57a4bcdf93d85019314909c04143ef0a96a
Parents: 70ecffc
Author: Stig Rohde Døssing <stigdoess...@gmail.com>
Authored: Mon Sep 19 19:44:57 2016 +0200
Committer: Stig Rohde Døssing <stigdoess...@gmail.com>
Committed: Mon Sep 19 19:44:57 2016 +0200

----------------------------------------------------------------------
 .../org/apache/storm/messaging/local/Context.java    | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ee37e57a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java 
b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
index 83a8a99..7300847 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -122,19 +122,21 @@ public class Context implements IContext {
         }
         
         private void flushPending(){
-            if (_server._cb != null && 
!_pendingDueToUnregisteredServer.isEmpty()) {
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null && 
!_pendingDueToUnregisteredServer.isEmpty()) {
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 _pendingDueToUnregisteredServer.drainTo(ret);
-                _server._cb.recv(ret);
+                serverCb.recv(ret);
             }
         }
         
         @Override
         public void send(int taskId,  byte[] payload) {
             TaskMessage message = new TaskMessage(taskId, payload);
-            if (_server._cb != null) {
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null) {
                 flushPending();
-                _server._cb.recv(Arrays.asList(message));
+                serverCb.recv(Arrays.asList(message));
             } else {
                 _pendingDueToUnregisteredServer.add(message);
             }
@@ -142,13 +144,14 @@ public class Context implements IContext {
  
         @Override
         public void send(Iterator<TaskMessage> msgs) {
-            if (_server._cb != null) {
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null) {
                 flushPending();
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 while (msgs.hasNext()) {
                     ret.add(msgs.next());
                 }
-                _server._cb.recv(ret);
+                serverCb.recv(ret);
             } else {
                 while(msgs.hasNext()){
                     _pendingDueToUnregisteredServer.add(msgs.next());

Reply via email to