Copilot commented on code in PR #64838:
URL: https://github.com/apache/airflow/pull/64838#discussion_r3213058713


##########
airflow-core/src/airflow/migrations/utils.py:
##########
@@ -56,8 +66,196 @@ def mysql_drop_foreignkey_if_exists(constraint_name, 
table_name, op):
 
 
 def ignore_sqlite_value_error():
-    from alembic import op
-
-    if op.get_bind().dialect.name == "sqlite":
+    if get_dialect_name(alembic_op) == "sqlite":
         return contextlib.suppress(ValueError)
     return contextlib.nullcontext()
+
+
+def create_index_if_not_exists(op, index_name, table_name, columns, 
unique=False) -> None:
+    """
+    Create an index if it does not already exist.
+
+    MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure 
is used.
+    PostgreSQL and SQLite support it natively.
+    """
+    dialect_name = get_dialect_name(op)
+
+    if dialect_name == "mysql":
+        unique_kw = "UNIQUE " if unique else ""
+        col_list = ", ".join(f"`{c}`" for c in columns)
+        op.execute(
+            text(f"""
+            DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
+            CREATE PROCEDURE CreateIndexIfNotExists()
+            BEGIN
+                IF NOT EXISTS (
+                    SELECT 1
+                    FROM information_schema.STATISTICS
+                    WHERE
+                        TABLE_SCHEMA = DATABASE() AND
+                        TABLE_NAME = '{table_name}' AND
+                        INDEX_NAME = '{index_name}'
+                ) THEN
+                    CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` 
({col_list});
+                END IF;
+            END;
+            CALL CreateIndexIfNotExists();
+            DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
+            """)
+        )
+    else:
+        op.create_index(index_name, table_name, columns, unique=unique, 
if_not_exists=True)
+
+
+def drop_index_if_exists(op, index_name, table_name) -> None:
+    """
+    Drop an index if it exists.
+
+    Works in both online and offline mode by using raw SQL for PostgreSQL and 
MySQL.
+    SQLite and PostgreSQL support DROP INDEX IF EXISTS natively.
+    MySQL requires a stored procedure since it does not support IF EXISTS for 
DROP INDEX.
+    """
+    dialect_name = get_dialect_name(op)
+
+    if dialect_name == "mysql":
+        op.execute(
+            text(f"""
+            DROP PROCEDURE IF EXISTS DropIndexIfExists;
+            CREATE PROCEDURE DropIndexIfExists()
+            BEGIN
+                IF EXISTS (
+                    SELECT 1
+                    FROM information_schema.STATISTICS
+                    WHERE
+                        TABLE_SCHEMA = DATABASE() AND
+                        TABLE_NAME = '{table_name}' AND
+                        INDEX_NAME = '{index_name}'
+                ) THEN
+                    DROP INDEX `{index_name}` ON `{table_name}`;
+                END IF;
+            END;
+            CALL DropIndexIfExists();
+            DROP PROCEDURE DropIndexIfExists;
+            """)
+        )
+    else:
+        # PostgreSQL and SQLite both support DROP INDEX IF EXISTS
+        op.drop_index(index_name, table_name=table_name, if_exists=True)
+
+
+def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
+    """
+    Drop all unique constraints covering any of the given columns, regardless 
of constraint name.
+
+    Works in both online and offline mode by using raw SQL for PostgreSQL and 
MySQL.
+    SQLite falls back to batch mode and requires a live connection.
+    """
+    dialect_name = get_dialect_name(op)
+
+    if dialect_name == "postgresql":
+        cols_array = ", ".join(f"'{c}'" for c in columns)
+        op.execute(

Review Comment:
   `drop_unique_constraints_on_columns()` does not handle an empty `columns` 
iterable: on MySQL it generates invalid SQL (`IN ()` is a syntax error), and on 
Postgres `ANY(ARRAY[]::text[])` is valid but matches nothing, so the call 
silently drops no constraints. Consider explicitly validating `columns` (e.g., 
return early or raise a clear error) to prevent hard-to-debug migration 
failures if the helper is called with an empty iterable.



##########
airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py:
##########
@@ -103,19 +102,18 @@ def _drop_fkey_if_exists(table, constraint_name):
     conn = op.get_bind()
     dialect_name = conn.dialect.name
 

Review Comment:
   `_drop_fkey_if_exists()` still relies on `conn = op.get_bind()` / 
`conn.dialect.name` for dialect detection. In Alembic offline mode 
`op.get_bind()` returns `None`, so `conn.dialect` raises `AttributeError` 
before the helper ever reaches its SQLite/Postgres/MySQL branches. Consider 
switching to `get_dialect_name(op)` (from `airflow.migrations.utils`) so 
dialect detection is offline-safe.



##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -171,64 +172,58 @@ def upgrade() -> None:
     #   user-provided DeadlineDefinition, and the actual instance of a 
Definition is (still) the Deadline.
     #   This feels more intuitive than DeadlineAlert defining the Deadline.
 
-    op.create_table(
-        "deadline_alert",
-        sa.Column("id", sa.Uuid(), default=uuid6.uuid7),
-        sa.Column("created_at", UtcDateTime, nullable=False),
-        sa.Column("serialized_dag_id", sa.Uuid(), nullable=False),
-        sa.Column("name", sa.String(250), nullable=True),
-        sa.Column("description", sa.Text(), nullable=True),
-        sa.Column("reference", sa.JSON(), nullable=False),
-        sa.Column("interval", sa.Float(), nullable=False),
-        sa.Column("callback_def", sa.JSON(), nullable=False),
-        sa.PrimaryKeyConstraint("id", name=op.f("deadline_alert_pkey")),
-    )
-
-    conn = op.get_bind()
-    dialect_name = conn.dialect.name
-
-    if dialect_name == "sqlite":
-        conn.execute(sa.text("PRAGMA foreign_keys=OFF"))
-
-    with op.batch_alter_table("deadline", schema=None) as batch_op:
-        batch_op.add_column(sa.Column("deadline_alert_id", sa.Uuid(), 
nullable=True))
-        batch_op.add_column(sa.Column("created_at", UtcDateTime, 
nullable=True))
-        batch_op.add_column(sa.Column("last_updated_at", UtcDateTime, 
nullable=True))
-        batch_op.create_foreign_key(
-            batch_op.f("deadline_deadline_alert_id_fkey"),
+    with disable_sqlite_fkeys(op):
+        op.create_table(
             "deadline_alert",
-            ["deadline_alert_id"],
-            ["id"],
-            ondelete="SET NULL",
+            sa.Column("id", sa.Uuid(), default=uuid6.uuid7),
+            sa.Column("created_at", UtcDateTime, nullable=False),
+            sa.Column("serialized_dag_id", sa.Uuid(), nullable=False),
+            sa.Column("name", sa.String(250), nullable=True),
+            sa.Column("description", sa.Text(), nullable=True),
+            sa.Column("reference", sa.JSON(), nullable=False),
+            sa.Column("interval", sa.Float(), nullable=False),
+            sa.Column("callback_def", sa.JSON(), nullable=False),
+            sa.PrimaryKeyConstraint("id", name=op.f("deadline_alert_pkey")),
         )
 
-    # For migration/backcompat purposes if no timestamp is there from the 
migration, use now()
-    # then lock the columns down so all new entries require the timestamps to 
be provided.
-    now = timezone.utcnow()
-    conn.execute(
-        sa.text("""
-            UPDATE deadline
-            SET created_at = :now, last_updated_at = :now
-            WHERE created_at IS NULL OR last_updated_at IS NULL
-        """),
-        {"now": now},
-    )
+        conn = op.get_bind()
+
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.add_column(sa.Column("deadline_alert_id", sa.Uuid(), 
nullable=True))
+            batch_op.add_column(sa.Column("created_at", UtcDateTime, 
nullable=True))
+            batch_op.add_column(sa.Column("last_updated_at", UtcDateTime, 
nullable=True))
+            batch_op.create_foreign_key(
+                batch_op.f("deadline_deadline_alert_id_fkey"),
+                "deadline_alert",
+                ["deadline_alert_id"],
+                ["id"],
+                ondelete="SET NULL",
+            )
 
-    with op.batch_alter_table("deadline", schema=None) as batch_op:
-        batch_op.alter_column("created_at", existing_type=UtcDateTime, 
nullable=False)
-        batch_op.alter_column("last_updated_at", existing_type=UtcDateTime, 
nullable=False)
-
-    with op.batch_alter_table("deadline_alert", schema=None) as batch_op:
-        batch_op.create_foreign_key(
-            batch_op.f("deadline_alert_serialized_dag_id_fkey"),
-            "serialized_dag",
-            ["serialized_dag_id"],
-            ["id"],
-            ondelete="CASCADE",
+        # For migration/backcompat purposes if no timestamp is there from the 
migration, use now()
+        # then lock the columns down so all new entries require the timestamps 
to be provided.
+        now = timezone.utcnow()
+        conn.execute(
+            sa.text("""
+                UPDATE deadline
+                SET created_at = :now, last_updated_at = :now
+                WHERE created_at IS NULL OR last_updated_at IS NULL
+            """),
+            {"now": now},
         )

Review Comment:
   Inside `upgrade()`, `conn = op.get_bind()` is used and then 
`conn.execute(...)` is called. In Alembic offline mode `op.get_bind()` returns 
`None`, so this will crash even though 
`disable_sqlite_fkeys()`/`get_dialect_name()` are now offline-safe. If offline 
SQL generation is a goal here, switch this UPDATE to `op.execute(...)` (or 
guard against `conn is None`). Note that `op.execute()` takes no separate 
parameters dict, so `:now` must be bound on the statement itself, e.g. 
`sa.text(...).bindparams(now=now)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to