Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r469539586
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws
IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for spouts and bolts in a topology. The
mapping contains ids of the spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of out bound
edges of SpoutIds/BoltIds.
+ */
+ private static Map<String, Set<String>>
getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_spouts() != null) {
+ topology.get_spouts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
Review comment:
The `topology` passed in here must be a user topology; a system topology
would always contain loops because of the ackers.
So I don't think we need the `Utils.isSystemId` check. We also don't need to
call `get_inputs` on spouts, since a spout's inputs will be empty (see
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L162-L166)
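To illustrate the suggestion, here is a minimal sketch of building the forward graph from bolt inputs only. It uses plain maps rather than `StormTopology`; `ForwardGraphSketch`, `forwardEdges`, and the `boltInputs` map (bolt id to the ids it subscribes to) are hypothetical names for illustration and are not part of the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class ForwardGraphSketch {
    /**
     * Inverts "bolt -> upstream ids" into "component -> downstream ids".
     * Spouts are never iterated because they have no inputs; they still
     * show up as keys when some bolt subscribes to them.
     * (Hypothetical helper, stands in for the StormTopology-based code.)
     */
    static Map<String, Set<String>> forwardEdges(Map<String, List<String>> boltInputs) {
        Map<String, Set<String>> edgesOut = new TreeMap<>();
        boltInputs.forEach((boltId, upstreamIds) -> {
            for (String upstream : upstreamIds) {
                edgesOut.computeIfAbsent(upstream, x -> new TreeSet<>()).add(boltId);
            }
        });
        return edgesOut;
    }
}
```

With `bolt1` reading from `spout1` and `bolt2` reading from `bolt1` and `spout1`, the sketch yields `spout1 -> {bolt1, bolt2}` and `bolt1 -> {bolt2}`, with no system-id filtering and no pass over the spouts.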
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws
IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for spouts and bolts in a topology. The
mapping contains ids of the spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of out bound
edges of SpoutIds/BoltIds.
+ */
+ private static Map<String, Set<String>>
getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_spouts() != null) {
+ topology.get_spouts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First
recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid
re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is
detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut,
Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty())
{
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle detected
+ List<String> cycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ cycle.add(compId2);
+ } else if (edgesOut.get(compId2).contains(compId1)) {
+ cycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ tmp.subList(prevIdx, tmp.size());
Review comment:
This looks like it needs to be `tmp = tmp.subList(prevIdx, tmp.size());`.
`subList` returns a view and does not modify `tmp`, so as written the call
has no effect and the recorded cycle will not be accurate.
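A small standalone sketch of the behaviour in question: `List.subList` returns a view and leaves the original list untouched, so its result has to be assigned (or copied) to take effect. `SubListDemo` and `cycleFrom` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubListDemo {
    /**
     * Returns the part of the path starting at the first occurrence of id,
     * or a copy of the whole path if id is not present.
     */
    static List<String> cycleFrom(List<String> path, String id) {
        List<String> tmp = new ArrayList<>(path);
        int prevIdx = tmp.indexOf(id);
        if (prevIdx >= 0) {
            // Assign the result; copying detaches it from the backing list.
            tmp = new ArrayList<>(tmp.subList(prevIdx, tmp.size()));
        }
        return tmp;
    }

    public static void main(String[] args) {
        List<String> stack = new ArrayList<>(Arrays.asList("spout1", "bolt1", "bolt2", "bolt3"));
        stack.subList(1, stack.size()); // result discarded: stack is unchanged
        System.out.println(stack);                     // [spout1, bolt1, bolt2, bolt3]
        System.out.println(cycleFrom(stack, "bolt1")); // [bolt1, bolt2, bolt3]
    }
}
```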
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws
IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for spouts and bolts in a topology. The
mapping contains ids of the spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of out bound
edges of SpoutIds/BoltIds.
+ */
+ private static Map<String, Set<String>>
getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_spouts() != null) {
+ topology.get_spouts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First
recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid
re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is
detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut,
Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty())
{
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle detected
+ List<String> cycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ cycle.add(compId2);
+ } else if (edgesOut.get(compId2).contains(compId1)) {
+ cycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ tmp.subList(prevIdx, tmp.size());
+ }
+ tmp.add(compId2);
+ cycle.addAll(tmp);
+ }
+ cycles.add(cycle);
+ edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+ continue;
+ }
+ seen.add(compId2);
+ stack.push(compId2);
+ findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+ }
+ stack.pop();
+ }
+
+ /**
+ * Find and return components cycles in the topology graph when starting
from spout.
+ * Return a list of cycles. Each cycle may consist of one or more
components.
+ * Components that cannot be reached from any of the spouts are ignored.
+ *
+ * @return a List of cycles. Each cycle has a list of component names.
+ *
+ */
+ public static List<List<String>> findComponentCycles(StormTopology
topology, String topoId) {
+ List<List<String>> ret = new ArrayList<>();
+ Map<String, Set<String>> edgesOut =
getStormTopologyForwardGraph(topology);
+ Set<String> allComponentIds = new HashSet<>();
+ edgesOut.forEach((k, v) -> {
+ allComponentIds.add(k) ;
+ allComponentIds.addAll(v);
+ });
+
+ if (topology.get_spouts_size() == 0) {
+ LOG.error("Topology {} does not contain any spouts, cannot
traverse graph to determine cycles", topoId);
+ ret.add(new ArrayList(edgesOut.keySet()));
+ return ret;
+ }
+
+ Set<String> unreachable = new HashSet<>(edgesOut.keySet());
+ topology.get_spouts().forEach((spoutId, spout) -> {
+ Stack<String> dfsStack = new Stack<>();
+ dfsStack.push(spoutId);
+ Set<String> seen = new HashSet<>();
+ seen.add(spoutId);
+ findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
+ unreachable.removeAll(seen);
+ });
+
+ // warning about unreachable components
+ if (!unreachable.isEmpty()) {
+ LOG.warn("Topology {} contains unreachable components \"{}\"",
topoId, String.join(",", unreachable));
+ }
+
+ // detected cycles
+ if (!ret.isEmpty()) {
+ LOG.error("Topology {} contains cycles {}", topoId,
Review comment:
This seems to repeat the log message in `validateConfs` and can probably
be removed.
##########
File path: storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
##########
@@ -237,4 +254,184 @@ public void checkVersionInfo() {
assertNotNull(found);
assertEquals(key, found.getVersion());
}
+
+ @Test
+ public void testFindComponentCycles() {
+ class CycleDetectionScenario {
+ final String testName;
+ final String testDescription;
+ final StormTopology topology;
+ final int expectedCycles;
+
+ CycleDetectionScenario() {
+ testName = "dummy";
+ testDescription = "dummy test";
+ topology = null;
+ expectedCycles = 0;
+ }
+
+ CycleDetectionScenario(String testName, String testDescription,
StormTopology topology, int expectedCycles) {
+ this.testName = testName;
+ this.testDescription = testDescription;
+ this.topology = topology;
+ this.expectedCycles = expectedCycles;
+ }
+
+ private IRichSpout makeDummySpout() {
+ return new BaseRichSpout() {
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer
declarer) {
+ }
+
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext
context, SpoutOutputCollector collector) {
+ }
+
+ @Override
+ public void nextTuple() {
+ }
+
+ private void writeObject(java.io.ObjectOutputStream
stream) {
+ }
+ };
+ }
+
+ private IStatefulBolt makeDummyStatefulBolt() {
Review comment:
Can we use a non-stateful bolt? We don't really use it anywhere, and it
will insert some extra bolts into the topology, so the unit test results
might be hard to understand. For an example, see
https://github.com/apache/storm/blob/master/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java#L200-L201
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws
IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for spouts and bolts in a topology. The
mapping contains ids of the spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of out bound
edges of SpoutIds/BoltIds.
+ */
+ private static Map<String, Set<String>>
getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_spouts() != null) {
+ topology.get_spouts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First
recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid
re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is
detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut,
Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty())
{
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle detected
+ List<String> cycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ cycle.add(compId2);
+ } else if (edgesOut.get(compId2).contains(compId1)) {
+ cycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ tmp.subList(prevIdx, tmp.size());
+ }
+ tmp.add(compId2);
+ cycle.addAll(tmp);
+ }
+ cycles.add(cycle);
+ edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+ continue;
+ }
+ seen.add(compId2);
+ stack.push(compId2);
+ findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+ }
+ stack.pop();
+ }
+
+ /**
+ * Find and return components cycles in the topology graph when starting
from spout.
+ * Return a list of cycles. Each cycle may consist of one or more
components.
+ * Components that cannot be reached from any of the spouts are ignored.
+ *
+ * @return a List of cycles. Each cycle has a list of component names.
+ *
+ */
+ public static List<List<String>> findComponentCycles(StormTopology
topology, String topoId) {
Review comment:
I haven't read the complete implementation of `findComponentCycles`,
but there seems to be an issue with detecting complex cycles.
```
tb = new TopologyBuilder();
tb.setSpout("spout1", new TestWordSpout(), 10);
tb.setSpout("spout2", new TestWordSpout(), 10);
tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt4");
tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3").shuffleGrouping("spout2");
```
The result is
```
contains cycles bolt3,bolt4 ; spout2,bolt4,bolt3
```
`spout2` shouldn't be in the result. Please let me know if I am doing
anything wrong.
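One possible way to keep unrelated components such as `spout2` out of a reported cycle is to cut the cycle out of the current DFS path, starting at the revisited node. The following is only a sketch under assumptions, not the PR's implementation: it works on a plain adjacency map, all names (`CycleSketch`, `findCycles`, `reviewerTopology`) are hypothetical, and it records one cycle per back edge rather than enumerating every elementary cycle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CycleSketch {
    /** Forward edges of the topology in the comment above (hypothetical helper). */
    static Map<String, List<String>> reviewerTopology() {
        Map<String, List<String>> g = new LinkedHashMap<>();
        g.put("spout1", Arrays.asList("bolt1"));
        g.put("spout2", Arrays.asList("bolt4"));
        g.put("bolt1", Arrays.asList("bolt2"));
        g.put("bolt2", Arrays.asList("bolt3"));
        g.put("bolt3", Arrays.asList("bolt4"));
        g.put("bolt4", Arrays.asList("bolt1", "bolt3"));
        return g;
    }

    /** Depth-first search from each root; returns one cycle per back edge found. */
    static List<List<String>> findCycles(Map<String, List<String>> edgesOut, List<String> roots) {
        List<List<String>> cycles = new ArrayList<>();
        Set<String> done = new HashSet<>(); // nodes whose subtree is fully explored
        for (String root : roots) {
            if (!done.contains(root)) {
                dfs(root, edgesOut, new ArrayList<>(), done, cycles);
            }
        }
        return cycles;
    }

    private static void dfs(String node, Map<String, List<String>> edgesOut,
                            List<String> path, Set<String> done, List<List<String>> cycles) {
        path.add(node);
        for (String child : edgesOut.getOrDefault(node, Collections.emptyList())) {
            int idx = path.indexOf(child);
            if (idx >= 0) {
                // Back edge: only the path from the revisited node onward is the cycle.
                cycles.add(new ArrayList<>(path.subList(idx, path.size())));
            } else if (!done.contains(child)) {
                dfs(child, edgesOut, path, done, cycles);
            }
        }
        path.remove(path.size() - 1); // leave the current path
        done.add(node);
    }
}
```

Calling `CycleSketch.findCycles(CycleSketch.reviewerTopology(), Arrays.asList("spout1", "spout2"))` on this graph returns `[bolt1, bolt2, bolt3, bolt4]` and `[bolt3, bolt4]`; neither spout appears, because a spout is never part of the path segment that starts at the revisited node.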