Repository: storm
Updated Branches:
  refs/heads/1.x-branch 9d039588f -> 58cbfe5e2
Merge branch 'STORM-1837-2' of https://github.com/srdo/storm into STORM-1837 STORM-1837: Fix complete-topology and prevent message loss Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9899ac3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9899ac3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9899ac3 Branch: refs/heads/1.x-branch Commit: b9899ac3050651d8c6ac6f64c31cdcdc369b22db Parents: 9d03958 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Mon Sep 19 15:12:25 2016 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Mon Sep 19 15:27:13 2016 -0500 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/testing.clj | 3 +- .../apache/storm/messaging/local/Context.java | 70 +++++++++++++++++--- .../org/apache/storm/testing4j_test.clj | 25 ++++--- 3 files changed, 79 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 5e5700c..565d1b9 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -540,7 +540,8 @@ (startup spout)) (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology) - (advance-cluster-time cluster-map 11) + (when (Time/isSimulating) + (advance-cluster-time cluster-map 11)) (let [storm-id (common/get-storm-id state storm-name)] ;;Give the topology time to come up without using it to wait for the spouts to complete http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java index 4f0ba1f..7300847 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/local/Context.java @@ -27,8 +27,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; -import org.apache.storm.Config; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.storm.grouping.Load; import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.TaskMessage; @@ -39,7 +43,7 @@ public class Context implements IContext { private static final Logger LOG = LoggerFactory.getLogger(Context.class); private static class LocalServer implements IConnection { - IConnectionCallback _cb; + volatile IConnectionCallback _cb; final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>(); @Override @@ -82,31 +86,76 @@ public class Context implements IContext { private static class LocalClient implements IConnection { private final LocalServer _server; + //Messages sent before the server registered a callback + private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer; + private final ScheduledExecutorService _pendingFlusher; public LocalClient(LocalServer server) { _server = server; + _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>(); + _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){ + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName("LocalClientFlusher-" + thread.getId()); + 
thread.setDaemon(true); + return thread; + } + }); + _pendingFlusher.scheduleAtFixedRate(new Runnable(){ + @Override + public void run(){ + try { + //Ensure messages are flushed even if no more sends are performed + flushPending(); + } catch (Throwable t) { + LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t); + throw new RuntimeException(t); + } + } + }, 5, 5, TimeUnit.SECONDS); } @Override public void registerRecv(IConnectionCallback cb) { throw new IllegalArgumentException("SHOULD NOT HAPPEN"); } - + + private void flushPending(){ + IConnectionCallback serverCb = _server._cb; + if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) { + ArrayList<TaskMessage> ret = new ArrayList<>(); + _pendingDueToUnregisteredServer.drainTo(ret); + serverCb.recv(ret); + } + } + @Override public void send(int taskId, byte[] payload) { - if (_server._cb != null) { - _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload))); + TaskMessage message = new TaskMessage(taskId, payload); + IConnectionCallback serverCb = _server._cb; + if (serverCb != null) { + flushPending(); + serverCb.recv(Arrays.asList(message)); + } else { + _pendingDueToUnregisteredServer.add(message); } } @Override public void send(Iterator<TaskMessage> msgs) { - if (_server._cb != null) { + IConnectionCallback serverCb = _server._cb; + if (serverCb != null) { + flushPending(); ArrayList<TaskMessage> ret = new ArrayList<>(); while (msgs.hasNext()) { ret.add(msgs.next()); } - _server._cb.recv(ret); + serverCb.recv(ret); + } else { + while(msgs.hasNext()){ + _pendingDueToUnregisteredServer.add(msgs.next()); + } } } @@ -122,7 +171,12 @@ public class Context implements IContext { @Override public void close() { - //NOOP + _pendingFlusher.shutdown(); + try{ + _pendingFlusher.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e){ + throw new RuntimeException("Interrupted while awaiting flusher shutdown", e); + } } }; 
http://git-wip-us.apache.org/repos/asf/storm/blob/b9899ac3/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj index cd139d7..5f71e86 100644 --- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj @@ -59,15 +59,8 @@ (is (Time/isSimulating))))) (is (not (Time/isSimulating))))) -(deftest test-complete-topology - (doseq [zmq-on? [true false] - :let [daemon-conf (doto (Config.) - (.put STORM-LOCAL-MODE-ZMQ zmq-on?)) - mk-cluster-param (doto (MkClusterParam.) - (.setSupervisors (int 4)) - (.setDaemonConf daemon-conf))]] - (Testing/withSimulatedTimeLocalCluster - (reify TestJob +(def complete-topology-testjob + (reify TestJob (^void run [this ^ILocalCluster cluster] (let [topology (thrift/mk-topology {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} @@ -97,7 +90,19 @@ (Testing/readTuples results "3"))) (is (= [[1] [2] [3] [4]] (Testing/readTuples results "4"))) - )))))) + )))) + +(deftest test-complete-topology + (doseq [zmq-on? [true false] + :let [daemon-conf (doto (Config.) + (.put STORM-LOCAL-MODE-ZMQ zmq-on?)) + mk-cluster-param (doto (MkClusterParam.) + (.setSupervisors (int 4)) + (.setDaemonConf daemon-conf))]] + (Testing/withSimulatedTimeLocalCluster + mk-cluster-param complete-topology-testjob ) + (Testing/withLocalCluster + mk-cluster-param complete-topology-testjob))) (deftest test-with-tracked-cluster (Testing/withTrackedCluster