Nataneljpwd commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645209669


##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -685,12 +685,32 @@ def _create_db_from_orm(session):
 
     log.info("Creating Airflow database tables from the ORM")
 
-    # Debug setup if requested
     _setup_debug_logging_if_needed()
 
-    log.info("Creating context")
+    if get_dialect_name(session) == "mysql":

Review Comment:
   Same here, quite a bit of MySQL specific logic, maybe it is better to move 
it to a different method?



##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1120,36 +1195,47 @@ def upgradedb(
         exit(1)
 
     if not _get_current_revision(session=session) and not to_revision:
-        # Don't load default connections
         # New DB; initialize and exit
         initdb(session=session)
         return
-    with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
-        import sqlalchemy.pool
 
+    if settings.get_engine().dialect.name == "mysql":

Review Comment:
   Same here, quite a bit of MySQL specific logic, could be worth it to move it 
to another method



##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1236,7 +1372,7 @@ def _get_fab_migration_version(*, session: Session) -> 
str | None:
         return None
 
 
-def _handle_fab_downgrade(*, session: Session) -> None:
+def _handle_fab_downgrade(*, session: Session, is_mysql: bool) -> None:

Review Comment:
   Are we sure we want to use a boolean here and change the api of the method 
instead of just having another function Incase of MySQL, and when calling it, 
have an if statement to decide which method to call



##########
airflow-core/src/airflow/utils/db_manager.py:
##########
@@ -48,6 +49,24 @@ def __init__(self, session):
         super().__init__()
         self.session = session
 
+    def _is_mysql(self) -> bool:
+        """Check if the database is MySQL."""
+        return get_dialect_name(self.session) == "mysql"
+
+    def _release_metadata_locks(self) -> None:

Review Comment:
   Maybe it's a good idea to, in the method name, add MySQL if it is MySQL 
specific



##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1163,17 +1249,52 @@ def resetdb(session: Session = NEW_SESSION, skip_init: 
bool = False):
 
     import_all_models()
 
-    connection = settings.engine.connect()
+    if get_dialect_name(session) == "mysql":

Review Comment:
   There is quite a bit of logic here specific for MySQL, maybe it is a good 
idea to separate it to another method



##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -1345,18 +1490,59 @@ def create_global_lock(
     lock: DBLocks,
     lock_timeout: int = 1800,
 ) -> Generator[None, None, None]:
-    """Contextmanager that will create and teardown a global db lock."""
-    bind = session.get_bind()
-    if hasattr(bind, "connect"):
-        conn = bind.connect()
-    else:
-        conn = bind
+    """
+    Contextmanager that will create and teardown a global db lock.
+
+    For MySQL, uses a dedicated AUTOCOMMIT connection because:
+    - GET_LOCK is session-level, not transaction-level
+    - DDL operations cause implicit commits that would break transaction 
wrappers
+
+    For PostgreSQL, uses transactional advisory locks as before.
+    """
     dialect_name = get_dialect_name(session)
-    try:
-        if dialect_name == "postgresql":
+
+    if dialect_name == "mysql":

Review Comment:
   Same here, a lot of MySQL specific code which could be separated to a 
different method, improving readability 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to