[
https://issues.apache.org/jira/browse/AIRFLOW-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17204021#comment-17204021
]
ASF GitHub Bot commented on AIRFLOW-5545:
-----------------------------------------
kaxil commented on a change in pull request #6175:
URL: https://github.com/apache/airflow/pull/6175#discussion_r496798831
##########
File path: airflow/utils/dag_cycle_tester.py
##########
@@ -32,28 +32,34 @@ def test_cycle(dag):
     Check to see if there are any cycles in the DAG. Returns False if no cycle found,
     otherwise raises exception.
     """
-    def _test_cycle_helper(visit_map: Dict[str, int], task_id: str) -> None:
-        """
-        Checks if a cycle exists from the input task using DFS traversal
-        """
-        if visit_map[task_id] == CYCLE_DONE:
-            return
-
-        visit_map[task_id] = CYCLE_IN_PROGRESS
-
-        task = dag.task_dict[task_id]
-        for descendant_id in task.get_direct_relative_ids():
-            if visit_map[descendant_id] == CYCLE_IN_PROGRESS:
-                msg = "Cycle detected in DAG. Faulty task: {0} to {1}".format(task_id, descendant_id)
-                raise AirflowDagCycleException(msg)
-            else:
-                _test_cycle_helper(visit_map, descendant_id)
-
-        visit_map[task_id] = CYCLE_DONE
-
     # default of int is 0 which corresponds to CYCLE_NEW
     dag_visit_map: Dict[str, int] = defaultdict(int)
     for dag_task_id in dag.task_dict.keys():
         if dag_visit_map[dag_task_id] == CYCLE_NEW:
-            _test_cycle_helper(dag_visit_map, dag_task_id)
+            _test_cycle_helper(dag_visit_map, dag_task_id, dag.task_dict)
     return False
+
+
+def _test_cycle_helper(visit_map, task_id, task_dict):
+    """
+    Checks if a cycle exists from the input task using DFS traversal
+    """
+    path, visited = deque([task_id]), []
+
+    while path:
+        current_task_id = path.pop()
+        if visit_map[current_task_id] == CYCLE_DONE:
+            continue
+        if visit_map[current_task_id] == CYCLE_IN_PROGRESS:
+            msg = "Cycle detected in DAG. Faulty task: {}".format(
+                current_task_id)
Review comment:
```suggestion
            msg = f"Cycle detected in DAG. Faulty task: {current_task_id}"
```
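
For context, the diff above is cut off at the reviewed line, so the rest of the PR's helper is not visible here. The following is a minimal, self-contained sketch of how a three-state DFS cycle check can be written without recursion. It is not the code from the pull request: `check_cycle`, `CycleError`, and the plain adjacency-dict graph are hypothetical stand-ins for `test_cycle`, `AirflowDagCycleException`, and `dag.task_dict`.

```python
from collections import defaultdict
from typing import Dict, Iterable

CYCLE_NEW = 0          # not visited yet (the default value of int)
CYCLE_IN_PROGRESS = 1  # on the current DFS path
CYCLE_DONE = 2         # fully explored, known to be cycle-free


class CycleError(Exception):
    """Hypothetical stand-in for AirflowDagCycleException."""


def check_cycle(graph: Dict[str, Iterable[str]]) -> None:
    """Raise CycleError if `graph` (task id -> downstream task ids) has a cycle."""
    visit_map: Dict[str, int] = defaultdict(int)

    for start in graph:
        if visit_map[start] != CYCLE_NEW:
            continue
        # An explicit stack replaces the call stack of the recursive version.
        stack = [start]
        while stack:
            task_id = stack[-1]
            if visit_map[task_id] == CYCLE_NEW:
                # First visit: mark as on-path and schedule unvisited children.
                visit_map[task_id] = CYCLE_IN_PROGRESS
                for child_id in graph.get(task_id, ()):
                    if visit_map[child_id] == CYCLE_IN_PROGRESS:
                        # An edge back to a task on the current path is a cycle.
                        raise CycleError(
                            f"Cycle detected. Faulty task: {task_id} to {child_id}"
                        )
                    if visit_map[child_id] == CYCLE_NEW:
                        stack.append(child_id)
            else:
                # Second visit (or a stale duplicate entry): the subtree is done.
                stack.pop()
                visit_map[task_id] = CYCLE_DONE
```

Because each task stays on the stack until all of its descendants are finished, the tasks marked `CYCLE_IN_PROGRESS` are exactly those on the current path, which is what makes the back-edge test valid. The depth of the graph only affects the length of the list, not the interpreter's call stack.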
----------------------------------------------------------------
> dag.test_cycle uses recursion, limiting DAG length.
> ---------------------------------------------------
>
> Key: AIRFLOW-5545
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5545
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG
> Affects Versions: 1.10.3, 1.10.4, 1.10.5
> Reporter: Daniel Imberman
> Assignee: Daniel Imberman
> Priority: Critical
> Fix For: 2.0.0
>
>
> DAG parsing currently breaks for long DAGs because test_cycle uses a
> recursive DFS, which exceeds Python's recursion limit on deep task chains.
> Example of a breaking DAG:
>
> {code:java}
> dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
> with dag:
>     last = dag
>     for i in range(1, 10000):
>         op = DummyOperator(task_id='{}'.format(i))
>         last >> op
>         last = op
> {code}
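
The failure mode described in the issue can be reproduced without Airflow. The sketch below is an illustration under stated assumptions, not Airflow code: `visit_recursively` and `chain_overflows` are hypothetical helpers, and the graph is a plain dict that mimics a linear chain of tasks like the one in the example DAG.

```python
import sys
from typing import Dict, List


def visit_recursively(graph: Dict[int, List[int]], node: int) -> None:
    """Depth-first walk that uses one Python stack frame per task in the chain."""
    for child in graph.get(node, []):
        visit_recursively(graph, child)


def chain_overflows(length: int) -> bool:
    """Return True if walking a linear chain of `length` tasks raises RecursionError."""
    # Task i points to task i + 1, mirroring `last >> op` in the example DAG.
    chain = {i: [i + 1] for i in range(1, length)}
    try:
        visit_recursively(chain, 1)
    except RecursionError:
        return True
    return False


if __name__ == "__main__":
    print("recursion limit:", sys.getrecursionlimit())
    print("10,000-task chain overflows:", chain_overflows(10_000))
```

The recursion depth grows with the longest path through the graph, so any chain longer than the interpreter's recursion limit fails regardless of whether the DAG actually contains a cycle.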