Repository: storm
Updated Branches:
  refs/heads/master 8e6552ce4 -> 4ce6f04e8


STORM-1837: Fix complete-topology and prevent message loss when running local clusters with no time simulation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e7b12846
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e7b12846
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e7b12846

Branch: refs/heads/master
Commit: e7b128461366ba2597bb083529cb3e0c36ac3bd3
Parents: a536937
Author: Stig Rohde Døssing <s...@it-minds.dk>
Authored: Fri May 13 18:22:37 2016 +0200
Committer: Stig Rohde Døssing <s...@it-minds.dk>
Committed: Mon May 16 10:41:59 2016 +0200

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/testing.clj |  3 +-
 .../apache/storm/messaging/local/Context.java   | 49 ++++++++++++++++++--
 .../org/apache/storm/testing4j_test.clj         | 25 ++++++----
 3 files changed, 62 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e7b12846/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 5b2e80d..d0234bc 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -601,7 +601,8 @@
       (startup spout))
 
     (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
-    (advance-cluster-time cluster-map 11)
+    (when (Time/isSimulating)
+      (advance-cluster-time cluster-map 11))
 
     (let [storm-id (StormCommon/getStormId state storm-name)]
       ;;Give the topology time to come up without using it to wait for the spouts to complete

http://git-wip-us.apache.org/repos/asf/storm/blob/e7b12846/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
index 4f0ba1f..fd68733 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -27,8 +27,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 
-import org.apache.storm.Config;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import org.apache.storm.grouping.Load;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.TaskMessage;
@@ -82,31 +86,68 @@ public class Context implements IContext {
 
     private static class LocalClient implements IConnection {
         private final LocalServer _server;
+        //Messages sent before the server registered a callback
+        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
+        private final ScheduledExecutorService _pendingFlusher;
 
         public LocalClient(LocalServer server) {
             _server = server;
+            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
+            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
+                @Override
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable);
+                    thread.setName("LocalClientFlusher-" + thread.getId());
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
+                @Override
+                public void run(){
+                    //Ensure messages are flushed even if no more sends are performed
+                    flushPending();
+                }
+            }, 5, 5, TimeUnit.SECONDS);
         }
 
         @Override
         public void registerRecv(IConnectionCallback cb) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
         }
-
+        
+        private void flushPending(){
+            if (!_pendingDueToUnregisteredServer.isEmpty()) {
+                ArrayList<TaskMessage> ret = new ArrayList<>();
+                _pendingDueToUnregisteredServer.drainTo(ret);
+                _server._cb.recv(ret);
+            }
+        }
+        
         @Override
         public void send(int taskId,  byte[] payload) {
+            TaskMessage message = new TaskMessage(taskId, payload);
             if (_server._cb != null) {
-                _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload)));
+                flushPending();
+                _server._cb.recv(Arrays.asList(message));
+            } else {
+                _pendingDueToUnregisteredServer.add(message);
             }
         }
  
         @Override
         public void send(Iterator<TaskMessage> msgs) {
             if (_server._cb != null) {
+                flushPending();
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 while (msgs.hasNext()) {
                     ret.add(msgs.next());
                 }
                 _server._cb.recv(ret);
+            } else {
+                while(msgs.hasNext()){
+                    _pendingDueToUnregisteredServer.add(msgs.next());
+                }
             }
         }
 
@@ -122,7 +163,7 @@ public class Context implements IContext {
  
         @Override
         public void close() {
-            //NOOP
+            _pendingFlusher.shutdown();
         }
     };
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e7b12846/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 3b1a48b..342bba6 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -63,15 +63,8 @@
                                                                 (is 
(Time/isSimulating)))))
     (is (not (Time/isSimulating)))))
 
-(deftest test-complete-topology
-  (doseq [zmq-on? [true false]
-          :let [daemon-conf (doto (Config.)
-                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
-                mk-cluster-param (doto (MkClusterParam.)
-                                   (.setSupervisors (int 4))
-                                   (.setDaemonConf daemon-conf))]]
-    (Testing/withSimulatedTimeLocalCluster
-     (reify TestJob
+(def complete-topology-testjob
+  (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (let [topology (Thrift/buildTopology
                          {"1" (Thrift/prepareSpoutDetails (TestWordSpout. 
true) (Integer. 3))}
@@ -109,7 +102,19 @@
                   (Testing/readTuples results "3")))
            (is (= [[1] [2] [3] [4]]
                   (Testing/readTuples results "4")))
-           ))))))
+           ))))
+
+(deftest test-complete-topology
+  (doseq [zmq-on? [true false]
+          :let [daemon-conf (doto (Config.)
+                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
+                mk-cluster-param (doto (MkClusterParam.)
+                                   (.setSupervisors (int 4))
+                                   (.setDaemonConf daemon-conf))]]
+    (Testing/withSimulatedTimeLocalCluster
+      mk-cluster-param complete-topology-testjob )
+    (Testing/withLocalCluster
+      mk-cluster-param complete-topology-testjob)))
 
 (deftest test-with-tracked-cluster
   (Testing/withTrackedCluster

Reply via email to