This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c8d4aae1f7c Fix ORM vs migration files inconsistencies (#44221)
c8d4aae1f7c is described below

commit c8d4aae1f7c35120c9935d40bac48cf7c3ca3530
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Dec 1 23:58:30 2024 +0100

    Fix ORM vs migration files inconsistencies (#44221)
    
    * Fix ORM vs migration files inconsistencies
    
    There have been some inconsistencies between ORM and migration files,
    but this doesn't fail in tests. This is an attempt to fix the inconsistency
    and also have it fail in tests
    
    * fix for mysql and postgres
    
    * fixup! fix for mysql and postgres
    
    * fix for sqlite
    
    * fixup! fix for sqlite
    
    * fixup! fixup! fix for sqlite
    
    * use TIMESTAMP from db_types
    
    * skip_archive should not delete _xcom_archive tables since those were
created by migration
    
    * fix conflicts
    
    * fixup! fix conflicts
    
    * drop _xcom_archive table if it exists
    
    * use sql for dropping xcom_archive table
    
    * fix conflicts
    
    * remove added migration file and make it work in one file
---
 ...audit_log_table_and_change_event_name_length.py |  4 +-
 .../0028_3_0_0_drop_ab_user_id_foreign_key.py      | 14 +++++++
 .../0032_3_0_0_drop_execution_date_unique.py       | 17 ++++++--
 .../versions/0033_3_0_0_add_tables_for_backfill.py |  8 +++-
 .../0036_3_0_0_add_name_field_to_dataset_model.py  |  4 +-
 ...3_0_0_add_exception_reason_and_logical_date_.py |  2 +-
 .../versions/0041_3_0_0_rename_dataset_as_asset.py | 23 +++--------
 ...3_0_0_add_uuid_primary_key_to_task_instance_.py | 46 +++++++++++++---------
 ...49_3_0_0_remove_pickled_data_from_xcom_table.py |  2 +
 airflow/models/__init__.py                         |  1 -
 airflow/models/asset.py                            |  2 +-
 docs/apache-airflow/img/airflow_erd.sha256         |  2 +-
 scripts/in_container/run_generate_migration.sh     |  4 +-
 tests/utils/test_db.py                             |  2 +
 tests/utils/test_db_cleanup.py                     |  4 +-
 tests_common/test_utils/db.py                      |  5 +++
 16 files changed, 88 insertions(+), 52 deletions(-)

diff --git 
a/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py
 
b/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py
index 44bef77578b..22b9c433781 100644
--- 
a/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py
+++ 
b/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py
@@ -59,8 +59,8 @@ def downgrade():
     if conn.dialect.name == "mssql":
         with op.batch_alter_table("log") as batch_op:
             batch_op.drop_index("idx_log_event")
-            batch_op.alter_column("event", type_=sa.String(30), nullable=False)
+            batch_op.alter_column("event", type_=sa.String(30))
             batch_op.create_index("idx_log_event", ["event"])
     else:
         with op.batch_alter_table("log") as batch_op:
-            batch_op.alter_column("event", type_=sa.String(30), nullable=False)
+            batch_op.alter_column("event", type_=sa.String(30))
diff --git 
a/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py 
b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py
index f88aaa014bb..8a9c77042e9 100644
--- a/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py
+++ b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py
@@ -45,6 +45,13 @@ def upgrade():
     with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
         batch_op.drop_constraint("task_instance_note_user_fkey", 
type_="foreignkey")
 
+    if op.get_bind().dialect.name == "mysql":
+        with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
+            batch_op.drop_index("dag_run_note_user_fkey")
+
+        with op.batch_alter_table("task_instance_note", schema=None) as 
batch_op:
+            batch_op.drop_index("task_instance_note_user_fkey")
+
 
 def downgrade():
     """Unapply Drop ab_user.id foreign key."""
@@ -53,3 +60,10 @@ def downgrade():
 
     with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
         batch_op.create_foreign_key("dag_run_note_user_fkey", "ab_user", 
["user_id"], ["id"])
+
+    if op.get_bind().dialect.name == "mysql":
+        with op.batch_alter_table("task_instance_note", schema=None) as 
batch_op:
+            batch_op.create_index("task_instance_note_user_fkey", ["user_id"], 
unique=False)
+
+        with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
+            batch_op.create_index("dag_run_note_user_fkey", ["user_id"], 
unique=False)
diff --git 
a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py 
b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py
index b76bf209bd4..399cc8aff91 100644
--- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py
+++ b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py
@@ -31,9 +31,10 @@ Create Date: 2024-08-28 08:35:26.634475
 
 from __future__ import annotations
 
-import sqlalchemy as sa
 from alembic import op
 
+from airflow.migrations.db_types import TIMESTAMP
+
 # revision identifiers, used by Alembic.
 revision = "1cdc775ca98f"
 down_revision = "a2c32e6c7729"
@@ -44,14 +45,24 @@ airflow_version = "3.0.0"
 
 def upgrade():
     with op.batch_alter_table("dag_run", schema=None) as batch_op:
-        batch_op.alter_column("execution_date", 
new_column_name="logical_date", existing_type=sa.TIMESTAMP)
+        batch_op.alter_column(
+            "execution_date",
+            new_column_name="logical_date",
+            existing_type=TIMESTAMP(timezone=True),
+            existing_nullable=False,
+        )
     with op.batch_alter_table("dag_run", schema=None) as batch_op:
         batch_op.drop_constraint("dag_run_dag_id_execution_date_key", 
type_="unique")
 
 
 def downgrade():
     with op.batch_alter_table("dag_run", schema=None) as batch_op:
-        batch_op.alter_column("logical_date", 
new_column_name="execution_date", existing_type=sa.TIMESTAMP)
+        batch_op.alter_column(
+            "logical_date",
+            new_column_name="execution_date",
+            existing_type=TIMESTAMP(timezone=True),
+            existing_nullable=False,
+        )
     with op.batch_alter_table("dag_run", schema=None) as batch_op:
         batch_op.create_unique_constraint(
             "dag_run_dag_id_execution_date_key",
diff --git a/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py 
b/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py
index bcbaa8b89a0..5ac391413e9 100644
--- a/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py
+++ b/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py
@@ -46,10 +46,10 @@ def upgrade():
     op.create_table(
         "backfill",
         sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
-        sa.Column("dag_id", sa.String(length=250), nullable=True),
+        sa.Column("dag_id", sa.String(length=250), nullable=False),
         sa.Column("from_date", 
airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
         sa.Column("to_date", 
airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
-        sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), 
nullable=True),
+        sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), 
nullable=False),
         sa.Column("is_paused", sa.Boolean(), nullable=True),
         sa.Column("max_active_runs", sa.Integer(), nullable=False),
         sa.Column("created_at", 
airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
@@ -65,6 +65,10 @@ def upgrade():
         sa.Column("sort_ordinal", sa.Integer(), nullable=False),
         sa.PrimaryKeyConstraint("id", name=op.f("backfill_dag_run_pkey")),
         sa.UniqueConstraint("backfill_id", "dag_run_id", 
name="ix_bdr_backfill_id_dag_run_id"),
+        sa.ForeignKeyConstraint(
+            ["backfill_id"], ["backfill.id"], name="bdr_backfill_fkey", 
ondelete="cascade"
+        ),
+        sa.ForeignKeyConstraint(["dag_run_id"], ["dag_run.id"], 
name="bdr_dag_run_fkey", ondelete="set null"),
     )
 
 
diff --git 
a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py 
b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
index 353dcbf0f8f..2676176692a 100644
--- a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
+++ b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
@@ -61,9 +61,7 @@ def upgrade():
     # Add 'name' column. Set it to nullable for now.
     with op.batch_alter_table("dataset", schema=None) as batch_op:
         batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
-        batch_op.add_column(
-            sa.Column("group", _STRING_COLUMN_TYPE, default=str, 
server_default="", nullable=False)
-        )
+        batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, 
default="", nullable=False))
     # Fill name from uri column.
     with Session(bind=op.get_bind()) as session:
         session.execute(sa.text("update dataset set name=uri"))
