ashb commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r758379886
##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
"""
session.execute('select 1 as is_alive;')
log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+ """
+ Cross-db Identifiers for advisory global database locks.
+
+ Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+ field.
+ """
+
+ MIGRATIONS = enum.auto()
+ SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+ def __str__(self):
+ return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+ """Contextmanager that will create and teardown a global db lock."""
+ conn = session.connection()
+ dialect = conn.dialect
+ try:
+ if dialect.name == 'postgresql':
+ conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+ conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+ elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+ conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+ elif dialect.name == 'mssql':
+ # TODO: make locking works for MSSQL
+ pass
+
+ yield None
+ finally:
+ # The session may have been "closed" (which is fine, the lock lasts more than a transaction) -- ensure
+ # we get a usable connection
+ conn = session.connection()
Review comment:
This is the problem and the cause of the failures -- if we close the
connection we can't unlock someone else's lock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]