Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2650249171
##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -700,39 +721,101 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
- # Use AUTOCOMMIT for DDL to avoid metadata lock issues
- with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
- from alembic import command
+ log.info("Stamping migration head")
+ command.stamp(config, "head")
- log.info("Stamping migration head")
- command.stamp(config, "head")
+ log.info("Airflow database tables created")
- log.info("Airflow database tables created")
+
+def _create_db_from_orm(session):
+ """Create database tables from ORM models and stamp alembic version."""
+ log.info("Creating Airflow database tables from the ORM")
+ _setup_debug_logging_if_needed()
+
+ if get_dialect_name(session) == "mysql":
+ _create_db_from_orm_mysql(session)
+ else:
+ _create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG
is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+ import atexit
import faulthandler
- import threading
+ from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
- # Enable Fault Handler
+ # Enable faulthandler for debugging long-running threads and deadlocks,
+ # but disable it before interpreter shutdown to avoid segfaults during
+ # cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
- # Print Active Threads and Stack Traces Periodically
- def dump_stacks():
- while True:
- for thread_id, frame in sys._current_frames().items():
- log.info("\nThread %s stack:", thread_id)
- traceback.print_stack(frame)
- time.sleep(300)
+ # Cancel any pending traceback dumps and disable faulthandler before exit
+ # to prevent it from interfering with C extension cleanup
+ def cleanup_faulthandler():
+ with suppress(Exception):
+ faulthandler.cancel_dump_traceback_later()
+ with suppress(Exception):
+ faulthandler.disable()
+
+ atexit.register(cleanup_faulthandler)
+
+ # Set up periodic traceback dumps for debugging hanging tests/threads
+ faulthandler.dump_traceback_later(timeout=300, repeat=True,
file=sys.stderr)
+
- threading.Thread(target=dump_stacks, daemon=True).start()
[email protected]
+def _mysql_lock_session_for_migration(original_session: Session) ->
Generator[Session, None, None]:
+ """
+ Create a MySQL-specific lock session for migration operations.
+
+ This context manager:
+ 1. Commits the original session to release metadata locks
+ 2. Creates a new session bound to the engine
+ 3. Ensures the session is properly closed on exit
+
+ :param original_session: The original session to commit
+ :return: A new session suitable for use with create_global_lock
+ """
+ from sqlalchemy.orm import Session as SASession
+
+ log.info("MySQL: Committing session to release metadata locks")
+ original_session.commit()
+
+ lock_session = SASession(bind=settings.engine)
Review Comment:
Couldn't get this one working, ended up reverting it. The code behaves
differently on 2.x and 3.x - might need to add a compat shim or try/except.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]