diff --git 
a/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py
 
b/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py
index c4f96fb0ebf..ab1a060af1d 100644
--- 
a/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py
+++ 
b/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py
@@ -42,7 +42,7 @@ airflow_version = "3.0.0"
 def upgrade():
     """Apply Add exception_reason and logical_date to BackfillDagRun."""
     with op.batch_alter_table("backfill", schema=None) as batch_op:
-        batch_op.add_column(sa.Column("reprocess_behavior", 
sa.String(length=250), nullable=True))
+        batch_op.add_column(sa.Column("reprocess_behavior", 
sa.String(length=250), nullable=False))
     with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
         batch_op.add_column(sa.Column("exception_reason", 
sa.String(length=250), nullable=True))
         batch_op.add_column(sa.Column("logical_date", 
UtcDateTime(timezone=True), nullable=False))
diff --git a/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py 
b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py
index 03836503efe..6c7b20f3816 100644
--- a/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py
+++ b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py
@@ -210,7 +210,7 @@ def upgrade():
             columns=["dag_id"],
             unique=False,
         )
-
+    with op.batch_alter_table("dag_schedule_asset_alias_reference", 
schema=None) as batch_op:
         batch_op.create_foreign_key(
             constraint_name="dsaar_asset_alias_fkey",
             referent_table="asset_alias",
@@ -284,14 +284,8 @@ def upgrade():
             columns=["dag_id"],
             unique=False,
         )
