kaxil commented on code in PR #44790:
URL: https://github.com/apache/airflow/pull/44790#discussion_r1884309720
##########
airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py:
##########
@@ -39,91 +41,200 @@
depends_on = None
airflow_version = "2.10.3"
-if TYPE_CHECKING:
- from alembic.operations.base import BatchOperations
- from sqlalchemy.sql.elements import conv
-
-
-def _rename_fk_constraint(
- *,
- batch_op: BatchOperations,
- original_name: str | conv,
- new_name: str | conv,
- referent_table: str,
- local_cols: list[str],
- remote_cols: list[str],
- ondelete: str,
-) -> None:
- batch_op.drop_constraint(original_name, type_="foreignkey")
- batch_op.create_foreign_key(
- constraint_name=new_name,
- referent_table=referent_table,
- local_cols=local_cols,
- remote_cols=remote_cols,
- ondelete=ondelete,
- )
+
+def mysql_create_foreignkey_if_not_exists(
+ constraint_name, table_name, column_name, ref_table, ref_column, op
+):
+ op.execute(f"""
+ set @var = (
+ SELECT CASE
+ WHEN EXISTS (
+ SELECT 1
+ FROM information_schema.TABLE_CONSTRAINTS
+ WHERE
+ CONSTRAINT_SCHEMA = DATABASE() AND
+ TABLE_NAME = '{table_name}' AND
+ CONSTRAINT_NAME = '{constraint_name}' AND
+ CONSTRAINT_TYPE = 'FOREIGN KEY'
+ ) THEN 'SELECT 1'
+ ELSE CONCAT(
+ 'ALTER TABLE {table_name} ',
+ 'ADD CONSTRAINT {constraint_name} FOREIGN KEY ({column_name})
',
+ 'REFERENCES {ref_table}({ref_column}) ',
+ 'ON DELETE CASCADE'
+ )
+ END
+ );
+
+ PREPARE stmt FROM @var;
+ EXECUTE stmt;
+ DEALLOCATE PREPARE stmt;
+ """)
+
+
+def postgres_create_foreignkey_if_not_exists(
+ constraint_name, table_name, column_name, ref_table, ref_column, op
+):
+ op.execute(f"""
+ DO $$
+ BEGIN
+ IF NOT EXISTS (
+ SELECT 1
+ FROM information_schema.table_constraints
+ WHERE constraint_type = 'FOREIGN KEY'
+ AND constraint_name = '{constraint_name}'
+ ) THEN
+ ALTER TABLE {table_name}
+ ADD CONSTRAINT {constraint_name}
+ FOREIGN KEY ({column_name})
+ REFERENCES {ref_table} ({ref_column})
+ ON DELETE CASCADE;
+ END IF;
+ END $$;
+ """)
def upgrade():
"""Rename dag_schedule_dataset_alias_reference constraint."""
- with op.batch_alter_table("dag_schedule_dataset_alias_reference",
schema=None) as batch_op:
- bind = op.get_context().bind
- insp = inspect(bind)
- fk_constraints = [fk["name"] for fk in
insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]
-
- # "dsdar_dataset_alias_fkey" was the constraint name defined in the
model while "dsdar_dataset_fkey" is the one
- # defined in the previous migration.
- # Rename this constraint name if user is using the name
"dsdar_dataset_fkey".
- if "dsdar_dataset_fkey" in fk_constraints:
- _rename_fk_constraint(
- batch_op=batch_op,
- original_name="dsdar_dataset_fkey",
- new_name="dsdar_dataset_alias_fkey",
- referent_table="dataset_alias",
- local_cols=["alias_id"],
- remote_cols=["id"],
+ dialect = op.get_context().dialect.name
+ if dialect == "sqlite":
+ op.create_table(
+ "new_table",
+ sa.Column("alias_id", sa.Integer(), primary_key=True,
nullable=False),
+ sa.Column("dag_id", sa.String(ID_LEN), primary_key=True,
nullable=False),
+ sa.Column("created_at", UtcDateTime(timezone=True),
nullable=False),
+ sa.Column("updated_at", UtcDateTime(timezone=True),
nullable=False),
+ sa.ForeignKeyConstraint(
+ ("alias_id",),
+ ["dataset_alias.id"],
+ name="dsdar_dataset_alias_fkey",
ondelete="CASCADE",
- )
-
- # "dsdar_dag_fkey" was the constraint name defined in the model while
"dsdar_dag_id_fkey" is the one
- # defined in the previous migration.
- # Rename this constraint name if user is using the name
"dsdar_dag_fkey".
- if "dsdar_dag_fkey" in fk_constraints:
- _rename_fk_constraint(
- batch_op=batch_op,
- original_name="dsdar_dag_fkey",
- new_name="dsdar_dag_id_fkey",
- referent_table="dataset_alias",
- local_cols=["alias_id"],
- remote_cols=["id"],
+ ),
+ sa.ForeignKeyConstraint(
+ columns=("dag_id",),
+ refcolumns=["dag.dag_id"],
+ name="dsdar_dag_id_fkey",
ondelete="CASCADE",
- )
+ ),
+ sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"),
+ )
+ op.execute(sa.text("INSERT INTO new_table SELECT * FROM
dag_schedule_dataset_alias_reference"))
+ op.execute("DROP TABLE dag_schedule_dataset_alias_reference")
+ op.execute("ALTER TABLE new_table RENAME TO
dag_schedule_dataset_alias_reference")
+ op.create_index(
+ "idx_dag_schedule_dataset_alias_reference_dag_id",
+ "dag_schedule_dataset_alias_reference",
+ ["dag_id"],
+ unique=False,
+ )
+ if dialect == "postgresql":
+ op.execute(
+ "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT
IF EXISTS dsdar_dataset_fkey"
+ )
+ op.execute(
+ "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT
IF EXISTS dsdar_dag_fkey"
+ )
+ postgres_create_foreignkey_if_not_exists(
+ "dsdar_dataset_alias_fkey",
+ "dag_schedule_dataset_alias_reference",
+ "alias_id",
+ "dataset_alias",
+ "id",
+ op,
+ )
+ postgres_create_foreignkey_if_not_exists(
+ "dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference",
"alias_id", "dataset_alias", "id", op
+ )
+ if dialect == "mysql":
+ mysql_drop_foreignkey_if_exists("dsdar_dataset_fkey",
"dag_schedule_dataset_alias_reference", op)
+ mysql_drop_foreignkey_if_exists("dsdar_dag_fkey",
"dag_schedule_dataset_alias_reference", op)
+ mysql_create_foreignkey_if_not_exists(
+ "dsdar_dataset_alias_fkey",
+ "dag_schedule_dataset_alias_reference",
+ "alias_id",
+ "dataset_alias",
+ "id",
+ op,
+ )
+ mysql_create_foreignkey_if_not_exists(
+ "dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference",
"alias_id", "dataset_alias", "id", op
+ )
def downgrade():
"""Undo dag_schedule_dataset_alias_reference constraint rename."""
- with op.batch_alter_table("dag_schedule_dataset_alias_reference",
schema=None) as batch_op:
- bind = op.get_context().bind
- insp = inspect(bind)
- fk_constraints = [fk["name"] for fk in
insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]
- if "dsdar_dataset_alias_fkey" in fk_constraints:
- _rename_fk_constraint(
- batch_op=batch_op,
- original_name="dsdar_dataset_alias_fkey",
- new_name="dsdar_dataset_fkey",
- referent_table="dataset_alias",
- local_cols=["alias_id"],
- remote_cols=["id"],
+ dialect = op.get_context().dialect.name
+ if dialect == "postgresql":
+ op.execute(
+ "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT
IF EXISTS dsdar_dataset_alias_fkey"
+ )
+ op.execute(
+ "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT
IF EXISTS dsdar_dag_id_fkey"
+ )
+ postgres_create_foreignkey_if_not_exists(
+ "dsdar_dataset_fkey",
+ "dag_schedule_dataset_alias_reference",
+ "alias_id",
+ "dataset_alias",
+ "id",
+ op,
+ )
+ postgres_create_foreignkey_if_not_exists(
+ "dsdar_dag_fkey",
+ "dag_schedule_dataset_alias_reference",
+ "alias_id",
+ "dataset_alias",
+ "id",
+ op,
+ )
+ if dialect == "mysql":
+ mysql_drop_foreignkey_if_exists(
+ "dsdar_dataset_alias_fkey",
"dag_schedule_dataset_alias_reference", op
+ )
+ mysql_drop_foreignkey_if_exists("dsdar_dag_id_fkey",
"dag_schedule_dataset_alias_reference", op)
+ mysql_create_foreignkey_if_not_exists(
+ "dsdar_dataset_fkey",
+ "dag_schedule_dataset_alias_reference",
+ "alias_id",
+ "dataset_alias",
+ "id",
+ op,
+ )
+ mysql_create_foreignkey_if_not_exists(
+ "dsdar_dag_fkey",
+ "dag_schedule_dataset_alias_reference",
+ "alias_id",
+ "dataset_alias",
+ "id",
+ op,
+ )
+ if dialect == "sqlite":
+ op.create_table(
+ "new_table",
+ sa.Column("alias_id", sa.Integer(), primary_key=True,
nullable=False),
+ sa.Column("dag_id", sa.String(ID_LEN), primary_key=True,
nullable=False),
+ sa.Column("created_at", UtcDateTime(timezone=True),
nullable=False),
+ sa.Column("updated_at", UtcDateTime(timezone=True),
nullable=False),
+ sa.ForeignKeyConstraint(
+ ("alias_id",),
+ ["dataset_alias.id"],
+ name="dsdar_dataset_fkey",
ondelete="CASCADE",
- )
-
- if "dsdar_dag_id_fkey" in fk_constraints:
- _rename_fk_constraint(
- batch_op=batch_op,
- original_name="dsdar_dag_id_fkey",
- new_name="dsdar_dag_fkey",
- referent_table="dataset_alias",
- local_cols=["alias_id"],
- remote_cols=["id"],
+ ),
+ sa.ForeignKeyConstraint(
+ columns=("dag_id",),
+ refcolumns=["dag.dag_id"],
+ name="dsdar_dag_fkey",
ondelete="CASCADE",
- )
+ ),
+ sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"),
+ )
+ op.execute(sa.text("INSERT INTO new_table SELECT * FROM
dag_schedule_dataset_alias_reference"))
+ op.execute("DROP TABLE dag_schedule_dataset_alias_reference")
Review Comment:
Not saying we absolutely should, but you can technically use Alembic's own
operation for this instead of executing the raw `DROP TABLE` SQL:
```
op.drop_table("dag_schedule_dataset_alias_reference")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]