baryluk opened a new issue #17079:
URL: https://github.com/apache/airflow/issues/17079
**Description**
I wish `dag.cli()` reported cycles in a task graph.
**Use case / motivation**
We use Airflow (currently 2.1.1) with about 40 DAGs that are authored by many
people and change daily. We put our DAGs into a custom Docker image that we
deploy with Flux.
However, I noticed that many of our developers' commits are small fixes,
because it is tricky to test DAGs locally (especially if one uses plugins,
which we no longer do).
So I wrote a script that imports every DAG file, runs it, and calls
`dag.cli()`; I then list all tasks and run `tasks test --dry-run` on each task.
The script has proved very useful: it detects many issues (malformed imports,
syntax errors, typos in Jinja2 templates, uses of uninitialized variables,
task ID collisions, and so on) before the change is even committed to our git
repo and the Docker image is built and deployed. This makes iteration faster.
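
A minimal sketch of such a check, assuming each DAG file ends with the `if __name__ == '__main__': dag.cli()` block used in the example below, and that `tasks list` prints nothing but task IDs. The helper name `check_dag_file` and its injectable `run` parameter are illustrative, not part of Airflow:

```python
import subprocess
import sys


def check_dag_file(path, execution_date, run=subprocess.run):
    """Dry-run every task of the DAG file at `path`; return the failing task IDs.

    Assumes the file calls dag.cli() under `if __name__ == "__main__"`.
    `run` is injectable so the helper can be exercised without Airflow.
    """
    base = [sys.executable, path]

    # "tasks list" is assumed to print one task ID per line. check=True makes
    # an import or syntax error in the DAG file raise CalledProcessError.
    listing = run(base + ["tasks", "list"], capture_output=True, text=True, check=True)
    task_ids = [line.strip() for line in listing.stdout.splitlines() if line.strip()]

    failed = []
    for task_id in task_ids:
        result = run(
            base + ["tasks", "test", "--dry-run", task_id, execution_date],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            failed.append(task_id)
    return failed
```

For example, `check_dag_file("dags/primary/examples/tutorial_cycles.py", "2021-07-19T00:00:00+0000")` would issue the same two commands shown in the session below, once per task.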
However, I noticed that `dag.cli()` does not detect cycles in a task graph.
Example:
```python3
from pprint import pprint

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


with DAG(
    dag_id="test_dag1",
    description="Testing",
    schedule_interval="@daily",
    catchup=False,
    start_date=days_ago(2),
) as dag:
    a = PythonOperator(
        task_id='print_the_context1',
        python_callable=print_context,
    )
    b = PythonOperator(
        task_id='print_the_context2',
        python_callable=print_context,
    )
    a >> b
    b >> a

if __name__ == '__main__':
    dag.cli()
```
Now running:
```
$ python3 dags/primary/examples/tutorial_cycles.py tasks list
print_the_context1
print_the_context2
$
```
```
$ python3 dags/primary/examples/tutorial_cycles.py tasks test --dry-run print_the_context2 '2021-07-19T00:00:00+0000'
[2021-07-19 10:37:27,513] {baseoperator.py:1263} INFO - Dry run
$
```
No warnings.
When the DAG is run by the scheduler, the cycle is eventually detected (I am
not sure whether on load, only on execution, or on reaching a specific task),
but that is a bit too late.
I wonder if it is possible to make `dag.cli()` detect cycles. It might also
be possible to detect cycles even earlier, when adding DAG edges, but that
might be too slow to do on every call. However, I am fairly sure `dag.cli()`
could do it efficiently, as it has the full graph available. (There are
well-known linear-time algorithms based on DFS that detect cycles.)
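
To illustrate the kind of DFS-based check meant here, the following is a minimal sketch over a plain dictionary of task IDs. It is not Airflow's implementation; the function name `find_cycle` and the `downstream` mapping are assumptions made for the example:

```python
def find_cycle(downstream):
    """Return one cycle as a list of task IDs, or None if the graph is acyclic.

    `downstream` maps each task ID to an iterable of its downstream task IDs.
    Each node and edge is visited once, so the search is linear in graph size.
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color = {node: WHITE for node in downstream}
    path = []  # nodes on the current DFS path, in visiting order

    def visit(node):
        color[node] = GREY
        path.append(node)
        for child in downstream.get(node, ()):
            state = color.get(child, WHITE)
            if state == GREY:
                # Edge back to a node on the current path: that closes a cycle.
                return path[path.index(child):] + [child]
            if state == WHITE:
                cycle = visit(child)
                if cycle:
                    return cycle
        path.pop()
        color[node] = BLACK
        return None

    for node in list(downstream):
        if color[node] == WHITE:
            cycle = visit(node)
            if cycle:
                return cycle
    return None
```

For the two-task example above, `find_cycle({"print_the_context1": ["print_the_context2"], "print_the_context2": ["print_the_context1"]})` returns `['print_the_context1', 'print_the_context2', 'print_the_context1']`. The recursion keeps the sketch short; a very deep task chain would need an iterative version to stay within Python's recursion limit.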
Just now, I noticed that there is a method, `dag.topological_sort()`, that is
quite handy and will detect cycles, so if I add:
```python3
if __name__ == '__main__':
    dag.topological_sort()
    dag.cli()
```
It does detect a cycle:
```
Traceback (most recent call last):
  File "/home/witek/code/airflow/dags/primary/examples/tutorial_cycles.py", line 33, in <module>
    print(dag.topological_sort())
  File "/home/witek/airflow-testing/venv/lib/python3.9/site-packages/airflow/models/dag.py", line 1119, in topological_sort
    raise AirflowException(f"A cyclic dependency occurred in dag: {self.dag_id}")
airflow.exceptions.AirflowException: A cyclic dependency occurred in dag: test_dag1
```
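
The same guard could be wrapped in a small helper so that a cycle produces a one-line message and a non-zero exit status instead of a traceback. This is only a sketch: `cli_with_cycle_check` and its `cycle_error` parameter are hypothetical names, and the helper relies only on the `topological_sort()`, `cli()` and `dag_id` members already used above:

```python
import sys


def cli_with_cycle_check(dag, cycle_error=Exception):
    """Call dag.cli() only if dag.topological_sort() succeeds.

    `cycle_error` is the exception type to treat as "cycle found"; passing
    airflow.exceptions.AirflowException narrows it to the type seen in the
    traceback. Returns an exit status: 0 on success, 1 if a cycle was reported.
    """
    try:
        dag.topological_sort()
    except cycle_error as exc:
        print(f"{dag.dag_id}: {exc}", file=sys.stderr)
        return 1
    dag.cli()
    return 0
```

In a DAG file this would be used as `sys.exit(cli_with_cycle_check(dag))` inside the `if __name__ == '__main__':` block.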
I think it might be useful to make `topological_sort` (and `tree_view`)
accessible via `dag.cli()`, so that an external script can easily detect cycles
this way.
I also noticed that calling `dag.tree_view()` does not detect the cycle. In
fact, it does not print anything when there is a cycle.