Repository: storm Updated Branches: refs/heads/1.x-branch 27b269e82 -> 620d2be86
STORM-2423 - Join Bolt : Use explicit instead of default window anchoring of emitted tuples Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/438be3cc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/438be3cc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/438be3cc Branch: refs/heads/1.x-branch Commit: 438be3cc3df633ddd2724bdfaa7264eeada703d8 Parents: e0b1333 Author: Roshan Naik <[email protected]> Authored: Mon Mar 20 16:46:13 2017 -0700 Committer: Roshan Naik <[email protected]> Committed: Mon Mar 20 16:46:13 2017 -0700 ---------------------------------------------------------------------- docs/Joins.md | 16 +++++++++++++++- .../org/apache/storm/starter/JoinBoltExample.java | 1 - .../src/jvm/org/apache/storm/bolt/JoinBolt.java | 6 ++++-- .../jvm/org/apache/storm/bolt/TestJoinBolt.java | 3 ++- 4 files changed, 21 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/docs/Joins.md ---------------------------------------------------------------------- diff --git a/docs/Joins.md b/docs/Joins.md index d2b072d..9efb7c6 100644 --- a/docs/Joins.md +++ b/docs/Joins.md @@ -122,4 +122,18 @@ is treated as a single stream and joined against `stream2` from `bolt3`. 2. Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist across multiple windows when using Sliding Windows. - +3. If message timeouts are enabled, ensure the timeout setting (topology.message.timeout.secs) is large enough to comfortably + accommodate the window size, plus the additional processing by other spouts and bolts. + +4. Joining a window of two streams with M and N elements each, *in the worst case*, can produce MxN elements with every output tuple + anchored to one tuple from each input stream. 
This can mean a lot of output tuples from JoinBolt and even more ACKs for downstream bolts + to emit. This can place substantial pressure on the messaging system and dramatically slow down the topology if not careful. + To manage the load on the messaging subsystem, it is advisable to: + * Increase the worker's heap (topology.worker.max.heap.size.mb). + * **If** ACKing is not necessary for your topology, disable ACKers (topology.acker.executors=0). + * Disable event logger (topology.eventlogger.executors=0). + * Turn off topology debugging (topology.debug=false). + * Set topology.max.spout.pending to a value large enough to accommodate an estimated full window worth of tuples plus some more for headroom. + This helps mitigate the possibility of spouts emitting excessive tuples when the messaging subsystem is experiencing excessive load. This situation + can occur when its value is set to null. + * Lastly, keep the window size to the minimum value necessary for solving the problem at hand. http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java index ec9b009..b029bea 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java @@ -53,7 +53,6 @@ public class JoinBoltExample { builder.setBolt("printer", new PrinterBolt() ).shuffleGrouping("joiner"); Config conf = new Config(); - conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("join-example", conf, builder.createTopology()); http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java b/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java index 5f6845c..69b44ac 100644 --- a/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java +++ b/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java @@ -191,9 +191,11 @@ public class JoinBolt extends BaseWindowedBolt { for (ResultRecord resultRecord : joinResult.getRecords()) { ArrayList<Object> outputTuple = resultRecord.getOutputFields(); if ( outputStreamName==null ) - collector.emit( outputTuple ); + // explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples in window + collector.emit( resultRecord.tupleList, outputTuple ); else - collector.emit( outputStreamName, outputTuple ); + // explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples in window + collector.emit( outputStreamName, resultRecord.tupleList, outputTuple ); } } http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java b/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java index 63c4a7f..8f99f68 100644 --- a/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java +++ b/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Collection; public class TestJoinBolt { String[] userFields = {"userId", "name", "city"}; @@ -328,7 +329,7 @@ public class TestJoinBolt { } @Override - public List<Integer> emit(List<Object> tuple) { + public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) { actualResults.add(tuple); 
return null; }
