Repository: storm
Updated Branches:
  refs/heads/1.x-branch 27b269e82 -> 620d2be86


STORM-2423 - Join Bolt : Use explicit instead of default window anchoring of 
emitted tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/438be3cc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/438be3cc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/438be3cc

Branch: refs/heads/1.x-branch
Commit: 438be3cc3df633ddd2724bdfaa7264eeada703d8
Parents: e0b1333
Author: Roshan Naik <[email protected]>
Authored: Mon Mar 20 16:46:13 2017 -0700
Committer: Roshan Naik <[email protected]>
Committed: Mon Mar 20 16:46:13 2017 -0700

----------------------------------------------------------------------
 docs/Joins.md                                       | 16 +++++++++++++++-
 .../org/apache/storm/starter/JoinBoltExample.java   |  1 -
 .../src/jvm/org/apache/storm/bolt/JoinBolt.java     |  6 ++++--
 .../jvm/org/apache/storm/bolt/TestJoinBolt.java     |  3 ++-
 4 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/docs/Joins.md
----------------------------------------------------------------------
diff --git a/docs/Joins.md b/docs/Joins.md
index d2b072d..9efb7c6 100644
--- a/docs/Joins.md
+++ b/docs/Joins.md
@@ -122,4 +122,18 @@ is treated as a single stream and joined against `stream2` 
from `bolt3`.
 2. Duplication of joined records across windows is possible when using Sliding 
Windows. This is because the tuples continue to exist
    across multiple windows when using Sliding Windows.
 
-
+3. If message timeouts are enabled, ensure the timeout setting 
(topology.message.timeout.secs) is large enough to comfortably
+   accommodate the window size, plus the additional processing by other spouts 
and bolts.
+
+4. Joining a window of two streams with M and N elements each, *in the worst 
case*, can produce MxN elements with every output tuple
+   anchored to one tuple from each input stream. This can mean a lot of output 
tuples from JoinBolt and even more ACKs for downstream bolts
+   to emit. This can place substantial pressure on the messaging system and 
dramatically slow down the topology if not managed carefully.
+   To manage the load on the messaging subsystem, it is advisable to:
+   * Increase the worker's heap (topology.worker.max.heap.size.mb).
+   * **If** ACKing is not necessary for your topology, disable ACKers 
(topology.acker.executors=0).
+   * Disable event logger (topology.eventlogger.executors=0).
+   * Turn off topology debugging (topology.debug=false).
+   * Set topology.max.spout.pending to a value large enough to accommodate an 
estimated full window worth of tuples plus some more for headroom.
+     This helps mitigate the possibility of spouts emitting excessive tuples 
when the messaging subsystem is experiencing excessive load. This situation
+     can occur when topology.max.spout.pending is set to null.
+   * Lastly, keep the window size to the minimum value necessary for solving 
the problem at hand.

http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
index ec9b009..b029bea 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
@@ -53,7 +53,6 @@ public class JoinBoltExample {
         builder.setBolt("printer", new PrinterBolt() 
).shuffleGrouping("joiner");
 
         Config conf = new Config();
-        conf.setDebug(true);
 
         LocalCluster cluster = new LocalCluster();
         cluster.submitTopology("join-example", conf, builder.createTopology());

http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java 
b/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java
index 5f6845c..69b44ac 100644
--- a/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java
@@ -191,9 +191,11 @@ public class JoinBolt extends BaseWindowedBolt {
         for (ResultRecord resultRecord : joinResult.getRecords()) {
             ArrayList<Object> outputTuple = resultRecord.getOutputFields();
             if ( outputStreamName==null )
-                collector.emit( outputTuple );
+                // explicit anchoring emits to corresponding input tuples 
only, as default window anchoring will anchor them to all tuples in window
+                collector.emit( resultRecord.tupleList, outputTuple );
             else
-                collector.emit( outputStreamName, outputTuple );
+                // explicitly anchor emits to corresponding input tuples only, 
as default window anchoring will anchor them to all tuples in window
+                collector.emit( outputStreamName, resultRecord.tupleList, 
outputTuple );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/438be3cc/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java 
b/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
index 63c4a7f..8f99f68 100644
--- a/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
+++ b/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Collection;
 
 public class TestJoinBolt {
     String[] userFields = {"userId", "name", "city"};
@@ -328,7 +329,7 @@ public class TestJoinBolt {
         }
 
         @Override
-        public List<Integer> emit(List<Object> tuple) {
+        public List<Integer> emit(Collection<Tuple> anchors, List<Object> 
tuple) {
             actualResults.add(tuple);
             return null;
         }

Reply via email to