This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c8dd3574272 Fix SQLite downgrade failures caused by FK constraints
during batch table recreation (#63437)
c8dd3574272 is described below
commit c8dd3574272acc0cc48c6689fa970d2ed141056e
Author: Rahul Vats <[email protected]>
AuthorDate: Tue Mar 17 15:43:13 2026 +0530
Fix SQLite downgrade failures caused by FK constraints during batch table
recreation (#63437)
* Fix SQLite downgrade failures caused by FK constraints during batch table
recreation
---
.../0095_3_2_0_update_orm_asset_partitioning.py | 15 ++++++++-----
..._enforce_log_event_and_dag_is_stale_not_null.py | 22 +++++++++++-------
...3_2_0_add_exceeds_max_runs_flag_to_dag_model.py | 7 ++++--
...0_3_2_0_add_timetable_type_to_dag_table_for_.py | 14 ++++++++----
.../0105_3_2_0_add_allowed_run_types_to_dag.py | 7 ++++--
..._3_2_0_add_partition_key_to_backfill_dag_run.py | 26 +++++++++++++---------
.../0107_3_2_0_add_partition_fields_to_dag.py | 15 ++++++++-----
7 files changed, 67 insertions(+), 39 deletions(-)
diff --git a/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py b/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py
index 08568750ee6..06162be639e 100644
--- a/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py
+++ b/airflow-core/src/airflow/migrations/versions/0095_3_2_0_update_orm_asset_partitioning.py
@@ -80,11 +80,14 @@ def upgrade():
def downgrade():
"""Unapply Update ORM for asset partitioning."""
- with op.batch_alter_table("dag_run", schema=None) as batch_op:
- batch_op.drop_column("partition_key")
+ from airflow.migrations.utils import disable_sqlite_fkeys
- with op.batch_alter_table("asset_event", schema=None) as batch_op:
- batch_op.drop_column("partition_key")
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("dag_run", schema=None) as batch_op:
+ batch_op.drop_column("partition_key")
+
+ with op.batch_alter_table("asset_event", schema=None) as batch_op:
+ batch_op.drop_column("partition_key")
- op.drop_table("partitioned_asset_key_log")
- op.drop_table("asset_partition_dag_run")
+ op.drop_table("partitioned_asset_key_log")
+ op.drop_table("asset_partition_dag_run")
diff --git a/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py b/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py
index bcac927d00e..49fb4bcc3f7 100644
--- a/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py
+++ b/airflow-core/src/airflow/migrations/versions/0097_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py
@@ -29,6 +29,8 @@ from __future__ import annotations
import sqlalchemy as sa
from alembic import op
+from airflow.migrations.utils import disable_sqlite_fkeys
+
revision = "edc4f85a4619"
down_revision = "b12d4f98a91e"
branch_labels = None
@@ -40,19 +42,23 @@ def upgrade():
"""Bring existing deployments in line with 0010 and 0067."""
# Ensure `log.event` can safely transition to NOT NULL.
op.execute("UPDATE log SET event = '' WHERE event IS NULL")
- with op.batch_alter_table("log") as batch_op:
- batch_op.alter_column("event", existing_type=sa.String(60), nullable=False)
# Make sure DAG rows that survived the old 0067 path are not NULL.
op.execute("UPDATE dag SET is_stale = false WHERE is_stale IS NULL")
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=False)
+
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("log") as batch_op:
+ batch_op.alter_column("event", existing_type=sa.String(60), nullable=False)
+
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=False)
def downgrade():
"""Allow the columns to accept NULL again for older state reversions."""
- with op.batch_alter_table("log") as batch_op:
- batch_op.alter_column("event", existing_type=sa.String(60), nullable=True)
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("log") as batch_op:
+ batch_op.alter_column("event", existing_type=sa.String(60), nullable=True)
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=True)
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.alter_column("is_stale", existing_type=sa.Boolean, nullable=True)
diff --git a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py
index 2a3f9c48573..233e0c95557 100644
--- a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py
+++ b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py
@@ -52,5 +52,8 @@ def upgrade():
def downgrade():
"""Unapply Add exceeds max runs flag to dag model."""
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.drop_column("exceeds_max_non_backfill")
+ from airflow.migrations.utils import disable_sqlite_fkeys
+
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.drop_column("exceeds_max_non_backfill")
diff --git a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py b/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py
index 86049d06d72..47349528c6e 100644
--- a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py
+++ b/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py
@@ -40,16 +40,22 @@ airflow_version = "3.2.0"
def upgrade():
"""Apply add timetable_type to dag table for filtering."""
+ from airflow.migrations.utils import disable_sqlite_fkeys
+
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(sa.Column("timetable_type", sa.String(length=255)))
op.execute("UPDATE dag SET timetable_type = '' WHERE timetable_type IS NULL")
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.alter_column("timetable_type", existing_type=sa.String(length=255), nullable=False)
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.alter_column("timetable_type", existing_type=sa.String(length=255), nullable=False)
def downgrade():
"""Unapply add timetable_type to dag table for filtering."""
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.drop_column("timetable_type")
+ from airflow.migrations.utils import disable_sqlite_fkeys
+
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.drop_column("timetable_type")
diff --git a/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py b/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py
index 9a5ff5d5cad..6f4fb0ef132 100644
--- a/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py
+++ b/airflow-core/src/airflow/migrations/versions/0105_3_2_0_add_allowed_run_types_to_dag.py
@@ -46,5 +46,8 @@ def upgrade():
def downgrade():
"""Remove allowed_run_types column from dag table."""
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.drop_column("allowed_run_types")
+ from airflow.migrations.utils import disable_sqlite_fkeys
+
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.drop_column("allowed_run_types")
diff --git a/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py b/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
index 69868e0d9ab..422e9b8f70d 100644
--- a/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
+++ b/airflow-core/src/airflow/migrations/versions/0106_3_2_0_add_partition_key_to_backfill_dag_run.py
@@ -31,6 +31,7 @@ import sqlalchemy as sa
from alembic import op
from airflow.migrations.db_types import StringID
+from airflow.migrations.utils import disable_sqlite_fkeys
from airflow.utils.sqlalchemy import UtcDateTime
revision = "134de42d3cb0"
@@ -44,20 +45,23 @@ def upgrade():
"""Apply Add partition_key to backfill_dag_run."""
op.add_column("dag_run", sa.Column("created_at", UtcDateTime(timezone=True), nullable=True))
op.execute("update dag_run set created_at = run_after;")
- with op.batch_alter_table("dag_run", schema=None) as batch_op:
- batch_op.alter_column("created_at", existing_type=UtcDateTime(timezone=True), nullable=False)
- with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
- batch_op.add_column(sa.Column("partition_key", StringID(), nullable=True))
- batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=True)
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("dag_run", schema=None) as batch_op:
+ batch_op.alter_column("created_at", existing_type=UtcDateTime(timezone=True), nullable=False)
+
+ with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("partition_key", StringID(), nullable=True))
+ batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=True)
def downgrade():
"""Unapply Add partition_key to backfill_dag_run."""
- op.execute("DELETE FROM backfill_dag_run WHERE logical_date IS NULL;")
- with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
- batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=False)
- batch_op.drop_column("partition_key")
+ with disable_sqlite_fkeys(op):
+ op.execute("DELETE FROM backfill_dag_run WHERE logical_date IS NULL;")
+ with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
+ batch_op.alter_column("logical_date", existing_type=sa.TIMESTAMP(), nullable=False)
+ batch_op.drop_column("partition_key")
- with op.batch_alter_table("dag_run", schema=None) as batch_op:
- batch_op.drop_column("created_at")
+ with op.batch_alter_table("dag_run", schema=None) as batch_op:
+ batch_op.drop_column("created_at")
diff --git a/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py b/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py
index a3d31eadc53..52abaee3560 100644
--- a/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py
+++ b/airflow-core/src/airflow/migrations/versions/0107_3_2_0_add_partition_fields_to_dag.py
@@ -53,9 +53,12 @@ def upgrade():
def downgrade():
"""Remove partition fields from DagModel."""
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.drop_column("timetable_partitioned")
- batch_op.drop_column("next_dagrun_partition_key")
- batch_op.drop_column("next_dagrun_partition_date")
- with op.batch_alter_table("dag_run", schema=None) as batch_op:
- batch_op.drop_column("partition_date")
+ from airflow.migrations.utils import disable_sqlite_fkeys
+
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.drop_column("timetable_partitioned")
+ batch_op.drop_column("next_dagrun_partition_key")
+ batch_op.drop_column("next_dagrun_partition_date")
+ with op.batch_alter_table("dag_run", schema=None) as batch_op:
+ batch_op.drop_column("partition_date")