henriqueribeiro opened a new issue #20681:
URL: https://github.com/apache/airflow/issues/20681
### Apache Airflow version
2.1.4
### What happened
I recently started using the TaskFlow API in some of my DAG files where the
tasks are generated dynamically, and I began to notice a lot of warning
messages in the logs. The messages look like this:
```bash
[2022-01-05 13:19:11,729] {baseoperator.py:1295} WARNING - Dependency
<Task(_PythonDecoratedOperator): 2_num_group.run_2>, 2_num_group.agg already
registered for DAG: test_warn_msg
[2022-01-05 13:19:11,730] {baseoperator.py:1295} WARNING - Dependency
<Task(_PythonDecoratedOperator): 2_num_group.agg>, 2_num_group.run_2 already
registered for DAG: test_warn_msg
[2022-01-05 13:19:11,730] {baseoperator.py:1295} WARNING - Dependency
<Task(_PythonDecoratedOperator): 2_num_group.run_1>, 2_num_group.agg already
registered for DAG: test_warn_msg
[2022-01-05 13:19:11,731] {baseoperator.py:1295} WARNING - Dependency
<Task(_PythonDecoratedOperator): 2_num_group.agg>, 2_num_group.run_1 already
registered for DAG: test_warn_msg
[2022-01-05 13:19:11,731] {baseoperator.py:1295} WARNING - Dependency
<Task(_PythonDecoratedOperator): 2_num_group.run_2>, 2_num_group.agg already
registered for DAG: test_warn_msg
```
These messages keep appearing even when the DAG is not running, and also
when it is paused, so I guess this happens when the DAG is parsed.
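To make the pattern in the log concrete: each pair of messages names the same two tasks in opposite order, which is what I would expect if a single upstream/downstream edge were set a second time and each side of the edge complained. The sketch below is a toy model in plain Python, not Airflow's code. The `ToyTask` class, its methods, and the `demo` helper are hypothetical names made up for illustration, and the assumption that Airflow tracks relatives in a similar set-based way is only my guess from the message format.

```python
import logging

log = logging.getLogger("toy_dependency_model")


class ToyTask:
    """Hypothetical stand-in for an operator; NOT Airflow's BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = set()
        self.downstream_ids = set()

    def _add_only_new(self, ids, other_id):
        # Warn instead of adding when the relative is already known.
        if other_id in ids:
            log.warning("Dependency %s, %s already registered", self.task_id, other_id)
        else:
            ids.add(other_id)

    def set_downstream(self, other):
        # An edge is recorded on both ends, so a repeated edge warns twice.
        self._add_only_new(self.downstream_ids, other.task_id)
        other._add_only_new(other.upstream_ids, self.task_id)


def demo():
    """Set the same edge twice and return the warning messages that were logged."""
    messages = []

    class _Collect(logging.Handler):
        def emit(self, record):
            messages.append(record.getMessage())

    handler = _Collect()
    log.addHandler(handler)
    try:
        run_2 = ToyTask("run_2")
        agg = ToyTask("agg")
        run_2.set_downstream(agg)  # first registration: silent
        run_2.set_downstream(agg)  # second registration: two warnings
    finally:
        log.removeHandler(handler)
    return messages
```

Calling `demo()` yields one warning from each end of the edge, mirroring the `run_2, agg` / `agg, run_2` pairs in the log above. If the real mechanism is similar, the warnings would point to the same dependency being declared more than once while the DAG file is parsed.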
### What you expected to happen
I'm not sure if I'm doing something wrong, but these messages should not
appear.
### How to reproduce
Here is an example DAG that reproduces this issue:
```python
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

NUMS = [1, 2]

default_args = {
    "owner": "henrique",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=15),
}


def print_id(num: int):
    print(num)
    return num


def run_tests():
    results = []
    for i in NUMS:
        result = task(task_id=f"run_{i}")(print_id)(i)
        results.append(result)
    return results


@task()
def agg(results):
    print(results)


@dag(
    "test_warn_msg",
    default_args=default_args,
    schedule_interval="@once",
    start_date=days_ago(1),
    max_active_runs=1,
)
def test_supervisor():
    task_start = DummyOperator(task_id="task_start")
    task_end = DummyOperator(task_id="task_end")
    groups = []
    for i in NUMS:
        with TaskGroup(group_id=f"{i}_num_group") as tg:
            results = run_tests()
            aggregation = agg(results)
        groups.append(tg)
    task_start >> groups >> task_end


data_dag = test_supervisor()
```
### Operating System
Cloud Composer
### Versions of Apache Airflow Providers
_No response_
### Deployment
Composer
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)