This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7348e09bed6 Remove unused/dead code in migration utils (#64947)
7348e09bed6 is described below
commit 7348e09bed647795ac05f9236840076a04bf7023
Author: Dev-iL <[email protected]>
AuthorDate: Fri Apr 10 02:26:41 2026 +0300
Remove unused/dead code in migration utils (#64947)
---
airflow-core/src/airflow/migrations/utils.py | 246 ---------------------------
1 file changed, 246 deletions(-)
diff --git a/airflow-core/src/airflow/migrations/utils.py
b/airflow-core/src/airflow/migrations/utils.py
index 985473cf80d..4eeaf373c6a 100644
--- a/airflow-core/src/airflow/migrations/utils.py
+++ b/airflow-core/src/airflow/migrations/utils.py
@@ -17,39 +17,8 @@
from __future__ import annotations
import contextlib
-from collections import defaultdict
from contextlib import contextmanager
-from sqlalchemy import text
-
-
-def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str,
list[str]]]:
- """
- Return the primary and unique constraint along with column name.
-
- Some tables like `task_instance` are missing the primary key constraint
- name and the name is auto-generated by the SQL server, so this function
- helps to retrieve any primary or unique constraint name.
-
- :param conn: sql connection object
- :param table_name: table name
- :return: a dictionary of ((constraint name, constraint type), column name)
of table
- """
- query = text(
- f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
- FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
- JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON
ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
- WHERE tc.TABLE_NAME = '{table_name}' AND
- (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) =
'UNIQUE'
- or UPPER(tc.CONSTRAINT_TYPE) = 'FOREIGN KEY')
- """
- )
- result = conn.execute(query).fetchall()
- constraint_dict = defaultdict(lambda: defaultdict(list))
- for constraint, constraint_type, col_name in result:
- constraint_dict[constraint_type][constraint].append(col_name)
- return constraint_dict
-
@contextmanager
def disable_sqlite_fkeys(op):
@@ -86,224 +55,9 @@ def mysql_drop_foreignkey_if_exists(constraint_name,
table_name, op):
""")
-def mysql_drop_index_if_exists(index_name, table_name, op):
- """Older Mysql versions do not support DROP INDEX IF EXISTS."""
- op.execute(f"""
- CREATE PROCEDURE DropIndexIfExists()
- BEGIN
- IF EXISTS (
- SELECT 1
- FROM information_schema.STATISTICS
- WHERE
- TABLE_SCHEMA = DATABASE() AND
- TABLE_NAME = '{table_name}' AND
- INDEX_NAME = '{index_name}'
- ) THEN
- DROP INDEX `{index_name}` ON `{table_name}`;
- END IF;
- END;
- CALL DropIndexIfExists();
- DROP PROCEDURE DropIndexIfExists;
- """)
-
-
def ignore_sqlite_value_error():
from alembic import op
if op.get_bind().dialect.name == "sqlite":
return contextlib.suppress(ValueError)
return contextlib.nullcontext()
-
-
-def get_dialect_name(op) -> str:
- conn = op.get_bind()
- return conn.dialect.name if conn is not None else
op.get_context().dialect.name
-
-
-def create_index_if_not_exists(op, index_name, table_name, columns,
unique=False) -> None:
- """
- Create an index if it does not already exist.
-
- MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure
is used.
- PostgreSQL and SQLite support it natively.
- """
- dialect_name = get_dialect_name(op)
-
- if dialect_name == "mysql":
- unique_kw = "UNIQUE " if unique else ""
- col_list = ", ".join(f"`{c}`" for c in columns)
- op.execute(
- text(f"""
- DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
- CREATE PROCEDURE CreateIndexIfNotExists()
- BEGIN
- IF NOT EXISTS (
- SELECT 1
- FROM information_schema.STATISTICS
- WHERE
- TABLE_SCHEMA = DATABASE() AND
- TABLE_NAME = '{table_name}' AND
- INDEX_NAME = '{index_name}'
- ) THEN
- CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}`
({col_list});
- END IF;
- END;
- CALL CreateIndexIfNotExists();
- DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
- """)
- )
- else:
- op.create_index(index_name, table_name, columns, unique=unique,
if_not_exists=True)
-
-
-def drop_index_if_exists(op, index_name, table_name) -> None:
- """
- Drop an index if it exists.
-
- Works in both online and offline mode by using raw SQL for PostgreSQL and
MySQL.
- SQLite and PostgreSQL support DROP INDEX IF EXISTS natively.
- MySQL requires a stored procedure since it does not support IF EXISTS for
DROP INDEX.
- """
- dialect_name = get_dialect_name(op)
-
- if dialect_name == "mysql":
- op.execute(
- text(f"""
- CREATE PROCEDURE DropIndexIfExists()
- BEGIN
- IF EXISTS (
- SELECT 1
- FROM information_schema.STATISTICS
- WHERE
- TABLE_SCHEMA = DATABASE() AND
- TABLE_NAME = '{table_name}' AND
- INDEX_NAME = '{index_name}'
- ) THEN
- DROP INDEX `{index_name}` ON `{table_name}`;
- END IF;
- END;
- CALL DropIndexIfExists();
- DROP PROCEDURE DropIndexIfExists;
- """)
- )
- else:
- # PostgreSQL and SQLite both support DROP INDEX IF EXISTS
- op.drop_index(index_name, table_name=table_name, if_exists=True)
-
-
-def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
- """
- Drop all unique constraints covering any of the given columns, regardless
of constraint name.
-
- Works in both online and offline mode by using raw SQL for PostgreSQL and
MySQL.
- SQLite falls back to batch mode and requires a live connection.
- """
- import sqlalchemy as sa
-
- dialect_name = get_dialect_name(op)
-
- if dialect_name == "postgresql":
- cols_array = ", ".join(f"'{c}'" for c in columns)
- op.execute(
- text(f"""
- DO $$
- DECLARE r record;
- BEGIN
- FOR r IN
- SELECT DISTINCT tc.constraint_name
- FROM information_schema.table_constraints tc
- JOIN information_schema.key_column_usage kcu
- ON tc.constraint_name = kcu.constraint_name
- AND tc.table_schema = kcu.table_schema
- WHERE tc.table_name = '{table_name}'
- AND tc.constraint_type = 'UNIQUE'
- AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[])
- LOOP
- EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || '
DROP CONSTRAINT IF EXISTS '
- || quote_ident(r.constraint_name);
- END LOOP;
- END $$
- """)
- )
- elif dialect_name == "mysql":
- cols_in = ", ".join(f"'{c}'" for c in columns)
- op.execute(
- text(f"""
- CREATE PROCEDURE DropUniqueOnColumns()
- BEGIN
- DECLARE done INT DEFAULT FALSE;
- DECLARE v_name VARCHAR(255);
- DECLARE cur CURSOR FOR
- SELECT DISTINCT kcu.CONSTRAINT_NAME
- FROM information_schema.KEY_COLUMN_USAGE kcu
- JOIN information_schema.TABLE_CONSTRAINTS tc
- ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
- AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA
- AND kcu.TABLE_NAME = tc.TABLE_NAME
- WHERE kcu.TABLE_NAME = '{table_name}'
- AND kcu.TABLE_SCHEMA = DATABASE()
- AND tc.CONSTRAINT_TYPE = 'UNIQUE'
- AND kcu.COLUMN_NAME IN ({cols_in});
- DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
- OPEN cur;
- drop_loop: LOOP
- FETCH cur INTO v_name;
- IF done THEN LEAVE drop_loop; END IF;
- SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX
`', v_name, '`');
- PREPARE s FROM @stmt;
- EXECUTE s;
- DEALLOCATE PREPARE s;
- END LOOP;
- CLOSE cur;
- END;
- CALL DropUniqueOnColumns();
- DROP PROCEDURE DropUniqueOnColumns;
- """)
- )
- else:
- # SQLite — batch mode rewrites the table; requires a live connection
- with op.batch_alter_table(table_name, schema=None) as batch_op:
- for uq in
sa.inspect(op.get_bind()).get_unique_constraints(table_name):
- if any(col in uq["column_names"] for col in columns):
- batch_op.drop_constraint(uq["name"], type_="unique")
-
-
-def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None:
- """
- Drop a unique constraint by name if it exists.
-
- Works in both online and offline mode by using raw SQL for PostgreSQL and
MySQL.
- SQLite falls back to batch mode and requires a live connection.
- """
- dialect_name = get_dialect_name(op)
-
- if dialect_name == "postgresql":
- op.execute(text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS
"{constraint_name}"'))
- elif dialect_name == "mysql":
- op.execute(
- text(f"""
- CREATE PROCEDURE DropUniqueIfExists()
- BEGIN
- IF EXISTS (
- SELECT 1
- FROM information_schema.TABLE_CONSTRAINTS
- WHERE
- CONSTRAINT_SCHEMA = DATABASE() AND
- TABLE_NAME = '{table_name}' AND
- CONSTRAINT_NAME = '{constraint_name}' AND
- CONSTRAINT_TYPE = 'UNIQUE'
- ) THEN
- ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`;
- ELSE
- SELECT 1;
- END IF;
- END;
- CALL DropUniqueIfExists();
- DROP PROCEDURE DropUniqueIfExists;
- """)
- )
- else:
- # SQLite — batch mode rewrites the table; requires a live connection
- with op.batch_alter_table(table_name, schema=None) as batch_op:
- with contextlib.suppress(ValueError):
- batch_op.drop_constraint(constraint_name, type_="unique")