Repository: storm
Updated Branches:
  refs/heads/master 809c4b2a9 -> c4bb12463


STORM-2422: Reduce the size of a serialized trident topology


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/525f14f6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/525f14f6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/525f14f6

Branch: refs/heads/master
Commit: 525f14f64b112cb6e73788a0e97dad56048c876d
Parents: 0946048
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Mar 17 13:25:27 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Mar 17 13:31:58 2017 -0500

----------------------------------------------------------------------
 .../storm/trident/planner/SubtopologyBolt.java  | 91 +++++++++++++-------
 1 file changed, 60 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/525f14f6/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java 
b/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java
index b78c151..780c01f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java
@@ -17,20 +17,17 @@
  */
 package org.apache.storm.trident.planner;
 
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.jgrapht.DirectedGraph;
-import org.jgrapht.graph.DirectedSubgraph;
-import org.jgrapht.traverse.TopologicalOrderIterator;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.trident.planner.processor.TridentContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.topology.BatchInfo;
@@ -39,25 +36,61 @@ import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.tuple.TridentTuple.Factory;
 import org.apache.storm.trident.tuple.TridentTupleView.ProjectionFactory;
 import org.apache.storm.trident.tuple.TridentTupleView.RootFactory;
+import org.apache.storm.trident.util.IndexedEdge;
 import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.jgrapht.DirectedGraph;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.jgrapht.graph.DirectedSubgraph;
+import org.jgrapht.traverse.TopologicalOrderIterator;
 
-// TODO: parameterizing it like this with everything might be a high 
deserialization cost if there's lots of tasks?
-// TODO: memory problems?
-// TODO: can avoid these problems by adding a boltfactory abstraction, so that 
boltfactory is deserialized once
-//   bolt factory -> returns coordinatedbolt per task, but deserializes the 
batch bolt one time and caches
+
+/**
+ * A Bolt that does processing for a subsection of the complete graph.
+ */
 public class SubtopologyBolt implements ITridentBatchBolt {
-    DirectedGraph _graph;
-    Set<Node> _nodes;
-    Map<String, InitialReceiver> _roots = new HashMap<>();
-    Map<Node, Factory> _outputFactories = new HashMap<>();
-    Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new 
HashMap<>();
-    Map<Node, String> _batchGroups;
+    private static final long serialVersionUID = 1475508603138688412L;
+    @SuppressWarnings("rawtypes")
+    final DirectedGraph<Node, IndexedEdge> _graph;
+    final Set<Node> _nodes;
+    final Map<String, InitialReceiver> _roots = new HashMap<>();
+    final Map<Node, Factory> _outputFactories = new HashMap<>();
+    final Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new 
HashMap<>();
+    final Map<Node, String> _batchGroups;
     
     //given processornodes and static state nodes
-    public SubtopologyBolt(DirectedGraph graph, Set<Node> nodes, Map<Node, 
String> batchGroups) {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public SubtopologyBolt(DefaultDirectedGraph<Node, IndexedEdge> graph, 
Set<Node> nodes, Map<Node, String> batchGroups) {
         _nodes = nodes;
-        _graph = graph;
-        _batchGroups = batchGroups;
+        _graph = (DirectedGraph<Node, IndexedEdge>) graph.clone();
+        _batchGroups = copyAndOnlyKeep(batchGroups, nodes);
+        
+        //Remove the unneeded entries from the graph
+        //We want to keep all of our nodes, and the nodes that they are 
connected directly to (parents and children).
+        Set<Node> nodesToKeep = new HashSet<>();
+        for (IndexedEdge edge : _graph.edgeSet()) {
+            Node s = _graph.getEdgeSource(edge);
+            Node t = _graph.getEdgeTarget(edge);
+            if (_nodes.contains(s) || _nodes.contains(t)) {
+                nodesToKeep.add(s);
+                nodesToKeep.add(t);
+            }
+        }
+        
+        Set<Node> nodesToRemove = new HashSet<>(_graph.vertexSet());
+        nodesToRemove.removeAll(nodesToKeep);
+        _graph.removeAllVertices(nodesToRemove);
+    }
+
+    private static Map<Node, String> copyAndOnlyKeep(Map<Node, String> 
batchGroups, Set<Node> nodes) {
+        Map<Node, String> ret = new HashMap<>(nodes.size());
+        for (Map.Entry<Node, String> entry: batchGroups.entrySet()) {
+            if (nodes.contains(entry.getKey())) {
+                ret.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return ret;
     }
 
     @Override
@@ -69,16 +102,16 @@ public class SubtopologyBolt implements ITridentBatchBolt {
                 context.setTaskData(n.stateInfo.id, s);
             }
         }
-        DirectedSubgraph<Node, Object> subgraph = new DirectedSubgraph(_graph, 
_nodes, null);
-        TopologicalOrderIterator it = new TopologicalOrderIterator<>(subgraph);
+        DirectedSubgraph<Node, ?> subgraph = new DirectedSubgraph<>(_graph, 
_nodes, null);
+        TopologicalOrderIterator<Node, ?> it = new 
TopologicalOrderIterator<>(subgraph);
         int stateIndex = 0;
         while(it.hasNext()) {
-            Node n = (Node) it.next();
+            Node n = it.next();
             if(n instanceof ProcessorNode) {
                 ProcessorNode pn = (ProcessorNode) n;
                 String batchGroup = _batchGroups.get(n);
                 if(!_myTopologicallyOrdered.containsKey(batchGroup)) {
-                    _myTopologicallyOrdered.put(batchGroup, new ArrayList());
+                    _myTopologicallyOrdered.put(batchGroup, new ArrayList<>());
                 }
                 _myTopologicallyOrdered.get(batchGroup).add(pn.processor);
                 List<String> parentStreams = new ArrayList<>();
@@ -121,9 +154,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
                 _outputFactories.put(n, pn.processor.getOutputFactory());
             }   
             stateIndex++;
-        }        
-        // TODO: get prepared one time into executor data... need to avoid the 
ser/deser
-        // for each task (probably need storm to support boltfactory)
+        }
     }
 
     private Fields getSourceOutputFields(TopologyContext context, String 
sourceStream) {
@@ -190,8 +221,6 @@ public class SubtopologyBolt implements ITridentBatchBolt {
         String _stream;
         
         public InitialReceiver(String stream, Fields allFields) {
-            // TODO: don't want to project for non-batch bolts...???
-            // how to distinguish "batch" streams from non-batch streams?
             _stream = stream;
             _factory = new RootFactory(allFields);
             List<String> projected = new ArrayList<>(allFields.toList());

Reply via email to