josh-fell opened a new issue, #23285:
URL: https://github.com/apache/airflow/issues/23285

   ### Apache Airflow version
   
   2.3.0b1 (pre-release)
   
   ### What happened
   
   When a DAG contains a Task Group and an edge Label is placed between two tasks inside that group, the DAG fails to import because cycle detection reports a cycle that does not exist.
   
   Consider this DAG:
   
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag, task, task_group
   from airflow.utils.edgemodifier import Label
   
   
   @task
   def begin():
       ...
   
   
   @task
   def end():
       ...
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def task_groups_with_edge_labels():
       @task_group
       def group():
           begin() >> Label("label") >> end()
   
       group()
   
   
   _ = task_groups_with_edge_labels()
   
   ```
   
   When attempting to import the DAG, this error message is displayed:
   <img width="1395" alt="image" src="https://user-images.githubusercontent.com/48934154/165566299-3dd65cff-5e36-47d3-a243-7bc33d4344d6.png">
   
   
   This also occurs on the `main` branch.
   
   ### What you think should happen instead
   
   Users should be able to specify Labels between tasks within a Task Group. 
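The expected behaviour can be pictured with a toy model in which a Label is pure metadata on an edge rather than a node in the graph. `MiniDag`, `link`, and `edge_info` below are hypothetical names used only for illustration (this is not Airflow's API). Under that assumption, `begin() >> Label("label") >> end()` and `begin() >> end()` produce identical dependency graphs, so the labelled version should pass cycle detection exactly when the unlabelled one does.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class MiniDag:
    """Hypothetical toy DAG: task adjacency plus separate per-edge metadata."""

    # task_id -> set of direct downstream task_ids
    downstream: dict[str, set[str]] = field(default_factory=dict)
    # (upstream_id, downstream_id) -> metadata such as {"label": "..."}
    edge_info: dict[tuple[str, str], dict[str, str]] = field(default_factory=dict)

    def link(self, upstream: str, downstream: str, label: str | None = None) -> None:
        """Add the edge upstream -> downstream; a label only annotates that edge."""
        self.downstream.setdefault(upstream, set()).add(downstream)
        self.downstream.setdefault(downstream, set())
        if label is not None:
            self.edge_info[(upstream, downstream)] = {"label": label}


plain = MiniDag()
plain.link("group.begin", "group.end")

labelled = MiniDag()
labelled.link("group.begin", "group.end", label="label")

# Same nodes and edges; only the edge metadata differs.
print(plain.downstream == labelled.downstream)  # True
print(labelled.edge_info)  # {('group.begin', 'group.end'): {'label': 'label'}}
```

Keeping labels in a separate map means a graph check such as cycle detection only ever sees task-to-task edges, whether or not a label is present.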
   
   ### How to reproduce
   
   - Use the DAG above and try to import it into an Airflow environment.
   
   - Or create the following unit test and run it:
   ```python
       # Method of TestCycleTester in tests/utils/test_dag_cycle.py; DAG,
       # EmptyOperator, check_cycle and DEFAULT_DATE are available at module level there.
       def test_cycle_task_group_with_edge_labels(self):
           from airflow.utils.edgemodifier import Label
           from airflow.utils.task_group import TaskGroup
   
           dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
   
           with dag:
               with TaskGroup(group_id="task_group"):
                   op1 = EmptyOperator(task_id='A')
                   op2 = EmptyOperator(task_id='B')
   
                   # A labelled edge between two tasks in the same group.
                   op1 >> Label("label") >> op2
   
           # A >> B contains no cycle, so this check should pass.
           assert not check_cycle(dag)
   ```
   
   Instead, an `AirflowDagCycleException` is raised:
   ```
   
   tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels FAILED [100%]
   
   =================================== FAILURES ===================================
   ____________ TestCycleTester.test_cycle_task_group_with_edge_labels ____________
   
   self = <tests.utils.test_dag_cycle.TestCycleTester testMethod=test_cycle_task_group_with_edge_labels>
   
       def test_cycle_task_group_with_edge_labels(self):
           from airflow.models.baseoperator import chain
           from airflow.utils.task_group import TaskGroup
           from airflow.utils.edgemodifier import Label
   
           dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
   
           with dag:
               with TaskGroup(group_id="task_group") as task_group:
                   op1 = EmptyOperator(task_id='A')
                   op2 = EmptyOperator(task_id='B')
   
                   op1 >> Label("label") >> op2
   
   >       assert not check_cycle(dag)
   
   tests/utils/test_dag_cycle.py:168:
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   airflow/utils/dag_cycle_tester.py:76: in check_cycle
       child_to_check = _check_adjacent_tasks(current_task_id, task)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   task_id = 'task_group.B', current_task = <Task(EmptyOperator): task_group.B>
   
       def _check_adjacent_tasks(task_id, current_task):
           """Returns first untraversed child task, else None if all tasks traversed."""
           for adjacent_task in current_task.get_direct_relative_ids():
               if visited[adjacent_task] == CYCLE_IN_PROGRESS:
                   msg = f"Cycle detected in DAG. Faulty task: {task_id}"
   >               raise AirflowDagCycleException(msg)
   E               airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
   
   airflow/utils/dag_cycle_tester.py:62: AirflowDagCycleException
   
   ----------------------------- Captured stdout setup -----------------------------
   ========================= AIRFLOW ==========================
   Home of the user: /root
   Airflow home /root/airflow
   Skipping initializing of the DB as it was initialized already.
   You can re-initialize the database by adding --with-db-init flag when running tests.
   
   =========================== short test summary info ============================
   FAILED tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels - airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
   
   ======================== 1 failed, 2 warnings in 1.08s =========================
   ```
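For readers unfamiliar with the check that fails here: the traceback shows `check_cycle` walking the DAG depth-first and raising as soon as one of a task's direct relatives (from `get_direct_relative_ids()`) is still marked `CYCLE_IN_PROGRESS`. The standalone sketch below mimics that three-state scheme on a plain adjacency dict. The `find_cycle_task` name and the dict encoding are assumptions for illustration; this is not Airflow's implementation.

```python
from __future__ import annotations

# Three visit states, named after the constants seen in the traceback.
CYCLE_NEW, CYCLE_IN_PROGRESS, CYCLE_DONE = 0, 1, 2


def find_cycle_task(downstream: dict[str, list[str]]) -> str | None:
    """Return the task at which a cycle is detected, or None if acyclic.

    Iterative DFS: a task is IN_PROGRESS while it is on the stack, so
    reaching an IN_PROGRESS task again means a back edge, i.e. a cycle.
    Every child must also appear as a key in `downstream`.
    """
    visited = {task_id: CYCLE_NEW for task_id in downstream}
    for root in downstream:
        if visited[root] != CYCLE_NEW:
            continue
        visited[root] = CYCLE_IN_PROGRESS
        stack = [(root, iter(downstream[root]))]
        while stack:
            task_id, children = stack[-1]
            for child in children:
                if visited[child] == CYCLE_IN_PROGRESS:
                    return task_id  # task_id -> child closes a cycle
                if visited[child] == CYCLE_NEW:
                    visited[child] = CYCLE_IN_PROGRESS
                    stack.append((child, iter(downstream[child])))
                    break
            else:
                # All children explored: the task leaves the stack.
                visited[task_id] = CYCLE_DONE
                stack.pop()
    return None


# A >> B inside "task_group": acyclic, as the reporter expects.
print(find_cycle_task({"task_group.A": ["task_group.B"], "task_group.B": []}))  # None

# Hypothetical broken graph in which B also lists A downstream: detected at B.
print(find_cycle_task({"task_group.A": ["task_group.B"], "task_group.B": ["task_group.A"]}))  # task_group.B
```

The second call shows the only condition this kind of check reacts to: while visiting a task, one of its direct relatives is still on the DFS stack. Read that way, the failure above suggests that `task_group.B`'s direct relatives unexpectedly included an in-progress task once the Label was added; the traceback does not show which task that was.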
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   This issue also occurs on the `main` branch using Breeze.
   
   ### Anything else
   
   Possibly related to #21404
   
   When the Label is removed, no cycle is detected.
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag, task, task_group
   from airflow.utils.edgemodifier import Label
   
   
   @task
   def begin():
       ...
   
   
   @task
   def end():
       ...
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def task_groups_with_edge_labels():
       @task_group
       def group():
           begin() >> end()
   
       group()
   
   
   _ = task_groups_with_edge_labels()
   ```
   <img width="1437" alt="image" src="https://user-images.githubusercontent.com/48934154/165566908-a521d685-a032-482e-9e6b-ef85f0743e64.png">
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

