Lee-W commented on code in PR #64838: URL: https://github.com/apache/airflow/pull/64838#discussion_r3217759901
########## airflow-core/tests/unit/migrations/test_sqlite_migration_utils.py: ########## @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.migrations.utils import disable_sqlite_fkeys + + +class _Dialect: + def __init__(self, name: str) -> None: + self.name = name + + +class _Bind: + def __init__(self, dialect_name: str) -> None: + self.dialect = _Dialect(name=dialect_name) + + +class _Context: + def __init__(self, dialect_name: str) -> None: + self.dialect = _Dialect(name=dialect_name) + + +class _FakeOp: + def __init__(self, dialect_name: str) -> None: + self._bind = _Bind(dialect_name=dialect_name) + self.executed: list[str] = [] + + def get_bind(self) -> _Bind: + return self._bind + + def execute(self, statement: str) -> None: + self.executed.append(statement) + + +class _OfflineFakeOp: + """Simulates Alembic offline mode where get_bind() returns None.""" + + def __init__(self, dialect_name: str) -> None: + self._context = _Context(dialect_name=dialect_name) + self.executed: list[str] = [] + + def get_bind(self) -> None: + return None + + def get_context(self) -> _Context: + return self._context + + def execute(self, statement: str) -> None: + self.executed.append(statement) + + +def test_disable_sqlite_fkeys_restores_pragma_on_success() -> None: Review Comment: We no longer need this. we have a unit test for it in airflow-core/tests/unit/migrations/test_migration_utils.py ########## airflow-core/src/airflow/migrations/utils.py: ########## @@ -56,8 +66,196 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op): def ignore_sqlite_value_error(): - from alembic import op - - if op.get_bind().dialect.name == "sqlite": + if get_dialect_name(alembic_op) == "sqlite": return contextlib.suppress(ValueError) return contextlib.nullcontext() + + +def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None: Review Comment: is this still used somewhere? ########## airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py: ########## @@ -103,19 +102,18 @@ def _drop_fkey_if_exists(table, constraint_name): conn = op.get_bind() dialect_name = conn.dialect.name - if dialect_name == "sqlite": - # SQLite requires foreign key constraints to be disabled during batch operations - conn.execute(text("PRAGMA foreign_keys=OFF")) - try: - with op.batch_alter_table(table, schema=None) as batch_op: - batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey") - except ValueError: - pass - conn.execute(text("PRAGMA foreign_keys=ON")) - elif dialect_name == "mysql": + if dialect_name == "mysql": mysql_drop_foreignkey_if_exists(constraint_name, table, op) - else: + elif dialect_name == "postgresql": op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint_name}") + else: + # SQLite requires foreign key constraints to be disabled during batch operations. + with disable_sqlite_fkeys(op): + try: Review Comment: should we use `ignore_sqlite_value_error`? ########## airflow-core/src/airflow/migrations/utils.py: ########## @@ -56,8 +66,196 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op): def ignore_sqlite_value_error(): - from alembic import op - - if op.get_bind().dialect.name == "sqlite": + if get_dialect_name(alembic_op) == "sqlite": return contextlib.suppress(ValueError) return contextlib.nullcontext() + + +def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None: + """ + Create an index if it does not already exist. + + MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used. + PostgreSQL and SQLite support it natively. + """ + dialect_name = get_dialect_name(op) + + if dialect_name == "mysql": + unique_kw = "UNIQUE " if unique else "" + col_list = ", ".join(f"`{c}`" for c in columns) + op.execute( + text(f""" + DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; + CREATE PROCEDURE CreateIndexIfNotExists() + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM information_schema.STATISTICS + WHERE + TABLE_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + INDEX_NAME = '{index_name}' + ) THEN + CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list}); + END IF; + END; + CALL CreateIndexIfNotExists(); + DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; + """) + ) + else: + op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True) + + +def drop_index_if_exists(op, index_name, table_name) -> None: Review Comment: is this still used somewhere? ########## airflow-core/src/airflow/migrations/utils.py: ########## @@ -56,8 +66,196 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op): def ignore_sqlite_value_error(): - from alembic import op - - if op.get_bind().dialect.name == "sqlite": + if get_dialect_name(alembic_op) == "sqlite": return contextlib.suppress(ValueError) return contextlib.nullcontext() + + +def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None: + """ + Create an index if it does not already exist. + + MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used. + PostgreSQL and SQLite support it natively. + """ + dialect_name = get_dialect_name(op) + + if dialect_name == "mysql": + unique_kw = "UNIQUE " if unique else "" + col_list = ", ".join(f"`{c}`" for c in columns) + op.execute( + text(f""" + DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; + CREATE PROCEDURE CreateIndexIfNotExists() + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM information_schema.STATISTICS + WHERE + TABLE_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + INDEX_NAME = '{index_name}' + ) THEN + CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list}); + END IF; + END; + CALL CreateIndexIfNotExists(); + DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; + """) + ) + else: + op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True) + + +def drop_index_if_exists(op, index_name, table_name) -> None: + """ + Drop an index if it exists. + + Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL. + SQLite and PostgreSQL support DROP INDEX IF EXISTS natively. + MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX. + """ + dialect_name = get_dialect_name(op) + + if dialect_name == "mysql": + op.execute( + text(f""" + DROP PROCEDURE IF EXISTS DropIndexIfExists; + CREATE PROCEDURE DropIndexIfExists() + BEGIN + IF EXISTS ( + SELECT 1 + FROM information_schema.STATISTICS + WHERE + TABLE_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + INDEX_NAME = '{index_name}' + ) THEN + DROP INDEX `{index_name}` ON `{table_name}`; + END IF; + END; + CALL DropIndexIfExists(); + DROP PROCEDURE DropIndexIfExists; + """) + ) + else: + # PostgreSQL and SQLite both support DROP INDEX IF EXISTS + op.drop_index(index_name, table_name=table_name, if_exists=True) + + +def drop_unique_constraints_on_columns(op, table_name, columns) -> None: + """ + Drop all unique constraints covering any of the given columns, regardless of constraint name. + + Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL. + SQLite falls back to batch mode and requires a live connection. + """ + dialect_name = get_dialect_name(op) + + if dialect_name == "postgresql": + cols_array = ", ".join(f"'{c}'" for c in columns) + op.execute( + text(f""" + DO $$ + DECLARE r record; + BEGIN + FOR r IN + SELECT DISTINCT tc.constraint_name + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + WHERE tc.table_name = '{table_name}' + AND tc.constraint_type = 'UNIQUE' + AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[]) + LOOP + EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || ' DROP CONSTRAINT IF EXISTS ' + || quote_ident(r.constraint_name); + END LOOP; + END $$ + """) + ) + elif dialect_name == "mysql": + cols_in = ", ".join(f"'{c}'" for c in columns) + op.execute( + text(f""" + DROP PROCEDURE IF EXISTS DropUniqueOnColumns; + CREATE PROCEDURE DropUniqueOnColumns() + BEGIN + DECLARE done INT DEFAULT FALSE; + DECLARE v_name VARCHAR(255); + DECLARE cur CURSOR FOR + SELECT DISTINCT kcu.CONSTRAINT_NAME + FROM information_schema.KEY_COLUMN_USAGE kcu + JOIN information_schema.TABLE_CONSTRAINTS tc + ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME + AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA + AND kcu.TABLE_NAME = tc.TABLE_NAME + WHERE kcu.TABLE_NAME = '{table_name}' + AND kcu.TABLE_SCHEMA = DATABASE() + AND tc.CONSTRAINT_TYPE = 'UNIQUE' + AND kcu.COLUMN_NAME IN ({cols_in}); + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; + OPEN cur; + drop_loop: LOOP + FETCH cur INTO v_name; + IF done THEN LEAVE drop_loop; END IF; + SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX `', v_name, '`'); + PREPARE s FROM @stmt; + EXECUTE s; + DEALLOCATE PREPARE s; + END LOOP; + CLOSE cur; + END; + CALL DropUniqueOnColumns(); + DROP PROCEDURE DropUniqueOnColumns; + """) + ) + else: + # SQLite — batch mode rewrites the table; requires a live connection + with op.batch_alter_table(table_name, schema=None) as batch_op: + for uq in sa.inspect(op.get_bind()).get_unique_constraints(table_name): + if any(col in uq["column_names"] for col in columns): + batch_op.drop_constraint(uq["name"], type_="unique") + + +def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None: Review Comment: is this still used somewhere? ########## airflow-core/tests/unit/migrations/test_sqlite_migration_utils.py: ########## @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.migrations.utils import disable_sqlite_fkeys + + +class _Dialect: + def __init__(self, name: str) -> None: + self.name = name + + +class _Bind: + def __init__(self, dialect_name: str) -> None: + self.dialect = _Dialect(name=dialect_name) + + +class _Context: + def __init__(self, dialect_name: str) -> None: + self.dialect = _Dialect(name=dialect_name) + + +class _FakeOp: + def __init__(self, dialect_name: str) -> None: + self._bind = _Bind(dialect_name=dialect_name) + self.executed: list[str] = [] + + def get_bind(self) -> _Bind: + return self._bind + + def execute(self, statement: str) -> None: + self.executed.append(statement) + + +class _OfflineFakeOp: + """Simulates Alembic offline mode where get_bind() returns None.""" + + def __init__(self, dialect_name: str) -> None: + self._context = _Context(dialect_name=dialect_name) + self.executed: list[str] = [] + + def get_bind(self) -> None: + return None + + def get_context(self) -> _Context: + return self._context + + def execute(self, statement: str) -> None: + self.executed.append(statement) + + +def test_disable_sqlite_fkeys_restores_pragma_on_success() -> None: Review Comment: we can merge tests there if needed ########## airflow-core/src/airflow/migrations/utils.py: ########## @@ -56,8 +66,196 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op): def ignore_sqlite_value_error(): - from alembic import op - - if op.get_bind().dialect.name == "sqlite": + if get_dialect_name(alembic_op) == "sqlite": return contextlib.suppress(ValueError) return contextlib.nullcontext() + + +def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None: + """ + Create an index if it does not already exist. + + MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used. + PostgreSQL and SQLite support it natively. + """ + dialect_name = get_dialect_name(op) + + if dialect_name == "mysql": + unique_kw = "UNIQUE " if unique else "" + col_list = ", ".join(f"`{c}`" for c in columns) + op.execute( + text(f""" + DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; + CREATE PROCEDURE CreateIndexIfNotExists() + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM information_schema.STATISTICS + WHERE + TABLE_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + INDEX_NAME = '{index_name}' + ) THEN + CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list}); + END IF; + END; + CALL CreateIndexIfNotExists(); + DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; + """) + ) + else: + op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True) + + +def drop_index_if_exists(op, index_name, table_name) -> None: + """ + Drop an index if it exists. + + Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL. + SQLite and PostgreSQL support DROP INDEX IF EXISTS natively. + MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX. + """ + dialect_name = get_dialect_name(op) + + if dialect_name == "mysql": + op.execute( + text(f""" + DROP PROCEDURE IF EXISTS DropIndexIfExists; + CREATE PROCEDURE DropIndexIfExists() + BEGIN + IF EXISTS ( + SELECT 1 + FROM information_schema.STATISTICS + WHERE + TABLE_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + INDEX_NAME = '{index_name}' + ) THEN + DROP INDEX `{index_name}` ON `{table_name}`; + END IF; + END; + CALL DropIndexIfExists(); + DROP PROCEDURE DropIndexIfExists; + """) + ) + else: + # PostgreSQL and SQLite both support DROP INDEX IF EXISTS + op.drop_index(index_name, table_name=table_name, if_exists=True) + + +def drop_unique_constraints_on_columns(op, table_name, columns) -> None: Review Comment: is this still used somewhere? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
