yanshil opened a new issue, #56940:
URL: https://github.com/apache/airflow/issues/56940
### Apache Airflow version
3.1.0
### If "Other Airflow 2/3 version" selected, which one?
_No response_
### What happened?
Changing only the letter case of a dag's tag appears to crash the Airflow dag
processor.
Originally I had a dag A with `tags=['dangerous']`. Today I wrote another
dag B and tagged it `DANGEROUS`. I changed the tag in A to `tags=['DANGEROUS']` to
make them uniform, and then found that my dag processor crashed with the
following message.
```
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 7, in <module>
sys.exit(main())
^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line
55, in main
args.func(args)
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py",
line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line
114, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py",
line 54, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py",
line 53, in dag_processor
run_command_with_daemon_option(
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
callback()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py",
line 56, in <lambda>
callback=lambda: run_job(job=job_runner.job,
execute_callable=job_runner._execute),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
line 100, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line
368, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line
397, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py",
line 61, in _execute
self.processor.run()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
line 272, in run
return self._run_parsing_loop()
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
line 361, in _run_parsing_loop
self._collect_results()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
line 100, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
line 827, in _collect_results
self._file_stats[file] = process_parse_results(
^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
line 1155, in process_parse_results
update_dag_parsing_results_in_db(
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
line 372, in update_dag_parsing_results_in_db
for attempt in run_with_db_retries(logger=log):
File
"/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line
445, in __iter__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line
378, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line
400, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
^^^^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 449,
in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 401,
in __get_result
raise self._exception
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
line 382, in update_dag_parsing_results_in_db
SerializedDAG.bulk_write_to_db(
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py",
line 2867, in bulk_write_to_db
orm_dags = dag_op.add_dags(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
line 450, in add_dags
orm_dags = self.find_orm_dags(session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
line 447, in find_orm_dags
return {dm.dag_id: dm for dm in session.scalars(stmt).unique()}
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 1778, in scalars
return self.execute(
^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 1716, in execute
conn = self._connection_for_bind(bind)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 1555, in _connection_for_bind
return self._transaction._connection_for_bind(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 724, in _connection_for_bind
self._assert_active()
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 604, in _assert_active
raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been
rolled back due to a previous exception during flush. To begin a new
transaction with this Session, first issue Session.rollback(). Original
exception was: (MySQLdb.IntegrityError) (1062, "Duplicate entry
'DANGEROUS-modify_dr_conf' for key 'dag_tag.PRIMARY'")
[SQL: INSERT INTO dag_tag (name, dag_id) VALUES (%s, %s)]
[parameters: ('DANGEROUS', 'modify_dr_conf')]
```
I had to change the tag back to lowercase in dag A to bring the dag processor
back up. But now I have `dangerous` on dag A and `DANGEROUS` on dag B, and I
can't rename `DANGEROUS` to `dangerous` either, because I expect dag B would
hit the same error if I did.
<img width="2347" height="711" alt="Image"
src="https://github.com/user-attachments/assets/6e8fff99-6d6d-4fce-8f94-fc053d77c8c6"
/>
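Mixed spellings like this can be spotted before deploying. Below is a minimal sketch of a check that groups tags differing only by letter case. The helper `find_case_only_variants` and the `tags_by_dag` mapping are hypothetical and not part of Airflow; the tag lists are the ones from the two dags in this report.

```python
from collections import defaultdict


def find_case_only_variants(tags_by_dag: dict[str, list[str]]) -> dict[str, set[str]]:
    """Group tag spellings that differ only by letter case.

    Returns a mapping from the case-folded tag to its distinct spellings,
    keeping only tags that appear with more than one spelling.
    """
    spellings: dict[str, set[str]] = defaultdict(set)
    for tags in tags_by_dag.values():
        for tag in tags:
            spellings[tag.casefold()].add(tag)
    return {key: forms for key, forms in spellings.items() if len(forms) > 1}


# Hypothetical input: dag_id -> tags, copied from the two dags above.
tags_by_dag = {
    "modify_dr_conf": ["dangerous", "maintenance", "database"],
    "airflow_clean_db": ["DANGEROUS", "maintenance", "database"],
}
conflicts = find_case_only_variants(tags_by_dag)
print(conflicts)
```

Only `dangerous`/`DANGEROUS` is reported here; `maintenance` and `database` are spelled the same in both dags.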
---
I tried another possible workaround, which did not work:
1. Delete dag A, which has the original lowercase `dangerous` tag, in the UI.
2. Wait for the deletion to finish and the dag to disappear.
3. Upload the new dag A with the uppercase `DANGEROUS` tag.
4. The UI still shows the old lowercase tag, and the dag processor
crashes.
### What you think should happen instead?
_No response_
### How to reproduce
I expect that simply changing a lowercase tag to uppercase is enough to
reproduce the problem, but here is my full list of actions.
1. I created dag A with the tag `dangerous`:
```
@dag(
    dag_id="modify_dr_conf",
    tags=["dangerous", "maintenance", "database"],
)
def modify_dr_conf(): ...
```
2. I created dag B with the tag `DANGEROUS`:
```
@dag(
    dag_id="airflow_clean_db",
    tags=["DANGEROUS", "maintenance", "database"],
)
def airflow_clean_db(): ...
```
3. I decided to rename the tag in A to `DANGEROUS`, so it is now:
```
@dag(
    dag_id="modify_dr_conf",
    tags=["DANGEROUS", "maintenance", "database"],
)
def modify_dr_conf(): ...
```
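The duplicate-key error can be sketched outside Airflow. This is a minimal illustration, not Airflow's actual code path. It assumes the metadata database compares tag names case-insensitively (as many MySQL collations do) and that the new tag row is inserted before the old one is deleted. An in-memory SQLite table with `COLLATE NOCASE` stands in for MySQL, and the helper names `make_db`, `retag_insert_first`, and `retag_delete_first` are hypothetical.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a stand-in dag_tag table whose name column ignores case."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_tag ("
        " name TEXT COLLATE NOCASE NOT NULL,"
        " dag_id TEXT NOT NULL,"
        " PRIMARY KEY (name, dag_id))"
    )
    conn.execute(
        "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)",
        ("dangerous", "modify_dr_conf"),
    )
    conn.commit()
    return conn


def retag_insert_first(conn: sqlite3.Connection, dag_id: str, old: str, new: str) -> bool:
    """Insert the new tag, then delete the old one. Returns False on a key clash."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)", (new, dag_id))
            conn.execute("DELETE FROM dag_tag WHERE name = ? AND dag_id = ?", (old, dag_id))
    except sqlite3.IntegrityError:
        return False
    return True


def retag_delete_first(conn: sqlite3.Connection, dag_id: str, old: str, new: str) -> bool:
    """Delete the old tag, then insert the new one. Returns False on a key clash."""
    try:
        with conn:
            conn.execute("DELETE FROM dag_tag WHERE name = ? AND dag_id = ?", (old, dag_id))
            conn.execute("INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)", (new, dag_id))
    except sqlite3.IntegrityError:
        return False
    return True


conn = make_db()
insert_first_ok = retag_insert_first(conn, "modify_dr_conf", "dangerous", "DANGEROUS")
delete_first_ok = retag_delete_first(conn, "modify_dr_conf", "dangerous", "DANGEROUS")
rows = conn.execute("SELECT name, dag_id FROM dag_tag").fetchall()
print(insert_first_ok, delete_first_ok, rows)
```

Under these assumptions, the insert-first order hits a uniqueness error similar to the one in the traceback, because `DANGEROUS` and `dangerous` count as the same key, while deleting the old row first succeeds.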
### Operating System
Helm Chart 1.18.0 with Docker Image based on
`apache/airflow:3.1.0-python3.11`
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
Helm Chart 1.18.0
### Anything else?
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)