Repository: storm
Updated Branches:
  refs/heads/master 809c4b2a9 -> c4bb12463
STORM-2422: Reduce the size of a serialized trident topology Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/525f14f6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/525f14f6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/525f14f6 Branch: refs/heads/master Commit: 525f14f64b112cb6e73788a0e97dad56048c876d Parents: 0946048 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Fri Mar 17 13:25:27 2017 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Fri Mar 17 13:31:58 2017 -0500 ---------------------------------------------------------------------- .../storm/trident/planner/SubtopologyBolt.java | 91 +++++++++++++------- 1 file changed, 60 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/525f14f6/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java b/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java index b78c151..780c01f 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java +++ b/storm-core/src/jvm/org/apache/storm/trident/planner/SubtopologyBolt.java @@ -17,20 +17,17 @@ */ package org.apache.storm.trident.planner; -import org.apache.storm.coordination.BatchOutputCollector; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.jgrapht.DirectedGraph; -import org.jgrapht.graph.DirectedSubgraph; 
-import org.jgrapht.traverse.TopologicalOrderIterator; + +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.trident.planner.processor.TridentContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.topology.BatchInfo; @@ -39,25 +36,61 @@ import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.tuple.TridentTuple.Factory; import org.apache.storm.trident.tuple.TridentTupleView.ProjectionFactory; import org.apache.storm.trident.tuple.TridentTupleView.RootFactory; +import org.apache.storm.trident.util.IndexedEdge; import org.apache.storm.trident.util.TridentUtils; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.jgrapht.DirectedGraph; +import org.jgrapht.graph.DefaultDirectedGraph; +import org.jgrapht.graph.DirectedSubgraph; +import org.jgrapht.traverse.TopologicalOrderIterator; -// TODO: parameterizing it like this with everything might be a high deserialization cost if there's lots of tasks? -// TODO: memory problems? -// TODO: can avoid these problems by adding a boltfactory abstraction, so that boltfactory is deserialized once -// bolt factory -> returns coordinatedbolt per task, but deserializes the batch bolt one time and caches + +/** + * A Bolt that does processing for a subsection of the complete graph. 
+ */ public class SubtopologyBolt implements ITridentBatchBolt { - DirectedGraph _graph; - Set<Node> _nodes; - Map<String, InitialReceiver> _roots = new HashMap<>(); - Map<Node, Factory> _outputFactories = new HashMap<>(); - Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap<>(); - Map<Node, String> _batchGroups; + private static final long serialVersionUID = 1475508603138688412L; + @SuppressWarnings("rawtypes") + final DirectedGraph<Node, IndexedEdge> _graph; + final Set<Node> _nodes; + final Map<String, InitialReceiver> _roots = new HashMap<>(); + final Map<Node, Factory> _outputFactories = new HashMap<>(); + final Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap<>(); + final Map<Node, String> _batchGroups; //given processornodes and static state nodes - public SubtopologyBolt(DirectedGraph graph, Set<Node> nodes, Map<Node, String> batchGroups) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + public SubtopologyBolt(DefaultDirectedGraph<Node, IndexedEdge> graph, Set<Node> nodes, Map<Node, String> batchGroups) { _nodes = nodes; - _graph = graph; - _batchGroups = batchGroups; + _graph = (DirectedGraph<Node, IndexedEdge>) graph.clone(); + _batchGroups = copyAndOnlyKeep(batchGroups, nodes); + + //Remove the unneeded entries from the graph + //We want to keep all of our nodes, and the nodes that they are connected directly to (parents and children). 
+ Set<Node> nodesToKeep = new HashSet<>(); + for (IndexedEdge edge : _graph.edgeSet()) { + Node s = _graph.getEdgeSource(edge); + Node t = _graph.getEdgeTarget(edge); + if (_nodes.contains(s) || _nodes.contains(t)) { + nodesToKeep.add(s); + nodesToKeep.add(t); + } + } + + Set<Node> nodesToRemove = new HashSet<>(_graph.vertexSet()); + nodesToRemove.removeAll(nodesToKeep); + _graph.removeAllVertices(nodesToRemove); + } + + private static Map<Node, String> copyAndOnlyKeep(Map<Node, String> batchGroups, Set<Node> nodes) { + Map<Node, String> ret = new HashMap<>(nodes.size()); + for (Map.Entry<Node, String> entry: batchGroups.entrySet()) { + if (nodes.contains(entry.getKey())) { + ret.put(entry.getKey(), entry.getValue()); + } + } + return ret; } @Override @@ -69,16 +102,16 @@ public class SubtopologyBolt implements ITridentBatchBolt { context.setTaskData(n.stateInfo.id, s); } } - DirectedSubgraph<Node, Object> subgraph = new DirectedSubgraph(_graph, _nodes, null); - TopologicalOrderIterator it = new TopologicalOrderIterator<>(subgraph); + DirectedSubgraph<Node, ?> subgraph = new DirectedSubgraph<>(_graph, _nodes, null); + TopologicalOrderIterator<Node, ?> it = new TopologicalOrderIterator<>(subgraph); int stateIndex = 0; while(it.hasNext()) { - Node n = (Node) it.next(); + Node n = it.next(); if(n instanceof ProcessorNode) { ProcessorNode pn = (ProcessorNode) n; String batchGroup = _batchGroups.get(n); if(!_myTopologicallyOrdered.containsKey(batchGroup)) { - _myTopologicallyOrdered.put(batchGroup, new ArrayList()); + _myTopologicallyOrdered.put(batchGroup, new ArrayList<>()); } _myTopologicallyOrdered.get(batchGroup).add(pn.processor); List<String> parentStreams = new ArrayList<>(); @@ -121,9 +154,7 @@ public class SubtopologyBolt implements ITridentBatchBolt { _outputFactories.put(n, pn.processor.getOutputFactory()); } stateIndex++; - } - // TODO: get prepared one time into executor data... 
need to avoid the ser/deser - // for each task (probably need storm to support boltfactory) + } } private Fields getSourceOutputFields(TopologyContext context, String sourceStream) { @@ -190,8 +221,6 @@ public class SubtopologyBolt implements ITridentBatchBolt { String _stream; public InitialReceiver(String stream, Fields allFields) { - // TODO: don't want to project for non-batch bolts...??? - // how to distinguish "batch" streams from non-batch streams? _stream = stream; _factory = new RootFactory(allFields); List<String> projected = new ArrayList<>(allFields.toList());