bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470061568
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws
IOException {
}
}
}
+
+ /**
+ * Build the forward (outbound) edge map of a topology from the inputs declared by its spouts and bolts.
+ *
+ * <p>For every spout and bolt whose id is not a system id (per Utils.isSystemId), each declared input
+ * contributes one edge: from the component id carried by the input key (k.get_componentId()) to the id
+ * of the component that declares the input. A null spout map or bolt map is simply skipped.
+ * A component id appears as a key only if at least one examined component lists it as an input.
+ *
+ * @param topology StormTopology to examine.
+ * @return a new map from a component id to the set of SpoutIds/BoltIds on its outbound edges.
+ */
+ private static Map<String, Set<String>>
getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>(); // component id -> ids of components that list it as an input
+
+ if (topology.get_spouts() != null) {
+ topology.get_spouts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First
recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid
re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is
detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut,
Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty())
{
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle detected
+ List<String> cycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ cycle.add(compId2);
+ } else if (edgesOut.get(compId2).contains(compId1)) {
+ cycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ tmp.subList(prevIdx, tmp.size());
Review comment:
fixed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]