yanshil opened a new issue, #56940:
URL: https://github.com/apache/airflow/issues/56940

   ### Apache Airflow version
   
   3.1.0
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   It seems that renaming a tag on a dag causes a problem in the Airflow Dag Processor.
   
   Originally I had a dag A with `tags=['dangerous']`. Today I wrote another dag B and tagged it `DANGEROUS`. I changed the tags in A to `tags=['DANGEROUS']` to make them uniform, and then found that my dag processor crashed with the following message.
   
   ```
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 7, in <module>
       sys.exit(main())
                ^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 
55, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 
114, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py",
 line 53, in dag_processor
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py",
 line 56, in <lambda>
       callback=lambda: run_job(job=job_runner.job, 
execute_callable=job_runner._execute),
                        
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", 
line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 
368, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 
397, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py",
 line 61, in _execute
       self.processor.run()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 272, in run
       return self._run_parsing_loop()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 361, in _run_parsing_loop
       self._collect_results()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", 
line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 827, in _collect_results
       self._file_stats[file] = process_parse_results(
                                ^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 1155, in process_parse_results
       update_dag_parsing_results_in_db(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
 line 372, in update_dag_parsing_results_in_db
       for attempt in run_with_db_retries(logger=log):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 
445, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 
378, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 
400, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 449, 
in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 401, 
in __get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
 line 382, in update_dag_parsing_results_in_db
       SerializedDAG.bulk_write_to_db(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", 
line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py",
 line 2867, in bulk_write_to_db
       orm_dags = dag_op.add_dags(session=session)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
 line 450, in add_dags
       orm_dags = self.find_orm_dags(session=session)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
 line 447, in find_orm_dags
       return {dm.dag_id: dm for dm in session.scalars(stmt).unique()}
                                       ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1778, in scalars
       return self.execute(
              ^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1716, in execute
       conn = self._connection_for_bind(bind)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1555, in _connection_for_bind
       return self._transaction._connection_for_bind(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 724, in _connection_for_bind
       self._assert_active()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 604, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been 
rolled back due to a previous exception during flush. To begin a new 
transaction with this Session, first issue Session.rollback(). Original 
exception was: (MySQLdb.IntegrityError) (1062, "Duplicate entry 
'DANGEROUS-modify_dr_conf' for key 'dag_tag.PRIMARY'")
   [SQL: INSERT INTO dag_tag (name, dag_id) VALUES (%s, %s)]
   [parameters: ('DANGEROUS', 'modify_dr_conf')]
   ```
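
The error message suggests that the `dag_tag` primary key `(name, dag_id)` treats `dangerous` and `DANGEROUS` as the same value, so inserting the new spelling while the old row still exists is rejected. This is an inference from the traceback, not something confirmed in the Airflow code. A minimal sketch of that behaviour, using SQLite's `COLLATE NOCASE` as a stand-in for a case-insensitive MySQL collation (the helper names `make_dag_tag_table` and `rename_tag` are hypothetical):

```python
import sqlite3


def make_dag_tag_table() -> sqlite3.Connection:
    """Create an in-memory dag_tag table whose key ignores letter case."""
    conn = sqlite3.connect(":memory:")
    # Assumption: COLLATE NOCASE mimics a case-insensitive MySQL collation,
    # so 'dangerous' and 'DANGEROUS' count as the same primary-key value.
    conn.execute(
        "CREATE TABLE dag_tag ("
        " name TEXT COLLATE NOCASE NOT NULL,"
        " dag_id TEXT NOT NULL,"
        " PRIMARY KEY (name, dag_id))"
    )
    return conn


def rename_tag(conn, dag_id, old, new, delete_first):
    """Rename a tag; return 'ok' or the integrity error text."""
    insert = "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)"
    delete = "DELETE FROM dag_tag WHERE name = ? AND dag_id = ?"
    try:
        if delete_first:
            # Remove the old spelling before writing the new one.
            conn.execute(delete, (old, dag_id))
            conn.execute(insert, (new, dag_id))
        else:
            # Insert while the old row still exists: the key collides.
            conn.execute(insert, (new, dag_id))
            conn.execute(delete, (old, dag_id))
    except sqlite3.IntegrityError as exc:
        conn.rollback()
        return f"IntegrityError: {exc}"
    conn.commit()
    return "ok"


if __name__ == "__main__":
    conn = make_dag_tag_table()
    conn.execute(
        "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)",
        ("dangerous", "modify_dr_conf"),
    )
    conn.commit()
    print(rename_tag(conn, "modify_dr_conf", "dangerous", "DANGEROUS", False))
    print(rename_tag(conn, "modify_dr_conf", "dangerous", "DANGEROUS", True))
```

In this sketch the insert-first order fails with an integrity error, while the delete-first order succeeds. Whether the dag processor actually issues the statements in the failing order is an assumption.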
   
   I had to rename the tag back to lowercase in dag A to bring the dag processor back up. But now I have `dangerous` in dag A and `DANGEROUS` in dag B, and I can't rename `DANGEROUS` to `dangerous` in dag B either, because I think it might hit a similar problem if I do.
   
   <img width="2347" height="711" alt="Image" src="https://github.com/user-attachments/assets/6e8fff99-6d6d-4fce-8f94-fc053d77c8c6" />
   
   ---
   
   I tried another possible solution, which did not work.
   1. Delete dag A, with its original lowercase `dangerous` tag, in the UI
   2. Wait for the deletion to finish and the dag to disappear
   3. Upload a new dag A with the new uppercase `DANGEROUS` tag
   4. The UI still shows the old lowercase tag, and the dag processor crashes.
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   I suppose simply renaming a lowercase tag to uppercase should be enough to reproduce the problem, but I will still attach my full list of actions.
   
   1. I created a dag A with `tags=['dangerous']`
   ```
   from airflow.sdk import dag

   @dag(
       dag_id="modify_dr_conf",
       tags=["dangerous", "maintenance", "database"],
   )
   def modify_dr_conf(): ...
   ```
   
   2. I created a dag B with `tags=['DANGEROUS']`
   ```
   from airflow.sdk import dag

   @dag(
       dag_id="airflow_clean_db",
       tags=["DANGEROUS", "maintenance", "database"],
   )
   def airflow_clean_db(): ...
   ```
   
   3. I decided to rename the tag in A to `DANGEROUS`, so it now reads
   ```
   from airflow.sdk import dag

   @dag(
       dag_id="modify_dr_conf",
       tags=["DANGEROUS", "maintenance", "database"],
   )
   def modify_dr_conf(): ...
   ```
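
The only change between steps 1 and 3 is the letter case of one tag. As a rough sketch, a check like the following could flag such case-only renames before a dag file is deployed. The function `case_only_renames` is hypothetical and not part of Airflow:

```python
def case_only_renames(old_tags, new_tags):
    """Return sorted (old, new) pairs that differ only by letter case."""
    # Index the old tags by their case-folded form.
    old_by_fold = {tag.casefold(): tag for tag in old_tags}
    pairs = []
    for tag in new_tags:
        previous = old_by_fold.get(tag.casefold())
        # Same tag when case is ignored, but a different exact spelling.
        if previous is not None and previous != tag:
            pairs.append((previous, tag))
    return sorted(pairs)


if __name__ == "__main__":
    before = ["dangerous", "maintenance", "database"]
    after = ["DANGEROUS", "maintenance", "database"]
    print(case_only_renames(before, after))
```

For the tags in steps 1 and 3 this reports the single pair `dangerous` and `DANGEROUS`; unchanged tags are not reported.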
   
   
   
   ### Operating System
   
   Helm Chart 1.18.0 with Docker Image based on 
`apache/airflow:3.1.0-python3.11`
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Helm Chart 1.18.0
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