-
-        batch_op.create_foreign_key(
-            constraint_name="toar_asset_fkey",
-            referent_table="asset",
-            local_cols=["asset_id"],
-            remote_cols=["id"],
-            ondelete="CASCADE",
-        )
+    with op.batch_alter_table("task_outlet_asset_reference", schema=None) as 
batch_op:
+        batch_op.create_foreign_key("toar_asset_fkey", "asset", ["asset_id"], 
["id"], ondelete="CASCADE")
         batch_op.create_foreign_key(
             constraint_name="toar_dag_id_fkey",
             referent_table="dag",
@@ -320,14 +314,8 @@ def upgrade():
             columns=["target_dag_id"],
             unique=False,
         )
-
-        batch_op.create_foreign_key(
-            constraint_name="adrq_asset_fkey",
-            referent_table="asset",
-            local_cols=["asset_id"],
-            remote_cols=["id"],
-            ondelete="CASCADE",
-        )
+    with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op:
+        batch_op.create_foreign_key("adrq_asset_fkey", "asset", ["asset_id"], 
["id"], ondelete="CASCADE")
         batch_op.create_foreign_key(
             constraint_name="adrq_dag_fkey",
             referent_table="dag",
@@ -564,7 +552,6 @@ def downgrade():
 
     with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as 
batch_op:
         batch_op.alter_column("asset_id", new_column_name="dataset_id", 
type_=sa.Integer(), nullable=False)
-
         batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey")
         batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey")
 
diff --git 
a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py
 
b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py
index 2abd2116f98..41cfddc9cef 100644
--- 
a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py
+++ 
b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py
@@ -167,6 +167,32 @@ def _get_type_id_column(dialect_name: str) -> 
sa.types.TypeEngine:
         return sa.String(36)
 
 
+def create_foreign_keys():
+    for fk in ti_fk_constraints:
+        if fk["table"] in ["task_instance_history", "task_map"]:
+            continue
+        with op.batch_alter_table(fk["table"]) as batch_op:
+            batch_op.create_foreign_key(
+                constraint_name=fk["fk"],
+                referent_table=ti_table,
+                local_cols=ti_fk_cols,
+                remote_cols=ti_fk_cols,
+                ondelete="CASCADE",
+            )
+    for fk in ti_fk_constraints:
+        if fk["table"] not in ["task_instance_history", "task_map"]:
+            continue
+        with op.batch_alter_table(fk["table"]) as batch_op:
+            batch_op.create_foreign_key(
+                constraint_name=fk["fk"],
+                referent_table=ti_table,
+                local_cols=ti_fk_cols,
+                remote_cols=ti_fk_cols,
+                ondelete="CASCADE",
+                onupdate="CASCADE",
+            )
+
+
 def upgrade():
     """Add UUID primary key to task instance table."""
     conn = op.get_bind()
@@ -232,15 +258,7 @@ def upgrade():
         batch_op.create_primary_key("task_instance_pkey", ["id"])
 
     # Create foreign key constraints
-    for fk in ti_fk_constraints:
-        with op.batch_alter_table(fk["table"]) as batch_op:
-            batch_op.create_foreign_key(
-                constraint_name=fk["fk"],
-                referent_table=ti_table,
-                local_cols=ti_fk_cols,
-                remote_cols=ti_fk_cols,
-                ondelete="CASCADE",
-            )
+    create_foreign_keys()
 
 
 def downgrade():
@@ -270,12 +288,4 @@ def downgrade():
         batch_op.create_primary_key("task_instance_pkey", ti_fk_cols)
 
     # Re-add foreign key constraints
-    for fk in ti_fk_constraints:
-        with op.batch_alter_table(fk["table"]) as batch_op:
-            batch_op.create_foreign_key(
-                constraint_name=fk["fk"],
-                referent_table=ti_table,
-                local_cols=ti_fk_cols,
-                remote_cols=ti_fk_cols,
-                ondelete="CASCADE",
-            )
+    create_foreign_keys()
diff --git 
a/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py 
b/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py
index 2b19827b6ae..ed3379ef51b 100644
--- 
a/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py
+++ 
b/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py
@@ -180,3 +180,5 @@ def downgrade():
 
         with op.batch_alter_table("xcom", schema=None) as batch_op:
             batch_op.drop_column("value_old")
+
+    op.execute(sa.text("DROP TABLE IF EXISTS _xcom_archive"))
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 6d880341053..731c7d60da4 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -63,7 +63,6 @@ def import_all_models():
     import airflow.models.serialized_dag
     import airflow.models.taskinstancehistory
     import airflow.models.tasklog
-    import airflow.providers.fab.auth_manager.models
 
 
 def __getattr__(name):
diff --git a/airflow/models/asset.py b/airflow/models/asset.py
index d47986a85e5..c3c61b2785c 100644
--- a/airflow/models/asset.py
+++ b/airflow/models/asset.py
@@ -119,7 +119,7 @@ class AssetAliasModel(Base):
             ),
             "mysql",
         ),
