Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480402134
##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
conf.putAll(topoConf);
topoConf.putAll(prepareZookeeperAuthentication(conf));
- validateConfs(conf, topology);
+ validateConfs(conf, topology, name);
Review comment:
I feel like this is not the right place to detect cycles. We should
probably move the check into another function, such as
`StormCommon.validateBasic`
(https://git.vzbuilders.com/storm/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L156),
or into a new method.
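A minimal sketch of what a separate validation step could look like, assuming the check runs on a plain forward-edge map (component id to downstream component ids) rather than on `StormTopology` directly. `TopologyCycleValidator` and `validateAcyclic` are hypothetical names, and `IllegalArgumentException` stands in for whatever exception the real validation path would throw. It uses a standard depth-first search that tracks the current path, so a diamond (two paths into the same bolt) is not mistaken for a cycle.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical standalone validation step, kept out of StormSubmitter.submitTopologyAs.
// It works on a plain forward-edge map (component id -> downstream component ids).
final class TopologyCycleValidator {
    private TopologyCycleValidator() {
    }

    /** Throws if a cycle is reachable from any of the given spouts. */
    static void validateAcyclic(String topoId, Set<String> spoutIds,
                                Map<String, Set<String>> edgesOut) {
        Set<String> done = new HashSet<>();   // components whose descendants are fully explored
        Set<String> onPath = new HashSet<>(); // components on the current DFS path
        for (String spoutId : spoutIds) {
            if (hasCycleFrom(spoutId, edgesOut, onPath, done)) {
                // Assumption: the real code would likely throw Storm's own validation exception.
                throw new IllegalArgumentException(
                    "Topology " + topoId + " has a cycle reachable from spout " + spoutId);
            }
        }
    }

    private static boolean hasCycleFrom(String node, Map<String, Set<String>> edgesOut,
                                        Set<String> onPath, Set<String> done) {
        if (done.contains(node)) {
            return false; // explored earlier without finding a cycle; diamonds end here
        }
        if (!onPath.add(node)) {
            return true;  // node is already on the current path: back edge, hence a cycle
        }
        for (String child : edgesOut.getOrDefault(node, Collections.emptySet())) {
            if (hasCycleFrom(child, edgesOut, onPath, done)) {
                return true;
            }
        }
        onPath.remove(node);
        done.add(node);
        return false;
    }
}
```

Keeping the check in its own method means the submit path only needs one extra call, and the check can be unit-tested on small hand-built graphs.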
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+ * the edge. The mapping contains ids of spouts and bolts.
Review comment:
Comments here and below need to be updated since the mapping only
contains bolts.
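To make the exact contents of the map easy to check when rewording the Javadoc, here is a minimal sketch of the same construction on plain maps. `ForwardGraphSketch` and `forwardEdges` are hypothetical; `boltInputs` stands in for iterating `get_bolts()` and reading `get_common().get_inputs()`, and the `__` prefix test is an assumed stand-in for `Utils.isSystemId`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for getStormTopologyForwardGraph, using plain maps.
final class ForwardGraphSketch {
    private ForwardGraphSketch() {
    }

    // Assumed equivalent of Utils.isSystemId: system components use a "__" prefix.
    private static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    /**
     * boltInputs maps each bolt id to the ids of the components it reads from.
     * Result: keys are input components (spouts or bolts), values are bolt ids.
     */
    static Map<String, Set<String>> forwardEdges(Map<String, Set<String>> boltInputs) {
        Map<String, Set<String>> edgesOut = new HashMap<>();
        boltInputs.forEach((boltId, sources) -> {
            if (isSystemId(boltId)) {
                return; // skip system bolts, mirroring the filter in the diff
            }
            for (String source : sources) {
                edgesOut.computeIfAbsent(source, k -> new HashSet<>()).add(boltId);
            }
        });
        return edgesOut;
    }
}
```

In this form a bolt with no downstream consumers never appears as a key, which is worth keeping in mind wherever the map is later queried with `get`.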
##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
}
}
}
+
+ /**
+ * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+ * the edge. The mapping contains ids of spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+ */
+ private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle/diamond detected
+ List<String> possibleCycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ possibleCycle.add(compId2);
+ } else if (edgesOut.get(compId2).contains(compId1)) {
+ possibleCycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ // cycle (as opposed to diamond)
+ tmp = tmp.subList(prevIdx, tmp.size());
+ tmp.add(compId2);
+ possibleCycle.addAll(tmp);
+ }
+ }
+ if (!possibleCycle.isEmpty()) {
+ cycles.add(possibleCycle);
+ edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+ continue;
+ }
+ }
+ seen.add(compId2);
+ stack.push(compId2);
+ findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+ }
+ stack.pop();
+ }
+
+ /**
+ * Find and return components cycles in the topology graph when starting from spout.
+ * Return a list of cycles. Each cycle may consist of one or more components.
+ * Components that cannot be reached from any of the spouts are ignored.
+ *
+ * @return a List of cycles. Each cycle has a list of component names.
+ *
+ */
+ public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+ List<List<String>> ret = new ArrayList<>();
+ Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+ Set<String> allComponentIds = new HashSet<>();
+ edgesOut.forEach((k, v) -> {
+ allComponentIds.add(k) ;
+ allComponentIds.addAll(v);
+ });
+
+ if (topology.get_spouts_size() == 0) {
+ LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+ ret.add(new ArrayList(edgesOut.keySet()));
Review comment:
Having no spouts doesn't mean the topology has cycles, so these
components shouldn't be added to the cycle list. We should report the
two cases separately.
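One way to keep the two conditions apart, sketched under assumptions: a hypothetical `CycleReport` result carries a status (`NO_SPOUTS`, `ACYCLIC`, `HAS_CYCLES`) alongside the cycle list, so a spout-less topology no longer shows up as a cycle. The graph is again a plain forward-edge map rather than `StormTopology`. The DFS records one cycle per back edge it finds, which is enough to flag a problem but does not enumerate every elementary cycle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical result type that keeps "no spouts" distinct from "has cycles".
final class CycleReport {
    enum Status { NO_SPOUTS, ACYCLIC, HAS_CYCLES }

    final Status status;
    final List<List<String>> cycles;

    private CycleReport(Status status, List<List<String>> cycles) {
        this.status = status;
        this.cycles = cycles;
    }

    static CycleReport analyze(Set<String> spoutIds, Map<String, Set<String>> edgesOut) {
        if (spoutIds.isEmpty()) {
            // Not a cycle: the graph simply cannot be traversed from a spout.
            return new CycleReport(Status.NO_SPOUTS, Collections.emptyList());
        }
        List<List<String>> cycles = new ArrayList<>();
        Set<String> done = new HashSet<>(); // components whose descendants are fully explored
        for (String spoutId : spoutIds) {
            dfs(spoutId, edgesOut, new ArrayList<>(), done, cycles);
        }
        return new CycleReport(cycles.isEmpty() ? Status.ACYCLIC : Status.HAS_CYCLES, cycles);
    }

    // Records the path segment closed by each back edge; fully explored components are skipped.
    private static void dfs(String node, Map<String, Set<String>> edgesOut, List<String> path,
                            Set<String> done, List<List<String>> cycles) {
        int idx = path.indexOf(node);
        if (idx >= 0) {
            cycles.add(new ArrayList<>(path.subList(idx, path.size())));
            return;
        }
        if (done.contains(node)) {
            return;
        }
        path.add(node);
        for (String child : edgesOut.getOrDefault(node, Collections.emptySet())) {
            dfs(child, edgesOut, path, done, cycles);
        }
        path.remove(path.size() - 1);
        done.add(node);
    }
}
```

A caller such as the validation step could then log or reject the `NO_SPOUTS` case with its own message instead of inspecting the cycle list.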
----------------------------------------------------------------
This is an automated message from the Apache Git Service.