josh-fell opened a new issue, #23285:
URL: https://github.com/apache/airflow/issues/23285
### Apache Airflow version
2.3.0b1 (pre-release)
### What happened
A DAG that contains a Task Group with a Label between two tasks inside the
group fails to import: cycle detection raises an error even though the DAG
has no cycle.
Consider this DAG:
```python
from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> Label("label") >> end()

    group()


_ = task_groups_with_edge_labels()
```
When attempting to import the DAG, this error message is displayed:
<img width="1395" alt="image"
src="https://user-images.githubusercontent.com/48934154/165566299-3dd65cff-5e36-47d3-a243-7bc33d4344d6.png">
This also occurs on the `main` branch.
### What you think should happen instead
Users should be able to specify Labels between tasks within a Task Group.
### How to reproduce
- Use the DAG mentioned above and try to import into an Airflow environment
- Or, create a simple unit test of the following and execute said test.
```python
def test_cycle_task_group_with_edge_labels(self):
    from airflow.models.baseoperator import chain
    from airflow.utils.task_group import TaskGroup
    from airflow.utils.edgemodifier import Label

    dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

    with dag:
        with TaskGroup(group_id="task_group") as task_group:
            op1 = EmptyOperator(task_id='A')
            op2 = EmptyOperator(task_id='B')

            op1 >> Label("label") >> op2

    assert not check_cycle(dag)
```
The test fails with an `AirflowDagCycleException`:
```
tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels FAILED [100%]

================================== FAILURES ===================================
___________ TestCycleTester.test_cycle_task_group_with_edge_labels ____________

self = <tests.utils.test_dag_cycle.TestCycleTester testMethod=test_cycle_task_group_with_edge_labels>

    def test_cycle_task_group_with_edge_labels(self):
        from airflow.models.baseoperator import chain
        from airflow.utils.task_group import TaskGroup
        from airflow.utils.edgemodifier import Label

        dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

        with dag:
            with TaskGroup(group_id="task_group") as task_group:
                op1 = EmptyOperator(task_id='A')
                op2 = EmptyOperator(task_id='B')

                op1 >> Label("label") >> op2

>       assert not check_cycle(dag)

tests/utils/test_dag_cycle.py:168:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/utils/dag_cycle_tester.py:76: in check_cycle
    child_to_check = _check_adjacent_tasks(current_task_id, task)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

task_id = 'task_group.B', current_task = <Task(EmptyOperator): task_group.B>

    def _check_adjacent_tasks(task_id, current_task):
        """Returns first untraversed child task, else None if all tasks traversed."""
        for adjacent_task in current_task.get_direct_relative_ids():
            if visited[adjacent_task] == CYCLE_IN_PROGRESS:
                msg = f"Cycle detected in DAG. Faulty task: {task_id}"
>               raise AirflowDagCycleException(msg)
E               airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B

airflow/utils/dag_cycle_tester.py:62: AirflowDagCycleException
---------------------------- Captured stdout setup ----------------------------
========================= AIRFLOW ==========================
Home of the user: /root
Airflow home /root/airflow
Skipping initializing of the DB as it was initialized already.
You can re-initialize the database by adding --with-db-init flag when running tests.
=========================== short test summary info ===========================
FAILED tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels - airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
======================== 1 failed, 2 warnings in 1.08s ========================
```
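For readers unfamiliar with the check that fires in the traceback above, here is a minimal, self-contained sketch of a three-state depth-first cycle check. It is a simplified model and not Airflow's implementation: `CycleError`, the plain-dict graph, and the recursive `visit` helper are all assumptions made for illustration, and only the `CYCLE_IN_PROGRESS` comparison and the message format are taken from the traceback. It makes no claim about what `Label` does internally; it only shows which kind of adjacency the check rejects, namely a task whose adjacent task is still being traversed.

```python
from collections import defaultdict

# Visit states for a three-state depth-first search.
CYCLE_NEW, CYCLE_IN_PROGRESS, CYCLE_DONE = 0, 1, 2


class CycleError(Exception):
    """Hypothetical stand-in for AirflowDagCycleException."""


def check_cycle(downstream):
    """Raise CycleError if the mapping task_id -> downstream ids has a cycle."""
    visited = defaultdict(int)  # every task starts as CYCLE_NEW

    def visit(task_id):
        visited[task_id] = CYCLE_IN_PROGRESS
        for adjacent in downstream.get(task_id, ()):
            # Reaching a task that is still being traversed closes a loop.
            if visited[adjacent] == CYCLE_IN_PROGRESS:
                raise CycleError(f"Cycle detected in DAG. Faulty task: {task_id}")
            if visited[adjacent] == CYCLE_NEW:
                visit(adjacent)
        visited[task_id] = CYCLE_DONE

    for task_id in downstream:
        if visited[task_id] == CYCLE_NEW:
            visit(task_id)
    return False


# A -> B only: no cycle is reported.
check_cycle({"A": ["B"], "B": []})

# B also lists A as adjacent: the check fails while visiting B.
try:
    check_cycle({"A": ["B"], "B": ["A"]})
except CycleError as exc:
    print(exc)  # Cycle detected in DAG. Faulty task: B
```

In this model the "faulty task" is the one whose adjacency list points back at a task still in progress, which matches the shape of the error reported for `task_group.B`.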
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
N/A
### Deployment
Astronomer
### Deployment details
This issue also occurs on the `main` branch using Breeze.
### Anything else
Possibly related to #21404
When the Label is removed, no cycle is detected.
```python
from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> end()

    group()


_ = task_groups_with_edge_labels()
```
<img width="1437" alt="image"
src="https://user-images.githubusercontent.com/48934154/165566908-a521d685-a032-482e-9e6b-ef85f0743e64.png">
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)