-        default=str,
+        default="",
         nullable=False,
     )
 
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index 6b004f91298..4987e63b9bb 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-aa9e2e5b2a52af1e92bc876727ad5e8958e291315096fc5249a9afa2c21a5d06
\ No newline at end of file
+b42b04b6cc47650cb9e7a37258a6e8e99bdca2677253715505b8ad287192bf72
\ No newline at end of file
diff --git a/scripts/in_container/run_generate_migration.sh 
b/scripts/in_container/run_generate_migration.sh
index 50a69855132..46560c91122 100755
--- a/scripts/in_container/run_generate_migration.sh
+++ b/scripts/in_container/run_generate_migration.sh
@@ -20,5 +20,7 @@
 
 cd "${AIRFLOW_SOURCES}" || exit 1
 cd "airflow" || exit 1
-airflow db reset
+airflow db reset -y
+airflow db downgrade -n 2.10.3 -y
+airflow db migrate -r heads
 alembic revision --autogenerate -m "${@}"
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 5312fcacaaa..2290fd2cb98 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -96,6 +96,8 @@ class TestDb:
             lambda t: (t[0] == "remove_table" and t[1].name == 
"sqlite_sequence"),
             # fab version table
             lambda t: (t[0] == "remove_table" and t[1].name == 
"alembic_version_fab"),
+            # Ignore _xcom_archive table
+            lambda t: (t[0] == "remove_table" and t[1].name == 
"_xcom_archive"),
         ]
 
         for ignore in ignores:
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 7dca5815904..531480d3761 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -277,11 +277,13 @@ class TestDBCleanup:
 
     @pytest.mark.parametrize(
         "skip_archive, expected_archives",
-        [pytest.param(True, 0, id="skip_archive"), pytest.param(False, 1, 
id="do_archive")],
+        [pytest.param(True, 1, id="skip_archive"), pytest.param(False, 2, 
id="do_archive")],
     )
     def test__skip_archive(self, skip_archive, expected_archives):
         """
         Verify that running cleanup_table with drops the archives when 
requested.
+
+        Archived tables from DB migration should be kept when skip_archive is 
True.
         """
         base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
         num_tis = 10
diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py
index 9b8f2646dd6..c0ca875b84d 100644
--- a/tests_common/test_utils/db.py
+++ b/tests_common/test_utils/db.py
@@ -73,7 +73,12 @@ def initial_db_init():
     from airflow.www.extensions.init_appbuilder import init_appbuilder
     from airflow.www.extensions.init_auth_manager import get_auth_manager
 
+    from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS
+
     db.resetdb()
+    if AIRFLOW_V_3_0_PLUS:
+        db.downgrade(to_revision="5f2621c13b39")
+        db.upgradedb(to_revision="head")
     _bootstrap_dagbag()
     # minimal app to add roles
     flask_app = Flask(__name__)

Reply via email to