Nataneljpwd commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646187195
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1256,14 +1406,16 @@ def _handle_fab_downgrade(*, session: Session) -> None:
fab_version,
)
return
- connection = settings.get_engine().connect()
- insp = inspect(connection)
- if not fab_version and insp.has_table("ab_user"):
- log.info(
- "FAB migration version not found, but FAB tables exist. "
- "FAB provider is not required for downgrade.",
- )
- return
+
+ # Use context manager to ensure connection is closed
Review Comment:
Do we need this comment? To me, at least, it seems obvious that if I use
a context manager for a connection, it is to ensure it's closed when I finish.
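For reference, the pattern presumably ends up as the standard SQLAlchemy
form, which already reads as self-documenting (a sketch based on the removed
lines above, not the actual patch):
```python
with settings.get_engine().connect() as connection:
    insp = inspect(connection)
    # the connection is closed automatically on exiting the block
```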
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -700,39 +721,102 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
- # Use AUTOCOMMIT for DDL to avoid metadata lock issues
- with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
- from alembic import command
+ log.info("Stamping migration head")
+ command.stamp(config, "head")
- log.info("Stamping migration head")
- command.stamp(config, "head")
+ log.info("Airflow database tables created")
+
+
+def _create_db_from_orm(session):
+ """Create database tables from ORM models and stamp alembic version."""
+ log.info("Creating Airflow database tables from the ORM")
+ _setup_debug_logging_if_needed()
- log.info("Airflow database tables created")
+ if get_dialect_name(session) == "mysql":
+ _create_db_from_orm_mysql(session)
+ else:
+ _create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG
is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+ import atexit
import faulthandler
- import threading
+ from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
- # Enable Fault Handler
+ # Enable faulthandler for debugging long-running threads and deadlocks,
+ # but disable it before interpreter shutdown to avoid segfaults during
+ # cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
- # Print Active Threads and Stack Traces Periodically
- def dump_stacks():
- while True:
- for thread_id, frame in sys._current_frames().items():
- log.info("\nThread %s stack:", thread_id)
- traceback.print_stack(frame)
- time.sleep(300)
+ # Cancel any pending traceback dumps and disable faulthandler before exit
+ # to prevent it from interfering with C extension cleanup
+ def cleanup_faulthandler():
+ with suppress(Exception):
+ faulthandler.cancel_dump_traceback_later()
+ with suppress(Exception):
+ faulthandler.disable()
+
+ atexit.register(cleanup_faulthandler)
+
+ # Set up periodic traceback dumps for debugging hanging tests/threads
+ faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)
+
+
[email protected]
+def _mysql_lock_session_for_migration(original_session: Session) -> Generator[Session, None, None]:
+ """
+ Create a MySQL-specific lock session for migration operations.
+
+ This context manager:
+ 1. Commits the original session to release metadata locks
+ 2. Creates a new session bound to the engine
+ 3. Ensures the session is properly closed on exit
+
+ :param original_session: The original session to commit
+ :return: A new session suitable for use with create_global_lock
+ """
+ from sqlalchemy.orm import Session as SASession
+
+ log.info("MySQL: Committing session to release metadata locks")
+ original_session.commit()
+
+ lock_session = SASession(bind=settings.engine)
+ try:
+ yield lock_session
+ finally:
+ lock_session.close()
- threading.Thread(target=dump_stacks, daemon=True).start()
+
[email protected]
+def _single_connection_pool() -> Generator[None, None, None]:
+ """
+ Temporarily reconfigure ORM to use exactly one connection.
+
+ This is needed for migrations because some database engines hang forever
+ trying to ALTER TABLEs when multiple connections exist in the pool.
+
+ Saves and restores the AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE environment variable.
+ """
+ import sqlalchemy.pool
+
+ val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
+ try:
+ os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = "1"
+
+ settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
+ yield
+ finally:
+ if val is None:
+ os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE", None)
Review Comment:
Do we have to have this `if`/`else` to conditionally pop the environ? Can't we
just pop it always? I think it can be rewritten as:
```python
os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE", None)
if val:
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = val
settings.reconfigure_orm()
```
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1340,58 +1498,108 @@ def __str__(self):
@contextlib.contextmanager
-def create_global_lock(
- session: Session,
- lock: DBLocks,
- lock_timeout: int = 1800,
+def _create_global_lock_mysql(lock: DBLocks, lock_timeout: int) -> Generator[None, None, None]:
+ """
+ Create a global advisory lock for MySQL.
+
+ Uses a dedicated AUTOCOMMIT connection because:
+ - GET_LOCK is session-level, not transaction-level
+ - DDL operations cause implicit commits that would break transaction wrappers
+ """
+ server_version = settings.get_engine().dialect.server_version_info
+ if not (server_version and server_version >= (5, 6)):
Review Comment:
According to
[this](https://airflow.apache.org/docs/apache-airflow/3.1.5/installation/prerequisites.html),
Airflow 3 does not support MySQL versions lower than 8, and even [Airflow
2.1.4](https://airflow.apache.org/docs/apache-airflow/2.1.4/installation/prerequisites.html)
supported MySQL 5.7 and above, so this `if` can probably be removed.
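If the guard is dropped, the body presumably reduces to just the AUTOCOMMIT
lock acquisition the docstring describes; a rough, untested sketch reusing
`lock`, `lock_timeout`, and `str(lock)` from the hunk above:
```python
from sqlalchemy import text

engine = settings.get_engine()
# Dedicated AUTOCOMMIT connection: GET_LOCK is session-level, and DDL run
# under the lock would implicitly commit any wrapping transaction.
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    # a real implementation should also check GET_LOCK's 1/0/NULL result
    conn.execute(
        text("SELECT GET_LOCK(:name, :timeout)"),
        {"name": str(lock), "timeout": lock_timeout},
    )
    try:
        yield  # body of the @contextlib.contextmanager generator
    finally:
        conn.execute(text("SELECT RELEASE_LOCK(:name)"), {"name": str(lock)})
```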
##########
airflow-core/src/airflow/utils/db_manager.py:
##########
@@ -48,6 +49,21 @@ def __init__(self, session):
super().__init__()
self.session = session
+ def _release_mysql_metadata_locks(self) -> None:
+ """
+ Release MySQL metadata locks by committing the session.
+
+ MySQL requires metadata locks to be released before DDL operations.
+ This is done by committing the current transaction.
+ This method is a no-op for non-MySQL databases.
+ """
+ if get_dialect_name(self.session) != "mysql":
+ return
Review Comment:
Is this really needed? Is it ever used **not** for MySQL? It does have MySQL
in its name, so logically someone won't use it for pgsql. In my opinion it is
simpler and more intuitive to check the dialect wherever the method is used
rather than inside the method itself, as doing it in the method can cause
confusion when first getting into this code.
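In other words, move the guard to the call site, e.g. (illustrative sketch;
the call site shown is hypothetical):
```python
if get_dialect_name(self.session) == "mysql":
    self._release_mysql_metadata_locks()
```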
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -700,39 +721,102 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
- # Use AUTOCOMMIT for DDL to avoid metadata lock issues
- with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
- from alembic import command
+ log.info("Stamping migration head")
+ command.stamp(config, "head")
- log.info("Stamping migration head")
- command.stamp(config, "head")
+ log.info("Airflow database tables created")
+
+
+def _create_db_from_orm(session):
+ """Create database tables from ORM models and stamp alembic version."""
+ log.info("Creating Airflow database tables from the ORM")
+ _setup_debug_logging_if_needed()
- log.info("Airflow database tables created")
+ if get_dialect_name(session) == "mysql":
+ _create_db_from_orm_mysql(session)
+ else:
+ _create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG
is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+ import atexit
import faulthandler
- import threading
+ from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
- # Enable Fault Handler
+ # Enable faulthandler for debugging long-running threads and deadlocks,
+ # but disable it before interpreter shutdown to avoid segfaults during
+ # cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
- # Print Active Threads and Stack Traces Periodically
- def dump_stacks():
- while True:
- for thread_id, frame in sys._current_frames().items():
- log.info("\nThread %s stack:", thread_id)
- traceback.print_stack(frame)
- time.sleep(300)
+ # Cancel any pending traceback dumps and disable faulthandler before exit
+ # to prevent it from interfering with C extension cleanup
+ def cleanup_faulthandler():
+ with suppress(Exception):
+ faulthandler.cancel_dump_traceback_later()
+ with suppress(Exception):
+ faulthandler.disable()
+
+ atexit.register(cleanup_faulthandler)
+
+ # Set up periodic traceback dumps for debugging hanging tests/threads
+ faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)
+
+
[email protected]
+def _mysql_lock_session_for_migration(original_session: Session) -> Generator[Session, None, None]:
+ """
+ Create a MySQL-specific lock session for migration operations.
+
+ This context manager:
+ 1. Commits the original session to release metadata locks
+ 2. Creates a new session bound to the engine
+ 3. Ensures the session is properly closed on exit
+
+ :param original_session: The original session to commit
+ :return: A new session suitable for use with create_global_lock
+ """
+ from sqlalchemy.orm import Session as SASession
+
+ log.info("MySQL: Committing session to release metadata locks")
+ original_session.commit()
+
+ lock_session = SASession(bind=settings.engine)
+ try:
+ yield lock_session
+ finally:
+ lock_session.close()
- threading.Thread(target=dump_stacks, daemon=True).start()
+
[email protected]
+def _single_connection_pool() -> Generator[None, None, None]:
+ """
+ Temporarily reconfigure ORM to use exactly one connection.
+
+ This is needed for migrations because some database engines hang forever
+ trying to ALTER TABLEs when multiple connections exist in the pool.
+
+ Saves and restores the AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE environment variable.
+ """
+ import sqlalchemy.pool
+
+ val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
Review Comment:
It is a nitpick, but giving the `val` variable a more descriptive name could be nice.
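For example (the name is just a suggestion):
```python
saved_max_size = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
```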
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]