bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470122901
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws
IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for spouts and bolts in a topology. The
mapping contains ids of the spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of out bound
edges of SpoutIds/BoltIds.
+ */
+ private static Map<String, Set<String>>
getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_spouts() != null) {
+ topology.get_spouts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v)
-> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new
HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First
recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid
re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is
detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut,
Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty())
{
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle detected
+ List<String> cycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ cycle.add(compId2);
+ } else if (edgesOut.get(compId2).contains(compId1)) {
+ cycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ tmp.subList(prevIdx, tmp.size());
+ }
+ tmp.add(compId2);
+ cycle.addAll(tmp);
+ }
+ cycles.add(cycle);
+ edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+ continue;
+ }
+ seen.add(compId2);
+ stack.push(compId2);
+ findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+ }
+ stack.pop();
+ }
+
+ /**
+ * Find and return components cycles in the topology graph when starting
from spout.
+ * Return a list of cycles. Each cycle may consist of one or more
components.
+ * Components that cannot be reached from any of the spouts are ignored.
+ *
+ * @return a List of cycles. Each cycle has a list of component names.
+ *
+ */
+ public static List<List<String>> findComponentCycles(StormTopology
topology, String topoId) {
+ List<List<String>> ret = new ArrayList<>();
+ Map<String, Set<String>> edgesOut =
getStormTopologyForwardGraph(topology);
+ Set<String> allComponentIds = new HashSet<>();
+ edgesOut.forEach((k, v) -> {
+ allComponentIds.add(k) ;
+ allComponentIds.addAll(v);
+ });
+
+ if (topology.get_spouts_size() == 0) {
+ LOG.error("Topology {} does not contain any spouts, cannot
traverse graph to determine cycles", topoId);
+ ret.add(new ArrayList(edgesOut.keySet()));
+ return ret;
+ }
+
+ Set<String> unreachable = new HashSet<>(edgesOut.keySet());
+ topology.get_spouts().forEach((spoutId, spout) -> {
+ Stack<String> dfsStack = new Stack<>();
+ dfsStack.push(spoutId);
+ Set<String> seen = new HashSet<>();
+ seen.add(spoutId);
+ findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
+ unreachable.removeAll(seen);
+ });
+
+ // warning about unreachable components
+ if (!unreachable.isEmpty()) {
+ LOG.warn("Topology {} contains unreachable components \"{}\"",
topoId, String.join(",", unreachable));
+ }
+
+ // detected cycles
+ if (!ret.isEmpty()) {
+ LOG.error("Topology {} contains cycles {}", topoId,
Review comment:
removed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org