bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470122901



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws 
IOException {
             }
         }
     }
+
+    /**
+     * Build the forward (outbound) adjacency map of a topology from the inputs declared by its components.
+     *
+     * <p>For every spout and bolt whose id is not a system id, each declared input contributes one edge that
+     * runs from the input's source component id to the declaring component id. A component only appears as a
+     * key when at least one non-system component lists it as an input.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map from a source SpoutId/BoltId to the set of SpoutIds/BoltIds that declare it as an input.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> forwardEdges = new HashMap<>();
+
+        // Spouts are scanned as well as bolts, because a spout's common section may also declare inputs.
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().forEach((spoutId, spoutSpec) -> {
+                if (Utils.isSystemId(spoutId)) {
+                    return;
+                }
+                spoutSpec.get_common().get_inputs().keySet().forEach(source ->
+                    forwardEdges.computeIfAbsent(source.get_componentId(), x -> new HashSet<>()).add(spoutId));
+            });
+        }
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().forEach((boltId, bolt) -> {
+                if (Utils.isSystemId(boltId)) {
+                    return;
+                }
+                bolt.get_common().get_inputs().keySet().forEach(source ->
+                    forwardEdges.computeIfAbsent(source.get_componentId(), x -> new HashSet<>()).add(boltId));
+            });
+        }
+
+        return forwardEdges;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion that continues from the
+     * component currently on top of {@code stack}. A cycle is recorded when a child of the current component
+     * is the component itself (self loop), links straight back to the current component (two-component
+     * cycle), or is still on the DFS stack (longer cycle). For a longer cycle the recorded list starts and
+     * ends with the repeated component, e.g. {@code [A, B, C, A]}.
+     *
+     * <p>The last link of a recorded cycle is removed from {@code edgesOut} to avoid re-detecting the same
+     * cycle/subcycle. A child that was already seen but does not close a cycle (a "diamond", i.e. a second
+     * path to the same component) is skipped without recording anything and without removing the edge.
+     *
+     * @param stack used for recursion; the top element is the component to expand. It is popped before return.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far; newly detected cycles are appended.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        Set<String> outEdges = edgesOut.get(compId1);
+        if (outEdges == null || outEdges.isEmpty()) {
+            stack.pop();
+            return;
+        }
+        // Iterate over a snapshot because edges may be removed from outEdges while looping.
+        Set<String> children = new HashSet<>(outEdges);
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // Either a cycle or a diamond; only a cycle yields a non-empty list.
+                List<String> cycle = new ArrayList<>();
+                // compId2 may have no outbound edges at all, in which case it has no map entry.
+                Set<String> backEdges = edgesOut.get(compId2);
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (backEdges != null && backEdges.contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> path = Collections.list(stack.elements());
+                    int prevIdx = path.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        // compId2 is an ancestor on the current DFS path: keep only the looping part.
+                        cycle.addAll(path.subList(prevIdx, path.size()));
+                        cycle.add(compId2);
+                    }
+                }
+                if (!cycle.isEmpty()) {
+                    cycles.add(cycle);
+                    outEdges.remove(compId2); // disconnect this cycle
+                }
+                continue;
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return component cycles in the topology graph when starting 
from a spout.
+     * Return a list of cycles. Each cycle may consist of one or more 
components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology 
topology, String topoId) {
+        List<List<String>> ret = new ArrayList<>();
+        Map<String, Set<String>> edgesOut = 
getStormTopologyForwardGraph(topology);
+        Set<String> allComponentIds = new HashSet<>();
+        edgesOut.forEach((k, v) -> {
+            allComponentIds.add(k) ;
+            allComponentIds.addAll(v);
+        });
+
+        if (topology.get_spouts_size() == 0) {
+            LOG.error("Topology {} does not contain any spouts, cannot 
traverse graph to determine cycles", topoId);
+            ret.add(new ArrayList(edgesOut.keySet()));
+            return ret;
+        }
+
+        Set<String> unreachable = new HashSet<>(edgesOut.keySet());
+        topology.get_spouts().forEach((spoutId, spout)  -> {
+            Stack<String> dfsStack = new Stack<>();
+            dfsStack.push(spoutId);
+            Set<String> seen = new HashSet<>();
+            seen.add(spoutId);
+            findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
+            unreachable.removeAll(seen);
+        });
+
+        // warning about unreachable components
+        if (!unreachable.isEmpty()) {
+            LOG.warn("Topology {} contains unreachable components \"{}\"", 
topoId, String.join(",", unreachable));
+        }
+
+        // detected cycles
+        if (!ret.isEmpty()) {
+            LOG.error("Topology {} contains cycles {}", topoId,

Review comment:
       removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to