rustikk opened a new issue #21972:
URL: https://github.com/apache/airflow/issues/21972


   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   I'm getting an import error of ` Cycle detected in DAG. Faulty task: 
group1.dummy4`
   when I load this dag into airflow
   
   ### What you expected to happen
   
   I expected the dag to have no import errors. It imports fine in version 
2.2.4-2 but when switching to version 2.3.0.dev20220302 it gives the import 
error described above.
   
   ### How to reproduce
   
   ```
   from airflow.utils.edgemodifier import Label
   from airflow.models.baseoperator import chain
   from airflow.decorators import dag, task, task_group
   from airflow.utils.dot_renderer import render_dag
   from textwrap import indent
   
   from datetime import datetime, timedelta
   
   two_days = datetime.now() - timedelta(days=2)
   
   @dag(dag_id="taskflow_compare2",
       schedule_interval=None,
       start_date=two_days,
       tags=['core'])
   def task_grouper():
   
       @task
       def assert_homomorphic(task_group_names, **context):
           """
           The structure of all of the task groups above should be the same
           """
           # get the dag in dot notation, focus only on its edges
           dag = context["dag"]
           #gives string which represents whole dag structure
           graph = render_dag(dag)
           print("Whole DAG:")
           print(indent(str(graph), "    "))
           lines = list(filter(lambda x: "->" in x, str(graph).split("\n")))
   
           # bin them by task group, then remove the group names
           group_strings = []
           #removes everything thats not a task name
           for name in task_group_names:
               print(name)
               relevant_lines = filter(lambda x: name in x, lines)
               normalized_lines = map(
               lambda x: x.strip().replace(name, ""), sorted(relevant_lines)
               )
               edges_str = "\n".join(normalized_lines)
               group_strings.append(edges_str)
               print(indent(edges_str, "    "))
   
           # these should be identical
           for xgroup, ygroup in zip(group_strings, group_strings[1:]):
               assert xgroup == ygroup
   
       @task_group(group_id="group1")
       def grouper1():
   
           @task
           def dummy00():
               return 0
       
           @task
           def dummy0():
               return 0
       
           @task
           def dummy1(val):
               return val + 1
   
           @task
           def dummy2(val):
               return val + 2
       
           @task
           def dummy3(val):
               return val + 3
       
           @task
           def dummy4(val):
               return val + 4
           
           @task
           def dummy5(val):
               return val + 5
           
           @task
           def dummy6(val):
               return val + 6
           
           @task
           def dummy7(val):
               return val + 7
           
           @task
           def dummy8(val):
               return val + 8
           
           @task
           def dummy9(val):
               return val + 9
           
           @task
           def dummy10(val):
               return val + 10
   
           d00 = dummy00()
           d0 = dummy0()
           d1 = dummy1(0)
           d2 = dummy2(0)
           d3 = dummy3(0)
           d4 = dummy4(0)
           d5 = dummy5(0)
           d6 = dummy6(0)
           d7 = dummy7(0)
           d8 = dummy8(0)
           d9 = dummy9(0)
           d10 = dummy10(0)
   
           chain(d00, [d0, d1, d2], d3, [Label("branch one"), Label("branch 
two"), Label("branch three"), Label("branch four"), Label("branch five")], [d4, 
d5, d6, d7, d8], d9, d10)
           return d10
       
       @task_group(group_id="group2")
       def grouper2():
           
           @task
           def dummy00():
               return 0
   
           @task
           def dummy0():
               return 0
       
           @task
           def dummy1(val):
               return val + 1
       
           @task
           def dummy2(val):
               return val + 2
       
           @task
           def dummy3(val):
               return val + 3
       
           @task
           def dummy4(val):
               return val + 4
           
           @task
           def dummy5(val):
               return val + 5
           
           @task
           def dummy6(val):
               return val + 6
   
           @task
           def dummy7(val):
               return val + 7
           
           @task
           def dummy8(val):
               return val + 8
           
           @task
           def dummy9(val):
               return val + 9
           
           @task
           def dummy10(val):
               return val + 10
   
           d00 = dummy00()
           d0 = dummy0()
           d1 = dummy1(0)
           d2 = dummy2(0)
           d3 = dummy3(0)
           d4 = dummy4(0)
           d5 = dummy5(0)
           d6 = dummy6(0)
           d7 = dummy7(0)
           d8 = dummy8(0)
           d9 = dummy9(0)
           d10 = dummy10(0)
   
           d00.set_downstream(d0)
           d00.set_downstream(d1)
           d00.set_downstream(d2)
           d0.set_downstream(d3)
           d1.set_downstream(d3)
           d2.set_downstream(d3)
           d3.set_downstream(d4, edge_modifier=Label("branch one"))
           d3.set_downstream(d5, edge_modifier=Label("branch two"))
           d3.set_downstream(d6, edge_modifier=Label("branch three"))
           d3.set_downstream(d7, edge_modifier=Label("branch four"))
           d3.set_downstream(d8, edge_modifier=Label("branch five"))
           d4.set_downstream(d9)
           d5.set_downstream(d9)
           d6.set_downstream(d9)
           d7.set_downstream(d9)
           d8.set_downstream(d9)
           d9.set_downstream(d10)
   
           #return dummy4(dummy3(dummy2(dummy1(dummy0()))))
           return d10
   
       tg1 = grouper1()
       tg2 = grouper2()
   
       [tg1, tg2] >> assert_homomorphic(["group1", "group2"])
   dag = task_grouper()
   
   
   
   ```
   
   ### Operating System
   
   Docker (debian:buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Using the Astro CLI with this image:
   quay.io/astronomer/ap-airflow-dev:main
   
   ### Anything else
   
   Happens every time.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to