This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7348e09bed6 Remove unused/dead code in migration utils (#64947)
7348e09bed6 is described below

commit 7348e09bed647795ac05f9236840076a04bf7023
Author: Dev-iL <[email protected]>
AuthorDate: Fri Apr 10 02:26:41 2026 +0300

    Remove unused/dead code in migration utils (#64947)
---
 airflow-core/src/airflow/migrations/utils.py | 246 ---------------------------
 1 file changed, 246 deletions(-)

diff --git a/airflow-core/src/airflow/migrations/utils.py b/airflow-core/src/airflow/migrations/utils.py
index 985473cf80d..4eeaf373c6a 100644
--- a/airflow-core/src/airflow/migrations/utils.py
+++ b/airflow-core/src/airflow/migrations/utils.py
@@ -17,39 +17,8 @@
 from __future__ import annotations
 
 import contextlib
-from collections import defaultdict
 from contextlib import contextmanager
 
-from sqlalchemy import text
-
-
-def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str, list[str]]]:
-    """
-    Return the primary and unique constraint along with column name.
-
-    Some tables like `task_instance` are missing the primary key constraint
-    name and the name is auto-generated by the SQL server, so this function
-    helps to retrieve any primary or unique constraint name.
-
-    :param conn: sql connection object
-    :param table_name: table name
-    :return: a dictionary of ((constraint name, constraint type), column name) of table
-    """
-    query = text(
-        f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
-     FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
-     JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
-     WHERE tc.TABLE_NAME = '{table_name}' AND
-     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE'
-     or UPPER(tc.CONSTRAINT_TYPE) = 'FOREIGN KEY')
-    """
-    )
-    result = conn.execute(query).fetchall()
-    constraint_dict = defaultdict(lambda: defaultdict(list))
-    for constraint, constraint_type, col_name in result:
-        constraint_dict[constraint_type][constraint].append(col_name)
-    return constraint_dict
-
 
 @contextmanager
 def disable_sqlite_fkeys(op):
@@ -86,224 +55,9 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):
     """)
 
 
-def mysql_drop_index_if_exists(index_name, table_name, op):
-    """Older Mysql versions do not support DROP INDEX IF EXISTS."""
-    op.execute(f"""
-    CREATE PROCEDURE DropIndexIfExists()
-    BEGIN
-        IF EXISTS (
-            SELECT 1
-            FROM information_schema.STATISTICS
-            WHERE
-                TABLE_SCHEMA = DATABASE() AND
-                TABLE_NAME = '{table_name}' AND
-                INDEX_NAME = '{index_name}'
-        ) THEN
-            DROP INDEX `{index_name}` ON `{table_name}`;
-        END IF;
-    END;
-    CALL DropIndexIfExists();
-    DROP PROCEDURE DropIndexIfExists;
-    """)
-
-
 def ignore_sqlite_value_error():
     from alembic import op
 
     if op.get_bind().dialect.name == "sqlite":
         return contextlib.suppress(ValueError)
     return contextlib.nullcontext()
