This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5013aad00b3 Fix dag-processor crash when renaming DAG tag case on MySQL (#57113)
5013aad00b3 is described below

commit 5013aad00b3b76e442861f8233c2691845f1fff1
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Oct 23 20:36:21 2025 +0100

    Fix dag-processor crash when renaming DAG tag case on MySQL (#57113)
    
    When a user changed only the case of a DAG tag (e.g., 'dangerous' to
    'DANGEROUS'), the dag-processor would crash with a duplicate key error
    on MySQL due to case-insensitive collation in the PRIMARY KEY. This
    occurred because SQLAlchemy executed INSERT operations before DELETE
    operations during the flush.
    
    The fix ensures DELETE operations complete before attempting INSERT
    operations by explicitly flushing and refreshing the tag relationship
    from the database.
    
    Fixes #56940
---
 .../src/airflow/dag_processing/collection.py       | 24 +++++++++++++++-
 .../tests/unit/dag_processing/test_collection.py   | 32 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)
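
A minimal, self-contained SQLAlchemy sketch of the ordering problem described
above (illustrative only, not part of this commit): within one flush, the unit
of work emits INSERTs for a table before DELETEs, so a case-only rename inserts
'DANGEROUS' while the row 'dangerous' still exists, and MySQL's case-insensitive
primary-key collation rejects that as a duplicate. The Tag model and the
in-memory SQLite engine below are sketch assumptions; SQLite will not reproduce
the MySQL collation collision itself, but the delete-then-flush-then-insert
ordering the fix enforces is the same.

    from sqlalchemy import String, create_engine
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


    class Base(DeclarativeBase):
        pass


    class Tag(Base):
        # Stand-in for DagTag: on MySQL the default collation makes the
        # primary key case-insensitive, so 'dangerous' and 'DANGEROUS' collide.
        __tablename__ = "tag"
        name: Mapped[str] = mapped_column(String(100), primary_key=True)


    engine = create_engine("sqlite://")  # in-memory DB, purely for illustration
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add(Tag(name="dangerous"))
        session.commit()

        session.delete(session.get(Tag, "dangerous"))
        # Without this flush, the INSERT below would be emitted before the
        # DELETE above; on MySQL that raises a duplicate-key error for a
        # case-only rename. Flushing forces the DELETE to reach the database
        # first.
        session.flush()
        session.add(Tag(name="DANGEROUS"))
        session.commit()

Against a MySQL backend, that explicit flush is what the fix relies on to avoid
the duplicate-key error; as the diff below shows, the extra flush is skipped
entirely on other dialects.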

diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index 31a664b7e02..6634754a58d 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -159,10 +159,32 @@ class _RunInfo(NamedTuple):
 
 def _update_dag_tags(tag_names: set[str], dm: DagModel, *, session: Session) -> None:
     orm_tags = {t.name: t for t in dm.tags}
+    tags_to_delete = []
     for name, orm_tag in orm_tags.items():
         if name not in tag_names:
             session.delete(orm_tag)
-    dm.tags.extend(DagTag(name=name, dag_id=dm.dag_id) for name in tag_names.difference(orm_tags))
+            tags_to_delete.append(orm_tag)
+
+    tags_to_add = tag_names.difference(orm_tags)
+    if tags_to_delete:
+        # Remove deleted tags from the collection to keep it in sync
+        for tag in tags_to_delete:
+            dm.tags.remove(tag)
+
+        # Check if there's a potential case-only rename on MySQL (e.g., 'tag' -> 'TAG').
+        # MySQL uses case-insensitive collation for the (name, dag_id) primary key by default,
+        # which can cause duplicate key errors when renaming tags with only case changes.
+        if get_dialect_name(session) == "mysql":
+            orm_tags_lower = {name.lower(): name for name in orm_tags}
+            has_case_only_change = any(tag.lower() in orm_tags_lower for tag in tags_to_add)
+
+            if has_case_only_change:
+                # Force DELETE operations to execute before INSERT operations.
+                session.flush()
+                # Refresh the tags relationship from the database to reflect the deletions.
+                session.expire(dm, ["tags"])
+
+    dm.tags.extend(DagTag(name=name, dag_id=dm.dag_id) for name in tags_to_add)
 
 
 def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, session: Session) -> None:
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index e7d12f62c32..92c03e97c52 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -37,6 +37,7 @@ from airflow.dag_processing.collection import (
     AssetModelOperation,
     DagModelOperation,
     _get_latest_runs_stmt,
+    _update_dag_tags,
     update_dag_parsing_results_in_db,
 )
 from airflow.exceptions import SerializationError
@@ -48,6 +49,7 @@ from airflow.models.asset import (
     DagScheduleAssetNameReference,
     DagScheduleAssetUriReference,
 )
+from airflow.models.dag import DagTag
 from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -941,3 +943,33 @@ class TestUpdateDagParsingResults:
             update_dag_parsing_results_in_db("testing", None, [dag], {}, 0.1, set(), session)
             orm_dag = session.get(DagModel, "dag_max_failed_runs_default")
             assert orm_dag.max_consecutive_failed_dag_runs == 6
+
+
[email protected]_test
+class TestUpdateDagTags:
+    @pytest.fixture(autouse=True)
+    def setup_teardown(self, session):
+        yield
+        session.query(DagModel).filter(DagModel.dag_id == "test_dag").delete()
+        session.commit()
+
+    @pytest.mark.parametrize(
+        ["initial_tags", "new_tags", "expected_tags"],
+        [
+            (["dangerous"], {"DANGEROUS"}, {"DANGEROUS"}),
+            (["existing"], {"existing", "new"}, {"existing", "new"}),
+            (["tag1", "tag2"], {"tag1"}, {"tag1"}),
+            (["keep", "remove", "lowercase"], {"keep", "LOWERCASE", "new"}, 
{"keep", "LOWERCASE", "new"}),
+            (["tag1", "tag2"], set(), set()),
+        ],
+    )
+    def test_update_dag_tags(self, testing_dag_bundle, session, initial_tags, new_tags, expected_tags):
+        dag_model = DagModel(dag_id="test_dag", bundle_name="testing")
+        dag_model.tags = [DagTag(name=tag, dag_id="test_dag") for tag in initial_tags]
+        session.add(dag_model)
+        session.commit()
+
+        _update_dag_tags(new_tags, dag_model, session=session)
+        session.commit()
+
+        assert {t.name for t in dag_model.tags} == expected_tags
