This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c8dd3574272 Fix SQLite downgrade failures caused by FK constraints during batch table recreation (#63437)
c8dd3574272 is described below

commit c8dd3574272acc0cc48c6689fa970d2ed141056e
Author: Rahul Vats <[email protected]>
AuthorDate: Tue Mar 17 15:43:13 2026 +0530

    Fix SQLite downgrade failures caused by FK constraints during batch table recreation (#63437)
    
    * Fix SQLite downgrade failures caused by FK constraints during batch table recreation
---
 .../0095_3_2_0_update_orm_asset_partitioning.py    | 15 ++++++++-----
 ..._enforce_log_event_and_dag_is_stale_not_null.py | 22 +++++++++++-------
 ...3_2_0_add_exceeds_max_runs_flag_to_dag_model.py |  7 ++++--
 ...0_3_2_0_add_timetable_type_to_dag_table_for_.py | 14 ++++++++----
 .../0105_3_2_0_add_allowed_run_types_to_dag.py     |  7 ++++--
 ..._3_2_0_add_partition_key_to_backfill_dag_run.py | 26 +++++++++++++---------
 .../0107_3_2_0_add_partition_fields_to_dag.py      | 15 ++++++++-----
 7 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py b/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py
index 08568750ee6..06162be639e 100644
--- a/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py
+++ b/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py
@@ -80,11 +80,14 @@ def upgrade():
 
 def downgrade():
     """Unapply Update ORM for asset partitioning."""
-    with op.batch_alter_table("dag_run", schema=None) as batch_op:
-        batch_op.drop_column("partition_key")
+    from airflow.migrations.utils import disable_sqlite_fkeys
 
-    with op.batch_alter_table("asset_event", schema=None) as batch_op:
-        batch_op.drop_column("partition_key")
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("dag_run", schema=None) as batch_op:
+            batch_op.drop_column("partition_key")
+
+        with op.batch_alter_table("asset_event", schema=None) as batch_op:
+            batch_op.drop_column("partition_key")
 
-    op.drop_table("partitioned_asset_key_log")
-    op.drop_table("asset_partition_dag_run")
+        op.drop_table("partitioned_asset_key_log")
+        op.drop_table("asset_partition_dag_run")
diff --git a/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py b/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py
index bcac927d00e..49fb4bcc3f7 100644
--- a/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py
+++ b/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py
@@ -29,6 +29,8 @@ from __future__ import annotations
 import sqlalchemy as sa
 from alembic import op
 
+from airflow.migrations.utils import disable_sqlite_fkeys
+
 revision = "edc4f85a4619"
 down_revision = "b12d4f98a91e"
 branch_labels = None
@@ -40,19 +42,23 @@ def upgrade():
     """Bring existing deployments in line with 0010 and 0067."""
     # Ensure `log.event` can safely transition to NOT NULL.
     op.execute("UPDATE log SET event = '' WHERE event IS NULL")
-    with op.batch_alter_table("log") as batch_op:
-        batch_op.alter_column("event", existing_type=sa.String(60), nullable=False)
 
     # Make sure DAG rows that survived the old 0067 path are not NULL.
     op.execute("UPDATE dag SET is_stale = false WHERE is_stale IS NULL")
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=False)
+
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("log") as batch_op:
+            batch_op.alter_column("event", existing_type=sa.String(60), nullable=False)
+
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=False)
 
 
 def downgrade():
     """Allow the columns to accept NULL again for older state reversions."""
-    with op.batch_alter_table("log") as batch_op:
-        batch_op.alter_column("event", existing_type=sa.String(60), nullable=True)
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("log") as batch_op:
+            batch_op.alter_column("event", existing_type=sa.String(60), nullable=True)
 
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=True)
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=True)
diff --git a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py
index 2a3f9c48573..233e0c95557 100644
--- a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py
+++ b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py
@@ -52,5 +52,8 @@ def upgrade():
 
 def downgrade():
     """Unapply Add exceeds max runs flag to dag model."""
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.drop_column("exceeds_max_non_backfill")
+    from airflow.migrations.utils import disable_sqlite_fkeys
+
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.drop_column("exceeds_max_non_backfill")
diff --git a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py b/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py
index 86049d06d72..47349528c6e 100644
--- a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py
+++ b/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py
@@ -40,16 +40,22 @@ airflow_version = "3.2.0"
 
 def upgrade():
     """Apply add timetable_type to dag table for filtering."""
