bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470074373
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws
IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for spouts and bolts in a topology. The
mapping contains ids of the spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of out bound
edges of SpoutIds/BoltIds.
+ */
+ private static Map<String, Set<String>>
getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_spouts() != null) {
+ topology.get_spouts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First
recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid
re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is
detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut,
Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty())
{
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle detected
+ List<String> cycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ cycle.add(compId2);
+ } else if (edgesOut.get(compId2).contains(compId1)) {
+ cycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ tmp.subList(prevIdx, tmp.size());
+ }
+ tmp.add(compId2);
+ cycle.addAll(tmp);
+ }
+ cycles.add(cycle);
+ edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+ continue;
+ }
+ seen.add(compId2);
+ stack.push(compId2);
+ findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+ }
+ stack.pop();
+ }
+
+ /**
+ * Find and return components cycles in the topology graph when starting
from spout.
+ * Return a list of cycles. Each cycle may consist of one or more
components.
+ * Components that cannot be reached from any of the spouts are ignored.
+ *
+ * @return a List of cycles. Each cycle has a list of component names.
+ *
+ */
+ public static List<List<String>> findComponentCycles(StormTopology
topology, String topoId) {
Review comment:
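This specific result will be fixed by the tmp.subList change (the result of
tmp.subList(prevIdx, tmp.size()) is currently discarded, so the whole stack is
added to the cycle).
However, I am wondering whether this will misclassify diamonds as cycles
(undesired). If so, the code should be changed to require that the check at
line 1974 (prevIdx >= 0, i.e. the component is on the current stack) be true
before a cycle is recorded.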
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]