Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480402134



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       I feel like this is not the right place to detect cycles. We should probably put it in another function, like `StormCommon.validateBasic`
       
       https://git.vzbuilders.com/storm/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L156
       
       or create a new method.
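A dedicated check, kept out of `validateConfs`, could be as small as the sketch below. It assumes the topology has already been reduced to a forward-edge map (`Map<String, Set<String>>`, source component id to target ids). The class and method names (`CycleValidationSketch`, `validateAcyclic`) are hypothetical and not part of Storm, and `IllegalArgumentException` stands in for whatever exception the real validation path throws. The sketch also swaps in a different technique from the PR's recursive descent: it uses Kahn's topological sort. Any component whose in-degree never drops to zero lies on a cycle or downstream of one.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical standalone check, sketched for illustration; not part of Storm's API.
class CycleValidationSketch {

    /**
     * Throws IllegalArgumentException if the forward-edge graph (source id -> target ids)
     * contains a cycle. Uses Kahn's algorithm: repeatedly remove nodes whose in-degree is zero.
     */
    static void validateAcyclic(String topoName, Map<String, Set<String>> edgesOut) {
        // In-degree of every component that appears as a source or a target.
        Map<String, Integer> inDegree = new HashMap<>();
        edgesOut.forEach((src, targets) -> {
            inDegree.putIfAbsent(src, 0);
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        });

        // Start from components nothing feeds into (e.g. spouts).
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((id, degree) -> {
            if (degree == 0) {
                ready.add(id);
            }
        });

        int processed = 0;
        while (!ready.isEmpty()) {
            String id = ready.poll();
            processed++;
            for (String target : edgesOut.getOrDefault(id, Collections.emptySet())) {
                // Drop the edge id -> target; target is ready once all its inputs are gone.
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }

        if (processed < inDegree.size()) {
            // Components never freed lie on a cycle or downstream of one.
            Set<String> stuck = new TreeSet<>();
            inDegree.forEach((id, degree) -> {
                if (degree > 0) {
                    stuck.add(id);
                }
            });
            throw new IllegalArgumentException(
                    "Topology " + topoName + " has a cycle involving " + stuck);
        }
    }
}
```

Under this structure, the submission path would call such a method next to `validateConfs` rather than from inside it.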

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.

Review comment:
       Comments here and below need to be updated, since the mapping only contains bolts.
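The shape of the map under discussion can be shown on a simplified input. Only bolts declare inputs, so every target (value) in the forward-edge map is a bolt id. The keys are whatever components those bolts subscribe to. The sketch below reproduces the inversion done by `getStormTopologyForwardGraph`, with two assumptions. First, a hypothetical `Map<String, Set<String>>` from bolt id to subscribed component ids replaces `StormTopology`. Second, the `__` prefix test is an assumed stand-in for `Utils.isSystemId`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; does not use Storm's StormTopology or Utils classes.
class ForwardGraphSketch {

    // Assumed stand-in for Utils.isSystemId: system component ids start with "__".
    static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    /**
     * Inverts bolt inputs (bolt id -> ids it subscribes to) into forward edges
     * (source id -> bolt ids that consume it). Targets are always bolts; sources
     * may be spouts or bolts.
     */
    static Map<String, Set<String>> forwardEdges(Map<String, Set<String>> boltInputs) {
        Map<String, Set<String>> edgesOut = new HashMap<>();
        boltInputs.forEach((boltId, sources) -> {
            if (isSystemId(boltId)) {
                return; // skip system bolts, mirroring the PR's isSystemId filter
            }
            for (String src : sources) {
                edgesOut.computeIfAbsent(src, x -> new HashSet<>()).add(boltId);
            }
        });
        return edgesOut;
    }
}
```

A bolt that nothing subscribes to never becomes a key. Lookups on this map should therefore tolerate a missing entry, for example with `getOrDefault(id, Collections.emptySet())`.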

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle/diamond detected
+                List<String> possibleCycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    possibleCycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    possibleCycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        // cycle (as opposed to diamond)
+                        tmp = tmp.subList(prevIdx, tmp.size());
+                        tmp.add(compId2);
+                        possibleCycle.addAll(tmp);
+                    }
+                }
+                if (!possibleCycle.isEmpty()) {
+                    cycles.add(possibleCycle);
+                    edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                    continue;
+                }
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+        List<List<String>> ret = new ArrayList<>();
+        Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+        Set<String> allComponentIds = new HashSet<>();
+        edgesOut.forEach((k, v) -> {
+            allComponentIds.add(k) ;
+            allComponentIds.addAll(v);
+        });
+
+        if (topology.get_spouts_size() == 0) {
+            LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+            ret.add(new ArrayList(edgesOut.keySet()));

Review comment:
       A topology with no spouts doesn't necessarily have cycles. We should handle these two cases separately.
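One way to keep the two concerns apart is sketched below under stated assumptions. A missing-spout check reports its own error, and the cycle search runs separately from the spouts. The search is a standard three-color depth-first search, swapped in for the PR's recursive descent with edge removal. An edge back to a component still on the current DFS path closes a cycle. An edge to an already-finished component is a diamond and is ignored. The class and method names are hypothetical, `IllegalArgumentException` is a stand-in exception, and the graph is assumed to be the forward-edge map keyed by source id.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names are illustrative, not Storm's API.
class CycleCheckSketch {
    // WHITE = unvisited, GRAY = on the current DFS path, BLACK = fully explored.
    private enum Color { WHITE, GRAY, BLACK }

    /** Separate check: a topology with no spouts is rejected for its own reason. */
    static void requireSpouts(String topoId, Set<String> spoutIds) {
        if (spoutIds.isEmpty()) {
            throw new IllegalArgumentException("Topology " + topoId + " does not contain any spouts");
        }
    }

    /** Returns each cycle reachable from the spouts as the list of component ids on it. */
    static List<List<String>> findCycles(Set<String> spoutIds, Map<String, Set<String>> edgesOut) {
        Map<String, Color> color = new HashMap<>();
        List<String> path = new ArrayList<>();
        List<List<String>> cycles = new ArrayList<>();
        for (String spout : spoutIds) {
            if (color.getOrDefault(spout, Color.WHITE) == Color.WHITE) {
                dfs(spout, edgesOut, color, path, cycles);
            }
        }
        return cycles;
    }

    private static void dfs(String node, Map<String, Set<String>> edgesOut,
                            Map<String, Color> color, List<String> path, List<List<String>> cycles) {
        color.put(node, Color.GRAY);
        path.add(node);
        // Components with no outbound edges have no map entry, hence getOrDefault.
        for (String next : edgesOut.getOrDefault(node, Collections.emptySet())) {
            Color c = color.getOrDefault(next, Color.WHITE);
            if (c == Color.GRAY) {
                // Back edge: the cycle is the part of the path starting at 'next'.
                cycles.add(new ArrayList<>(path.subList(path.indexOf(next), path.size())));
            } else if (c == Color.WHITE) {
                dfs(next, edgesOut, color, path, cycles);
            }
            // BLACK: already explored via another route (a diamond), not a cycle.
        }
        path.remove(path.size() - 1);
        color.put(node, Color.BLACK);
    }
}
```

This reports one cycle per back edge, which is enough to reject a topology. It does not enumerate every simple cycle. As in the PR's `findComponentCycles`, components unreachable from any spout are skipped.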




----------------------------------------------------------------
This is an automated message from the Apache Git Service.