kaxil commented on code in PR #62234:
URL: https://github.com/apache/airflow/pull/62234#discussion_r2940895090


##########
airflow-core/src/airflow/migrations/utils.py:
##########
@@ -112,3 +113,196 @@ def ignore_sqlite_value_error():
     if op.get_bind().dialect.name == "sqlite":
         return contextlib.suppress(ValueError)
     return contextlib.nullcontext()
+
+
+def get_dialect_name(op) -> str:
+    conn = op.get_bind()
+    return conn.dialect.name if conn is not None else 
op.get_context().dialect.name
+
+
+def create_index_if_not_exists(op, index_name, table_name, columns, 
unique=False) -> None:
+    """
+    Create an index if it does not already exist.
+
+    MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure 
is used.
+    PostgreSQL and SQLite support it natively.
+    """
+    dialect_name = get_dialect_name(op)
+
+    if dialect_name == "mysql":
+        unique_kw = "UNIQUE " if unique else ""
+        col_list = ", ".join(f"`{c}`" for c in columns)
+        op.execute(
+            text(f"""
+            CREATE PROCEDURE CreateIndexIfNotExists()

Review Comment:
   The stored procedure name `CreateIndexIfNotExists` is hardcoded. If two 
`create_index_if_not_exists()` calls run in the same `op.execute()` batch 
(unlikely but possible in future migrations), the second `CREATE PROCEDURE 
CreateIndexIfNotExists()` would fail. The FAB migration (`0001_3_5_0`) uses a 
safer pattern: `DROP PROCEDURE IF EXISTS <name>` before `CREATE PROCEDURE`. 
Consider adding that guard here too, or parameterizing the procedure name.



##########
providers/fab/src/airflow/providers/fab/migrations/versions/0001_3_5_0_fix_fab_db_inconsistencies.py:
##########
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Fix fab db inconsistencies.
+
+Revision ID: 02ca36b0235b
+Revises: 6709f7a774b9
+Create Date: 2026-03-10 14:07:31.559184
+
+"""
+
+from __future__ import annotations
+
+import contextlib
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "02ca36b0235b"
+down_revision = "6709f7a774b9"
+branch_labels = None
+depends_on = None
+fab_version = "3.5.0"
+
+
+def _mysql_run_procedure(procedure_name: str, body: str) -> str:
+    return f"""
+    DROP PROCEDURE IF EXISTS {procedure_name};
+    CREATE PROCEDURE {procedure_name}()
+    BEGIN
+    {body}
+    END;
+    CALL {procedure_name}();
+    DROP PROCEDURE IF EXISTS {procedure_name};
+    """
+
+
+def _mysql_create_idx_permission_view_id_if_not_exists() -> str:
+    return _mysql_run_procedure(
+        "CreateIdxPermissionViewId",
+        """
+        IF NOT EXISTS (
+            SELECT 1 FROM information_schema.STATISTICS
+            WHERE TABLE_SCHEMA = DATABASE()
+                AND TABLE_NAME = 'ab_permission_view_role'
+                AND INDEX_NAME = 'idx_permission_view_id'
+        ) THEN
+            CREATE INDEX `idx_permission_view_id` ON `ab_permission_view_role` 
(`permission_view_id`);
+        END IF;
+        """,
+    )
+
+
+def _mysql_create_idx_role_id_if_not_exists() -> str:
+    return _mysql_run_procedure(
+        "CreateIdxRoleId",
+        """
+        IF NOT EXISTS (
+            SELECT 1 FROM information_schema.STATISTICS
+            WHERE TABLE_SCHEMA = DATABASE()
+                AND TABLE_NAME = 'ab_permission_view_role'
+                AND INDEX_NAME = 'idx_role_id'
+        ) THEN
+            CREATE INDEX `idx_role_id` ON `ab_permission_view_role` 
(`role_id`);
+        END IF;
+        """,
+    )
+
+
+def _postgresql_drop_unique_constraints_on_ab_register_user_email() -> str:
+    return """
+    DO $$
+    DECLARE r record;
+    BEGIN
+        FOR r IN
+            SELECT DISTINCT tc.constraint_name
+            FROM information_schema.table_constraints tc
+            JOIN information_schema.key_column_usage kcu
+                ON tc.constraint_name = kcu.constraint_name
+                AND tc.table_schema = kcu.table_schema
+            WHERE tc.table_name = 'ab_register_user'
+                AND tc.constraint_type = 'UNIQUE'
+                AND kcu.column_name = 'email'
+        LOOP
+            EXECUTE 'ALTER TABLE ' || quote_ident('ab_register_user')
+                || ' DROP CONSTRAINT IF EXISTS ' || 
quote_ident(r.constraint_name);
+        END LOOP;
+    END $$
+    """
+
+
+def _mysql_drop_unique_constraints_on_ab_register_user_email() -> str:
+    return _mysql_run_procedure(
+        "DropEmailUqIfExists",
+        """
+        DECLARE done INT DEFAULT FALSE;
+        DECLARE v_name VARCHAR(255);
+        DECLARE cur CURSOR FOR
+            SELECT DISTINCT kcu.CONSTRAINT_NAME
+            FROM information_schema.KEY_COLUMN_USAGE kcu
+            JOIN information_schema.TABLE_CONSTRAINTS tc
+                ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
+                AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA
+                AND kcu.TABLE_NAME = tc.TABLE_NAME
+            WHERE kcu.TABLE_NAME = 'ab_register_user'
+                AND kcu.TABLE_SCHEMA = DATABASE()
+                AND tc.CONSTRAINT_TYPE = 'UNIQUE'
+                AND kcu.COLUMN_NAME = 'email';
+        DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
+        OPEN cur;
+        drop_loop: LOOP
+            FETCH cur INTO v_name;
+            IF done THEN LEAVE drop_loop; END IF;
+            SET @stmt = CONCAT('ALTER TABLE `ab_register_user` DROP INDEX `', 
v_name, '`');
+            PREPARE s FROM @stmt;
+            EXECUTE s;
+            DEALLOCATE PREPARE s;
+        END LOOP;
+        CLOSE cur;
+        """,
+    )
+
+
+def _drop_unique_constraint_if_exists(table_name: str, constraint_name: str) 
-> None:
+    dialect_name = op.get_context().dialect.name
+
+    if dialect_name == "postgresql":
+        op.execute(sa.text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF 
EXISTS "{constraint_name}"'))
+    elif dialect_name == "mysql":
+        op.execute(
+            sa.text(
+                _mysql_run_procedure(
+                    "DropUniqueIfExists",
+                    f"""
+                IF EXISTS (
+                    SELECT 1
+                    FROM information_schema.TABLE_CONSTRAINTS
+                    WHERE
+                        CONSTRAINT_SCHEMA = DATABASE() AND
+                        TABLE_NAME = '{table_name}' AND
+                        CONSTRAINT_NAME = '{constraint_name}' AND
+                        CONSTRAINT_TYPE = 'UNIQUE'
+                ) THEN
+                    ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`;
+                ELSE
+                    SELECT 1;
+                END IF;
+                    """,
+                )
+            )
+        )
+    else:
+        with op.batch_alter_table(table_name, schema=None) as batch_op:
+            with contextlib.suppress(ValueError):
+                batch_op.drop_constraint(constraint_name, type_="unique")
+
+
+def _drop_index_if_exists(table_name: str, index_name: str) -> None:
+    dialect_name = op.get_context().dialect.name
+
+    if dialect_name == "mysql":
+        op.execute(
+            sa.text(
+                _mysql_run_procedure(
+                    "DropIndexIfExists",
+                    f"""
+                IF EXISTS (
+                    SELECT 1
+                    FROM information_schema.STATISTICS
+                    WHERE
+                        TABLE_SCHEMA = DATABASE() AND
+                        TABLE_NAME = '{table_name}' AND
+                        INDEX_NAME = '{index_name}'
+                ) THEN
+                    DROP INDEX `{index_name}` ON `{table_name}`;
+                END IF;
+                    """,
+                )
+            )
+        )
+    else:
+        op.drop_index(index_name, table_name=table_name, if_exists=True)
+
+
+def upgrade() -> None:
+    dialect_name = op.get_context().dialect.name
+    bind = op.get_bind()
+    if dialect_name == "postgresql":
+        op.create_index(
+            "idx_ab_user_username",
+            "ab_user",
+            [sa.literal_column("lower(username::text)")],
+            unique=True,
+            if_not_exists=True,
+        )
+        op.create_index(
+            "idx_ab_register_user_username",
+            "ab_register_user",
+            [sa.literal_column("lower(username::text)")],
+            unique=True,
+            if_not_exists=True,
+        )
+
+    # These indexes exist in the ORM (models/__init__.py) so they may already 
be present
+    # on ORM-created databases. Guard with an existence check for all paths 
including offline.
+    if bind is not None:
+        existing_pvr_indexes = {
+            idx["name"] for idx in 
sa.inspect(bind).get_indexes("ab_permission_view_role")
+        }
+        if "idx_permission_view_id" not in existing_pvr_indexes:
+            op.create_index("idx_permission_view_id", 
"ab_permission_view_role", ["permission_view_id"])
+        if "idx_role_id" not in existing_pvr_indexes:
+            op.create_index("idx_role_id", "ab_permission_view_role", 
["role_id"])
+    elif dialect_name == "postgresql":
+        op.create_index(
+            "idx_permission_view_id", "ab_permission_view_role", 
["permission_view_id"], if_not_exists=True
+        )
+        op.create_index("idx_role_id", "ab_permission_view_role", ["role_id"], 
if_not_exists=True)
+    elif dialect_name == "mysql":
+        # Offline MySQL: CREATE INDEX IF NOT EXISTS is unsupported; use stored 
procedures.
+        
op.execute(sa.text(_mysql_create_idx_permission_view_id_if_not_exists()))
+        op.execute(sa.text(_mysql_create_idx_role_id_if_not_exists()))
+
+    with op.batch_alter_table("ab_permission_view_role", schema=None) as 
batch_op:
+        
batch_op.drop_constraint(batch_op.f("ab_permission_view_role_role_id_fkey"), 
type_="foreignkey")

Review Comment:
   On MySQL, dropping a foreign key also drops the index that was implicitly 
created for it. Lines 228-230 create `idx_permission_view_id` and 
`idx_role_id`, then lines 242-245 drop and recreate the foreign keys on the 
same columns. If MySQL's implicit FK index happened to be the same as 
`idx_permission_view_id`/`idx_role_id`, the drop+recreate could leave the table 
without those indexes. Worth double-checking that the explicit indexes survive 
the FK recreation on MySQL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to