-
-
-def get_dialect_name(op) -> str:
-    conn = op.get_bind()
-    return conn.dialect.name if conn is not None else op.get_context().dialect.name
-
-
-def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None:
-    """
-    Create an index if it does not already exist.
-
-    MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used.
-    PostgreSQL and SQLite support it natively.
-    """
-    dialect_name = get_dialect_name(op)
-
-    if dialect_name == "mysql":
-        unique_kw = "UNIQUE " if unique else ""
-        col_list = ", ".join(f"`{c}`" for c in columns)
-        op.execute(
-            text(f"""
-            DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
-            CREATE PROCEDURE CreateIndexIfNotExists()
-            BEGIN
-                IF NOT EXISTS (
-                    SELECT 1
-                    FROM information_schema.STATISTICS
-                    WHERE
-                        TABLE_SCHEMA = DATABASE() AND
-                        TABLE_NAME = '{table_name}' AND
-                        INDEX_NAME = '{index_name}'
-                ) THEN
-                    CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list});
-                END IF;
-            END;
-            CALL CreateIndexIfNotExists();
-            DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
-            """)
-        )
-    else:
-        op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True)
-
-
-def drop_index_if_exists(op, index_name, table_name) -> None:
-    """
-    Drop an index if it exists.
-
-    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
-    SQLite and PostgreSQL support DROP INDEX IF EXISTS natively.
-    MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX.
-    """
-    dialect_name = get_dialect_name(op)
-
-    if dialect_name == "mysql":
-        op.execute(
-            text(f"""
-            CREATE PROCEDURE DropIndexIfExists()
-            BEGIN
-                IF EXISTS (
-                    SELECT 1
-                    FROM information_schema.STATISTICS
-                    WHERE
-                        TABLE_SCHEMA = DATABASE() AND
-                        TABLE_NAME = '{table_name}' AND
-                        INDEX_NAME = '{index_name}'
-                ) THEN
-                    DROP INDEX `{index_name}` ON `{table_name}`;
-                END IF;
-            END;
-            CALL DropIndexIfExists();
-            DROP PROCEDURE DropIndexIfExists;
-            """)
-        )
-    else:
-        # PostgreSQL and SQLite both support DROP INDEX IF EXISTS
-        op.drop_index(index_name, table_name=table_name, if_exists=True)
-
-
-def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
-    """
-    Drop all unique constraints covering any of the given columns, regardless of constraint name.
-
-    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
-    SQLite falls back to batch mode and requires a live connection.
-    """
-    import sqlalchemy as sa
-
-    dialect_name = get_dialect_name(op)
-
-    if dialect_name == "postgresql":
-        cols_array = ", ".join(f"'{c}'" for c in columns)
-        op.execute(
-            text(f"""
-            DO $$
-            DECLARE r record;
-            BEGIN
-                FOR r IN
-                    SELECT DISTINCT tc.constraint_name
-                    FROM information_schema.table_constraints tc
-                    JOIN information_schema.key_column_usage kcu
-                        ON tc.constraint_name = kcu.constraint_name
-                        AND tc.table_schema = kcu.table_schema
-                    WHERE tc.table_name = '{table_name}'
-                        AND tc.constraint_type = 'UNIQUE'
-                        AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[])
-                LOOP
-                    EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || ' DROP CONSTRAINT IF EXISTS '
-                        || quote_ident(r.constraint_name);
-                END LOOP;
-            END $$
-            """)
-        )
-    elif dialect_name == "mysql":
-        cols_in = ", ".join(f"'{c}'" for c in columns)
-        op.execute(
-            text(f"""
-            CREATE PROCEDURE DropUniqueOnColumns()
-            BEGIN
-                DECLARE done INT DEFAULT FALSE;
-                DECLARE v_name VARCHAR(255);
-                DECLARE cur CURSOR FOR
-                    SELECT DISTINCT kcu.CONSTRAINT_NAME
-                    FROM information_schema.KEY_COLUMN_USAGE kcu
-                    JOIN information_schema.TABLE_CONSTRAINTS tc
-                        ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
-                        AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA
-                        AND kcu.TABLE_NAME = tc.TABLE_NAME
-                    WHERE kcu.TABLE_NAME = '{table_name}'
-                        AND kcu.TABLE_SCHEMA = DATABASE()
-                        AND tc.CONSTRAINT_TYPE = 'UNIQUE'
-                        AND kcu.COLUMN_NAME IN ({cols_in});
-                DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
-                OPEN cur;
-                drop_loop: LOOP
-                    FETCH cur INTO v_name;
-                    IF done THEN LEAVE drop_loop; END IF;
-                    SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX `', v_name, '`');
-                    PREPARE s FROM @stmt;
-                    EXECUTE s;
-                    DEALLOCATE PREPARE s;
-                END LOOP;
-                CLOSE cur;
-            END;
-            CALL DropUniqueOnColumns();
-            DROP PROCEDURE DropUniqueOnColumns;
-            """)
-        )
-    else:
-        # SQLite — batch mode rewrites the table; requires a live connection
-        with op.batch_alter_table(table_name, schema=None) as batch_op:
-            for uq in sa.inspect(op.get_bind()).get_unique_constraints(table_name):
-                if any(col in uq["column_names"] for col in columns):
-                    batch_op.drop_constraint(uq["name"], type_="unique")
-
-
-def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None:
-    """
-    Drop a unique constraint by name if it exists.
-
-    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
-    SQLite falls back to batch mode and requires a live connection.
-    """
-    dialect_name = get_dialect_name(op)
-
-    if dialect_name == "postgresql":
-        op.execute(text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}"'))
-    elif dialect_name == "mysql":
-        op.execute(
-            text(f"""
-            CREATE PROCEDURE DropUniqueIfExists()
-            BEGIN
-                IF EXISTS (
-                    SELECT 1
-                    FROM information_schema.TABLE_CONSTRAINTS
-                    WHERE
-                        CONSTRAINT_SCHEMA = DATABASE() AND
-                        TABLE_NAME = '{table_name}' AND
-                        CONSTRAINT_NAME = '{constraint_name}' AND
-                        CONSTRAINT_TYPE = 'UNIQUE'
-                ) THEN
-                    ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`;
-                ELSE
-                    SELECT 1;
-                END IF;
-            END;
-            CALL DropUniqueIfExists();
-            DROP PROCEDURE DropUniqueIfExists;
-            """)
-        )
-    else:
-        # SQLite — batch mode rewrites the table; requires a live connection
-        with op.batch_alter_table(table_name, schema=None) as batch_op:
-            with contextlib.suppress(ValueError):
-                batch_op.drop_constraint(constraint_name, type_="unique")

Reply via email to