Copilot commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2648327300
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1203,18 +1338,32 @@ def downgrade(*, to_revision, from_revision=None,
show_sql_only=False, session:
log.info("Attempting downgrade to revision %s", to_revision)
config = _get_alembic_config()
+
# If downgrading to less than 3.0.0, we need to handle the FAB provider
if _revision_greater(config, _REVISION_HEADS_MAP["2.10.3"], to_revision):
_handle_fab_downgrade(session=session)
- with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
+
+ # Determine which session to use for the migration operations
+ if get_dialect_name(session) == "mysql":
+ # MySQL: Commit session to release metadata locks before Alembic DDL
+ session_cm: contextlib.AbstractContextManager[Session] =
_mysql_lock_session_for_migration(session)
+ else:
+ # PostgreSQL / SQLite: Use original session
+ session_cm = contextlib.nullcontext(session)
+
+ with (
+ session_cm as work_session,
+ create_global_lock(session=work_session, lock=DBLocks.MIGRATIONS),
+ ):
if show_sql_only:
log.warning("Generating sql scripts for manual migration.")
if not from_revision:
- from_revision = _get_current_revision(session)
+ from_revision = _get_current_revision(work_session)
revision_range = f"{from_revision}:{to_revision}"
_offline_migration(command.downgrade, config=config,
revision=revision_range)
else:
- log.info("Applying downgrade migrations to Airflow database.")
+ dialect_label = " (MySQL)" if get_dialect_name(session) == "mysql"
else ""
Review Comment:
Inconsistent session usage: Line 1365 calls `get_dialect_name(session)`
using the original session parameter, but we're inside a context where
`work_session` is the active session being used for operations. For
consistency, this should use `get_dialect_name(work_session)` instead. This
could be problematic for MySQL where the original session has been committed
and a new session created.
```suggestion
dialect_label = " (MySQL)" if get_dialect_name(work_session) ==
"mysql" else ""
```
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1120,38 +1235,56 @@ def upgradedb(
exit(1)
if not _get_current_revision(session=session) and not to_revision:
- # Don't load default connections
# New DB; initialize and exit
initdb(session=session)
return
- with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
- import sqlalchemy.pool
- log.info("Migrating the Airflow database")
- val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
- try:
- # Reconfigure the ORM to use _EXACTLY_ one connection, otherwise
some db engines hang forever
- # trying to ALTER TABLEs
- os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = "1"
-
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
- command.upgrade(config, revision=to_revision or "heads")
- current_revision = _get_current_revision(session=session)
- with _configured_alembic_environment() as env:
- source_heads = env.script.get_heads()
- if current_revision == source_heads[0]:
- # Only run external DB upgrade migration if user upgraded to
heads
+ _run_upgradedb(config, to_revision, session)
+
+
+def _resetdb_mysql(session: Session) -> None:
+ """Drop all Airflow tables for MySQL."""
+ from sqlalchemy.orm import Session as SASession
+
+ # MySQL: Release metadata locks and use AUTOCOMMIT for DDL
+ log.info("MySQL: Releasing metadata locks before DDL operations")
+ session.commit()
+ session.close()
+
+ # Use create_global_lock for migration safety (now handles MySQL with
AUTOCOMMIT)
+ engine = settings.get_engine()
+ lock_session = SASession(bind=engine)
+ try:
+ with (
+ create_global_lock(session=lock_session, lock=DBLocks.MIGRATIONS),
+ engine.connect() as connection,
+ ):
+ ddl_conn =
connection.execution_options(isolation_level="AUTOCOMMIT")
+
+ drop_airflow_models(ddl_conn)
+ drop_airflow_moved_tables(ddl_conn)
+ log.info("Dropped all Airflow tables")
+
+ # Use raw Session to avoid scoped session issues
+ work_session = SASession(bind=ddl_conn)
+ try:
external_db_manager = RunDBManager()
- external_db_manager.upgradedb(session)
+ external_db_manager.drop_tables(work_session, ddl_conn)
+ finally:
+ work_session.close()
+ finally:
+ lock_session.close()
- finally:
- if val is None:
- os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
- else:
- os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = val
- settings.reconfigure_orm()
- add_default_pool_if_not_exists(session=session)
- synchronize_log_template(session=session)
+def _resetdb_default(session: Session) -> None:
+ """Drop all Airflow tables for PostgreSQL/SQLite."""
+ connection = settings.get_engine().connect()
+ with create_global_lock(session=session, lock=DBLocks.MIGRATIONS),
connection.begin():
Review Comment:
Connection resource leak: The connection created by
`settings.get_engine().connect()` is not properly closed. Unlike the MySQL path
which uses a context manager (`with engine.connect() as connection`), this
connection is never explicitly closed. In SQLAlchemy 2.0, proper connection
management is more critical. This should be wrapped in a context manager to
ensure the connection is properly closed.
```suggestion
engine = settings.get_engine()
with engine.connect() as connection, create_global_lock(session=session,
lock=DBLocks.MIGRATIONS), connection.begin():
```
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -700,39 +721,101 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
- # Use AUTOCOMMIT for DDL to avoid metadata lock issues
- with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
- from alembic import command
+ log.info("Stamping migration head")
+ command.stamp(config, "head")
- log.info("Stamping migration head")
- command.stamp(config, "head")
+ log.info("Airflow database tables created")
- log.info("Airflow database tables created")
+
+def _create_db_from_orm(session):
+ """Create database tables from ORM models and stamp alembic version."""
+ log.info("Creating Airflow database tables from the ORM")
+ _setup_debug_logging_if_needed()
+
+ if get_dialect_name(session) == "mysql":
+ _create_db_from_orm_mysql(session)
+ else:
+ _create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG
is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+ import atexit
import faulthandler
- import threading
+ from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
- # Enable Fault Handler
+ # Enable faulthandler for debugging long-running threads and deadlocks,
+ # but disable it before interpreter shutdown to avoid segfaults during
+ # cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
- # Print Active Threads and Stack Traces Periodically
- def dump_stacks():
- while True:
- for thread_id, frame in sys._current_frames().items():
- log.info("\nThread %s stack:", thread_id)
- traceback.print_stack(frame)
- time.sleep(300)
+ # Cancel any pending traceback dumps and disable faulthandler before exit
+ # to prevent it from interfering with C extension cleanup
+ def cleanup_faulthandler():
+ with suppress(Exception):
+ faulthandler.cancel_dump_traceback_later()
+ with suppress(Exception):
+ faulthandler.disable()
+
+ atexit.register(cleanup_faulthandler)
+
+ # Set up periodic traceback dumps for debugging hanging tests/threads
+ faulthandler.dump_traceback_later(timeout=300, repeat=True,
file=sys.stderr)
+
- threading.Thread(target=dump_stacks, daemon=True).start()
+@contextlib.contextmanager
+def _mysql_lock_session_for_migration(original_session: Session) ->
Generator[Session, None, None]:
+ """
+ Create a MySQL-specific lock session for migration operations.
+
+ This context manager:
+ 1. Commits the original session to release metadata locks
+ 2. Creates a new session bound to the engine
+ 3. Ensures the session is properly closed on exit
+
+ :param original_session: The original session to commit
+ :return: A new session suitable for use with create_global_lock
+ """
+ from sqlalchemy.orm import Session as SASession
+
+ log.info("MySQL: Committing session to release metadata locks")
+ original_session.commit()
+
+ lock_session = SASession(bind=settings.engine)
Review Comment:
Inconsistent engine access pattern: Line 790 uses `settings.engine`
directly, while most other places in this file use `settings.get_engine()`
(e.g., lines 1151, 1255, 1281, 1507). For consistency and better compatibility
with SQLAlchemy 2.0's engine management, consider using `settings.get_engine()`
here as well.
```suggestion
lock_session = SASession(bind=settings.get_engine())
```
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1340,58 +1496,102 @@ def __str__(self):
@contextlib.contextmanager
-def create_global_lock(
- session: Session,
- lock: DBLocks,
- lock_timeout: int = 1800,
+def _create_global_lock_mysql(lock: DBLocks, lock_timeout: int) ->
Generator[None, None, None]:
+ """
+ Create a global advisory lock for MySQL.
+
+ Uses a dedicated AUTOCOMMIT connection because:
+ - GET_LOCK is session-level, not transaction-level
+ - DDL operations cause implicit commits that would break transaction
wrappers
+ """
+ lock_conn = settings.get_engine().connect()
+ try:
+ lock_conn = lock_conn.execution_options(isolation_level="AUTOCOMMIT")
+
+ # GET_LOCK returns: 1 = acquired, 0 = timeout, NULL = error
+ lock_result = lock_conn.execute(
+ text("SELECT GET_LOCK(:lock_name, :timeout)"),
+ {"lock_name": str(lock), "timeout": lock_timeout},
+ ).scalar()
+
+ if lock_result != 1:
+ raise RuntimeError(
+ f"Could not acquire MySQL advisory lock '{lock}'. "
+ f"Result: {lock_result}. Another process may be holding the
lock."
+ )
+
+ try:
+ yield
+ finally:
+ lock_conn.execute(text("SELECT RELEASE_LOCK(:lock_name)"),
{"lock_name": str(lock)})
+ finally:
+ lock_conn.close()
+
+
+@contextlib.contextmanager
+def _create_global_lock_postgresql(
+ session: Session, lock: DBLocks, lock_timeout: int
) -> Generator[None, None, None]:
- """Contextmanager that will create and teardown a global db lock."""
+ """Create a global advisory lock for PostgreSQL using transactional
advisory locks."""
bind = session.get_bind()
if hasattr(bind, "connect"):
conn = bind.connect()
else:
conn = bind
Review Comment:
Potential connection leak in PostgreSQL lock: Lines 1537-1540 create a new
connection if `bind` has a `connect` method, but this connection is never
explicitly closed in the `finally` block. In SQLAlchemy 2.0, proper connection
lifecycle management is critical. If a new connection is created at line 1538,
it should be closed in the finally block to prevent resource leaks.
##########
airflow-core/src/airflow/utils/db_manager.py:
##########
@@ -107,6 +124,8 @@ def drop_tables(self, connection):
def resetdb(self, skip_init=False):
from airflow.utils.db import DBLocks, create_global_lock
+ self._release_metadata_locks_if_needed()
+
connection = settings.engine.connect()
Review Comment:
Inconsistent engine access: Line 129 uses `settings.engine.connect()`
directly, while the rest of the codebase predominantly uses
`settings.get_engine().connect()`. For consistency with the SQLAlchemy 2.0
upgrade and the patterns used elsewhere in this PR (see db.py lines 1151, 1255,
1281, 1507), this should use `settings.get_engine()` instead of accessing the
`engine` attribute directly.
```suggestion
connection = settings.get_engine().connect()
```
##########
providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py:
##########
@@ -888,20 +888,26 @@ def test_get_db_manager(self, auth_manager):
@mock.patch("airflow.utils.db.drop_airflow_models")
@mock.patch("airflow.utils.db.drop_airflow_moved_tables")
@mock.patch("airflow.utils.db.initdb")
-@mock.patch("airflow.settings.engine.connect")
+@mock.patch("airflow.settings.engine")
Review Comment:
Test mocking mismatch: The test mocks `airflow.settings.engine` (line 891),
but the actual code in `_resetdb_default` calls `settings.get_engine()` (line
1281 of db.py). While this might work if `get_engine()` returns the `engine`
attribute, this creates a fragile dependency. The test should mock
`airflow.settings.get_engine` instead to match the actual implementation, or
verify that the mocked `engine` is returned by `get_engine()`.
##########
providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py:
##########
@@ -888,20 +888,26 @@ def test_get_db_manager(self, auth_manager):
@mock.patch("airflow.utils.db.drop_airflow_models")
@mock.patch("airflow.utils.db.drop_airflow_moved_tables")
@mock.patch("airflow.utils.db.initdb")
-@mock.patch("airflow.settings.engine.connect")
+@mock.patch("airflow.settings.engine")
def test_resetdb(
- mock_connect,
+ mock_engine,
mock_init,
mock_drop_moved,
mock_drop_airflow,
mock_fabdb_manager,
skip_init,
):
+ # Mock as non-MySQL to use the simpler PostgreSQL/SQLite path
+ mock_engine.dialect.name = "postgresql"
+ mock_connect = mock_engine.connect.return_value
+
session_mock = MagicMock()
Review Comment:
Incomplete test setup: The test sets `mock_engine.dialect.name =
"postgresql"` to test the non-MySQL path, but `get_dialect_name(session)` gets
the dialect from `session.get_bind().dialect.name`, not from `settings.engine`.
The test should also configure `session_mock.get_bind().dialect.name` to ensure
it returns "postgresql" so that the code correctly identifies this as the
PostgreSQL path. Without this, the test may not actually be testing the
intended code path.
```suggestion
session_mock = MagicMock()
# Ensure the session's bind also reports a PostgreSQL dialect, matching
the non-MySQL path
bind_mock = session_mock.get_bind.return_value
bind_mock.dialect.name = "postgresql"
```
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -700,39 +721,101 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
- # Use AUTOCOMMIT for DDL to avoid metadata lock issues
- with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
- from alembic import command
+ log.info("Stamping migration head")
+ command.stamp(config, "head")
- log.info("Stamping migration head")
- command.stamp(config, "head")
+ log.info("Airflow database tables created")
- log.info("Airflow database tables created")
+
+def _create_db_from_orm(session):
+ """Create database tables from ORM models and stamp alembic version."""
+ log.info("Creating Airflow database tables from the ORM")
+ _setup_debug_logging_if_needed()
+
+ if get_dialect_name(session) == "mysql":
+ _create_db_from_orm_mysql(session)
+ else:
+ _create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG
is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+ import atexit
import faulthandler
- import threading
+ from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
- # Enable Fault Handler
+ # Enable faulthandler for debugging long-running threads and deadlocks,
+ # but disable it before interpreter shutdown to avoid segfaults during
+ # cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
- # Print Active Threads and Stack Traces Periodically
- def dump_stacks():
- while True:
- for thread_id, frame in sys._current_frames().items():
- log.info("\nThread %s stack:", thread_id)
- traceback.print_stack(frame)
- time.sleep(300)
+ # Cancel any pending traceback dumps and disable faulthandler before exit
+ # to prevent it from interfering with C extension cleanup
+ def cleanup_faulthandler():
+ with suppress(Exception):
+ faulthandler.cancel_dump_traceback_later()
+ with suppress(Exception):
+ faulthandler.disable()
+
+ atexit.register(cleanup_faulthandler)
+
+ # Set up periodic traceback dumps for debugging hanging tests/threads
+ faulthandler.dump_traceback_later(timeout=300, repeat=True,
file=sys.stderr)
+
- threading.Thread(target=dump_stacks, daemon=True).start()
+@contextlib.contextmanager
+def _mysql_lock_session_for_migration(original_session: Session) ->
Generator[Session, None, None]:
+ """
+ Create a MySQL-specific lock session for migration operations.
+
+ This context manager:
+ 1. Commits the original session to release metadata locks
+ 2. Creates a new session bound to the engine
+ 3. Ensures the session is properly closed on exit
+
+ :param original_session: The original session to commit
+ :return: A new session suitable for use with create_global_lock
+ """
+ from sqlalchemy.orm import Session as SASession
+
+ log.info("MySQL: Committing session to release metadata locks")
+ original_session.commit()
+
+ lock_session = SASession(bind=settings.engine)
+ try:
+ yield lock_session
+ finally:
+ lock_session.close()
+
+
+@contextlib.contextmanager
+def _single_connection_pool() -> Generator[None, None, None]:
+ """
+ Temporarily reconfigure ORM to use exactly one connection.
+
+ This is needed for migrations because some database engines hang forever
+ trying to ALTER TABLEs when multiple connections exist in the pool.
+
+ Saves and restores the AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE environment
variable.
+ """
+ import sqlalchemy.pool
+
+ previous_pool_size =
os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
+ try:
+ os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = "1"
+
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
+ yield
+ finally:
+ os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE", None)
+ if previous_pool_size:
Review Comment:
Potential environment variable restoration issue: Line 815 calls
`os.environ.pop()` with a `None` default, so it cannot raise even if the key
is missing. However, if `previous_pool_size` was an empty
string (which is valid), line 816 will not restore it because empty strings are
falsy. Consider using `if previous_pool_size is not None:` instead of `if
previous_pool_size:` to properly handle empty string values.
```suggestion
if previous_pool_size is not None:
```
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1061,6 +1144,38 @@ def _revisions_above_min_for_offline(config, revisions)
-> None:
)
+def _run_upgradedb(config, to_revision: str | None, session: Session) -> None:
+ """Run database upgrade with appropriate locking for the dialect."""
+ from alembic import command
+
+ is_mysql = settings.get_engine().dialect.name == "mysql"
+ dialect_label = " (MySQL)" if is_mysql else ""
+ log.info("Migrating the Airflow database%s", dialect_label)
+
+ # MySQL needs a separate lock session; others use the original session
+ session_cm: contextlib.AbstractContextManager[Session] = (
+ _mysql_lock_session_for_migration(session) if is_mysql else
contextlib.nullcontext(session)
+ )
+
+ with (
+ session_cm as work_session,
+ create_global_lock(session=work_session, lock=DBLocks.MIGRATIONS),
+ _single_connection_pool(),
+ ):
+ command.upgrade(config, revision=to_revision or "heads")
+
+ current_revision = _get_current_revision(session=work_session)
+ with _configured_alembic_environment() as env:
+ source_heads = env.script.get_heads()
+
+ if current_revision == source_heads[0]:
+ external_db_manager = RunDBManager()
+ external_db_manager.upgradedb(work_session)
+
+ add_default_pool_if_not_exists(session=work_session)
+ synchronize_log_template(session=work_session)
Review Comment:
Session used after closure: For MySQL, `work_session` is created by
`_mysql_lock_session_for_migration` and will be closed when the context manager
exits at line 1174. However, lines 1175-1176 attempt to use `work_session`
after it has been closed, which will cause errors. These calls should either be
moved inside the context manager or use the original `session` parameter
instead of `work_session`.
```suggestion
add_default_pool_if_not_exists(session=work_session)
synchronize_log_template(session=work_session)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]