Repository: storm
Updated Branches:
  refs/heads/1.x-branch 9d039588f -> 58cbfe5e2


Merge branch 'STORM-1837-2' of https://github.com/srdo/storm into STORM-1837

STORM-1837: Fix complete-topology and prevent message loss


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9899ac3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9899ac3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9899ac3

Branch: refs/heads/1.x-branch
Commit: b9899ac3050651d8c6ac6f64c31cdcdc369b22db
Parents: 9d03958
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 19 15:12:25 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 19 15:27:13 2016 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/testing.clj |  3 +-
 .../apache/storm/messaging/local/Context.java   | 70 +++++++++++++++++---
 .../org/apache/storm/testing4j_test.clj         | 25 ++++---
 3 files changed, 79 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 5e5700c..565d1b9 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -540,7 +540,8 @@
       (startup spout))
 
     (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
-    (advance-cluster-time cluster-map 11)
+    (when (Time/isSimulating)
+      (advance-cluster-time cluster-map 11))
 
     (let [storm-id (common/get-storm-id state storm-name)]
       ;;Give the topology time to come up without using it to wait for the spouts to complete

http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
index 4f0ba1f..7300847 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -27,8 +27,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 
-import org.apache.storm.Config;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import org.apache.storm.grouping.Load;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.TaskMessage;
@@ -39,7 +43,7 @@ public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
 
     private static class LocalServer implements IConnection {
-        IConnectionCallback _cb;
+        volatile IConnectionCallback _cb;
         final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
 
         @Override
@@ -82,31 +86,76 @@ public class Context implements IContext {
 
     private static class LocalClient implements IConnection {
         private final LocalServer _server;
+        //Messages sent before the server registered a callback
+        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
+        private final ScheduledExecutorService _pendingFlusher;
 
         public LocalClient(LocalServer server) {
             _server = server;
+            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
+            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
+                @Override
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable);
+                    thread.setName("LocalClientFlusher-" + thread.getId());
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
+                @Override
+                public void run(){
+                    try {
+                        //Ensure messages are flushed even if no more sends are performed
+                        flushPending();
+                    } catch (Throwable t) {
+                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
+                        throw new RuntimeException(t);
+                    }
+                }
+            }, 5, 5, TimeUnit.SECONDS);
         }
 
         @Override
         public void registerRecv(IConnectionCallback cb) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
         }
-
+        
+        private void flushPending(){
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+                ArrayList<TaskMessage> ret = new ArrayList<>();
+                _pendingDueToUnregisteredServer.drainTo(ret);
+                serverCb.recv(ret);
+            }
+        }
+        
         @Override
         public void send(int taskId,  byte[] payload) {
-            if (_server._cb != null) {
-                _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload)));
+            TaskMessage message = new TaskMessage(taskId, payload);
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null) {
+                flushPending();
+                serverCb.recv(Arrays.asList(message));
+            } else {
+                _pendingDueToUnregisteredServer.add(message);
             }
         }
  
         @Override
         public void send(Iterator<TaskMessage> msgs) {
-            if (_server._cb != null) {
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null) {
+                flushPending();
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 while (msgs.hasNext()) {
                     ret.add(msgs.next());
                 }
-                _server._cb.recv(ret);
+                serverCb.recv(ret);
+            } else {
+                while(msgs.hasNext()){
+                    _pendingDueToUnregisteredServer.add(msgs.next());
+                }
             }
         }
 
@@ -122,7 +171,12 @@ public class Context implements IContext {
  
         @Override
         public void close() {
-            //NOOP
+            _pendingFlusher.shutdown();
+            try{
+                _pendingFlusher.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e){
+                throw new RuntimeException("Interrupted while awaiting flusher shutdown", e);
+            }
         }
     };
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index cd139d7..5f71e86 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -59,15 +59,8 @@
                                                                 (is (Time/isSimulating)))))
     (is (not (Time/isSimulating)))))
 
-(deftest test-complete-topology
-  (doseq [zmq-on? [true false]
-          :let [daemon-conf (doto (Config.)
-                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
-                mk-cluster-param (doto (MkClusterParam.)
-                                   (.setSupervisors (int 4))
-                                   (.setDaemonConf daemon-conf))]]
-    (Testing/withSimulatedTimeLocalCluster
-     (reify TestJob
+(def complete-topology-testjob
+  (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (let [topology (thrift/mk-topology
                          {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
@@ -97,7 +90,19 @@
                   (Testing/readTuples results "3")))
            (is (= [[1] [2] [3] [4]]
                   (Testing/readTuples results "4")))
-           ))))))
+           ))))
+
+(deftest test-complete-topology
+  (doseq [zmq-on? [true false]
+          :let [daemon-conf (doto (Config.)
+                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
+                mk-cluster-param (doto (MkClusterParam.)
+                                   (.setSupervisors (int 4))
+                                   (.setDaemonConf daemon-conf))]]
+    (Testing/withSimulatedTimeLocalCluster
+      mk-cluster-param complete-topology-testjob )
+    (Testing/withLocalCluster
+      mk-cluster-param complete-topology-testjob)))
 
 (deftest test-with-tracked-cluster
   (Testing/withTrackedCluster

Reply via email to