ashb commented on a change in pull request #6175:
URL: https://github.com/apache/airflow/pull/6175#discussion_r496796225
##########
File path: airflow/utils/dag_cycle_tester.py
##########
@@ -32,28 +32,34 @@ def test_cycle(dag):
Check to see if there are any cycles in the DAG. Returns False if no cycle found,
otherwise raises exception.
"""
- def _test_cycle_helper(visit_map: Dict[str, int], task_id: str) -> None:
- """
- Checks if a cycle exists from the input task using DFS traversal
- """
- if visit_map[task_id] == CYCLE_DONE:
- return
-
- visit_map[task_id] = CYCLE_IN_PROGRESS
-
- task = dag.task_dict[task_id]
- for descendant_id in task.get_direct_relative_ids():
- if visit_map[descendant_id] == CYCLE_IN_PROGRESS:
- msg = "Cycle detected in DAG. Faulty task: {0} to {1}".format(task_id, descendant_id)
- raise AirflowDagCycleException(msg)
- else:
- _test_cycle_helper(visit_map, descendant_id)
-
- visit_map[task_id] = CYCLE_DONE
-
# default of int is 0 which corresponds to CYCLE_NEW
dag_visit_map: Dict[str, int] = defaultdict(int)
for dag_task_id in dag.task_dict.keys():
if dag_visit_map[dag_task_id] == CYCLE_NEW:
- _test_cycle_helper(dag_visit_map, dag_task_id)
+ _test_cycle_helper(dag_visit_map, dag_task_id, dag.task_dict)
return False
+
+
+def _test_cycle_helper(visit_map, task_id, task_dict):
+ """
+ Checks if a cycle exists from the input task using DFS traversal
+ """
+ path, visited = deque([task_id]), []
Review comment:
```suggestion
path = deque([task_id])
visited = []
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org