+    from airflow.migrations.utils import disable_sqlite_fkeys
+
     with op.batch_alter_table("dag", schema=None) as batch_op:
         batch_op.add_column(sa.Column("timetable_type", sa.String(length=255)))
 
     op.execute("UPDATE dag SET timetable_type = '' WHERE timetable_type IS NULL")
 
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.alter_column("timetable_type", existing_type=sa.String(length=255), nullable=False)
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.alter_column("timetable_type", existing_type=sa.String(length=255), nullable=False)
 
 
 def downgrade():
     """Unapply add timetable_type to dag table for filtering."""
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.drop_column("timetable_type")
+    from airflow.migrations.utils import disable_sqlite_fkeys
+
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.drop_column("timetable_type")
diff --git a/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py b/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py
index 9a5ff5d5cad..6f4fb0ef132 100644
--- a/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py
+++ b/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py
@@ -46,5 +46,8 @@ def upgrade():
 
 def downgrade():
     """Remove allowed_run_types column from dag table."""
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.drop_column("allowed_run_types")
+    from airflow.migrations.utils import disable_sqlite_fkeys
+
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.drop_column("allowed_run_types")
diff --git a/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py b/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
index 69868e0d9ab..422e9b8f70d 100644
--- a/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
+++ b/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
@@ -31,6 +31,7 @@ import sqlalchemy as sa
 from alembic import op
 
 from airflow.migrations.db_types import StringID
+from airflow.migrations.utils import disable_sqlite_fkeys
 from airflow.utils.sqlalchemy import UtcDateTime
 
 revision = "134de42d3cb0"
@@ -44,20 +45,23 @@ def upgrade():
     """Apply Add partition_key to backfill_dag_run."""
     op.add_column("dag_run", sa.Column("created_at", UtcDateTime(timezone=True), nullable=True))
     op.execute("update dag_run set created_at = run_after;")
-    with op.batch_alter_table("dag_run", schema=None) as batch_op:
-        batch_op.alter_column("created_at", existing_type=UtcDateTime(timezone=True), nullable=False)
 
-    with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
-        batch_op.add_column(sa.Column("partition_key", StringID(), nullable=True))
-        batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=True)
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("dag_run", schema=None) as batch_op:
+            batch_op.alter_column("created_at", existing_type=UtcDateTime(timezone=True), nullable=False)
+
+        with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
+            batch_op.add_column(sa.Column("partition_key", StringID(), nullable=True))
+            batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=True)
 
 
 def downgrade():
     """Unapply Add partition_key to backfill_dag_run."""
-    op.execute("DELETE FROM backfill_dag_run WHERE logical_date IS NULL;")
-    with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
-        batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=False)
-        batch_op.drop_column("partition_key")
+    with disable_sqlite_fkeys(op):
+        op.execute("DELETE FROM backfill_dag_run WHERE logical_date IS NULL;")
+        with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
+            batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=False)
+            batch_op.drop_column("partition_key")
 
-    with op.batch_alter_table("dag_run", schema=None) as batch_op:
-        batch_op.drop_column("created_at")
+        with op.batch_alter_table("dag_run", schema=None) as batch_op:
+            batch_op.drop_column("created_at")
diff --git a/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py b/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py
index a3d31eadc53..52abaee3560 100644
--- a/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py
+++ b/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py
@@ -53,9 +53,12 @@ def upgrade():
 
 def downgrade():
     """Remove partition fields from DagModel."""
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.drop_column("timetable_partitioned")
-        batch_op.drop_column("next_dagrun_partition_key")
-        batch_op.drop_column("next_dagrun_partition_date")
-    with op.batch_alter_table("dag_run", schema=None) as batch_op:
-        batch_op.drop_column("partition_date")
+    from airflow.migrations.utils import disable_sqlite_fkeys
+
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.drop_column("timetable_partitioned")
+            batch_op.drop_column("next_dagrun_partition_key")
+            batch_op.drop_column("next_dagrun_partition_date")
+        with op.batch_alter_table("dag_run", schema=None) as batch_op:
+            batch_op.drop_column("partition_date")

Reply via email to