henriqueribeiro opened a new issue #20681:
URL: https://github.com/apache/airflow/issues/20681


   ### Apache Airflow version
   
   2.1.4
   
   ### What happened
   
   Recently I started to use TaskFlow API in some of my dag files where the 
tasks are being dynamically generated and started to notice (a lot of) warning 
messages in the logs. The messages look like this:
   ```bash
   [2022-01-05 13:19:11,729] {baseoperator.py:1295} WARNING - Dependency 
<Task(_PythonDecoratedOperator): 2_num_group.run_2>, 2_num_group.agg already 
registered for DAG: test_warn_msg
   [2022-01-05 13:19:11,730] {baseoperator.py:1295} WARNING - Dependency 
<Task(_PythonDecoratedOperator): 2_num_group.agg>, 2_num_group.run_2 already 
registered for DAG: test_warn_msg
   [2022-01-05 13:19:11,730] {baseoperator.py:1295} WARNING - Dependency 
<Task(_PythonDecoratedOperator): 2_num_group.run_1>, 2_num_group.agg already 
registered for DAG: test_warn_msg
   [2022-01-05 13:19:11,731] {baseoperator.py:1295} WARNING - Dependency 
<Task(_PythonDecoratedOperator): 2_num_group.agg>, 2_num_group.run_1 already 
registered for DAG: test_warn_msg
   [2022-01-05 13:19:11,731] {baseoperator.py:1295} WARNING - Dependency 
<Task(_PythonDecoratedOperator): 2_num_group.run_2>, 2_num_group.agg already 
registered for DAG: test_warn_msg
   ```
   
   And these messages keep appearing even when the DAG is not running and also 
when it's paused. So I guess this happens at when parsing the DAG.
   
   ### What you expected to happen
   
   I'm not sure if I'm doing something wrong, but this messages should not 
appear.
   
   ### How to reproduce
   
   Here is an DAG example to reproduce this issue:
   
   ```python
   from airflow.decorators import dag, task
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   from airflow.operators.dummy import DummyOperator
   from airflow.utils.task_group import TaskGroup
   
   
   NUMS = [1, 2]
   
   default_args = {
       "owner": "henrique",
       "depends_on_past": False,
       "email_on_failure": True,
       "email_on_retry": False,
       "retries": 3,
       "retry_delay": timedelta(minutes=15),
   }
   
   
   def print_id(num: int):
       print(num)
       return num
   
   
   def run_tests():
       results = []
       for i in NUMS:
           result = task(task_id=f"run_{i}")(print_id)(i)
           results.append(result)
   
       return results
   
   
   @task()
   def agg(results):
       print(results)
   
   
   @dag(
       "test_warn_msg",
       default_args=default_args,
       schedule_interval="@once",
       start_date=days_ago(1),
       max_active_runs=1,
   )
   def test_supervisor():
       task_start = DummyOperator(task_id="task_start")
       task_end = DummyOperator(task_id="task_end")
       groups = []
       for i in NUMS:
           with TaskGroup(group_id=f"{i}_num_group") as tg:
               results = run_tests()
               aggregation = agg(results)
   
               groups.append(tg)
   
       task_start >> groups >> task_end
   
   
   data_dag = test_supervisor()
   
   ```
   
   ### Operating System
   
   Cloud Composer
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to