kaxil commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r757564067
##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
"""
session.execute('select 1 as is_alive;')
log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+ """
+ Cross-db Identifiers for advisory global database locks.
+
+ Postgres uses int64 lock ids so we use the integer value, MySQL uses
names, so we use the ``_name_``
+ field.
+ """
+
+ INIT = enum.auto()
+ MIGRATIONS = enum.auto()
+ SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+ def __str__(self):
+ return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+ """Contextmanager that will create and teardown a global db lock."""
+ conn = session.connection()
+ dialect = conn.dialect
+ try:
+ if dialect.name == 'postgresql':
+ conn.execute(text('SET LOCK_TIMEOUT to :timeout'),
timeout=lock_timeout)
+ conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
Review comment:
```suggestion
conn.execute(text('SELECT PG_ADVISORY_LOCK(:id)'), id=lock.value)
```
##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
"""
session.execute('select 1 as is_alive;')
log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+ """
+ Cross-db Identifiers for advisory global database locks.
+
+ Postgres uses int64 lock ids so we use the integer value, MySQL uses
names, so we use the ``_name_``
+ field.
+ """
+
+ INIT = enum.auto()
+ MIGRATIONS = enum.auto()
Review comment:
Is there any reason we need both `INIT` and `MIGRATIONS`, given that `initdb()`
runs `upgradedb()` too?
##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
"""
session.execute('select 1 as is_alive;')
log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+ """
+ Cross-db Identifiers for advisory global database locks.
+
+ Postgres uses int64 lock ids so we use the integer value, MySQL uses
names, so we use the ``_name_``
+ field.
+ """
+
+ INIT = enum.auto()
+ MIGRATIONS = enum.auto()
+ SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+ def __str__(self):
+ return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+ """Contextmanager that will create and teardown a global db lock."""
+ conn = session.connection()
+ dialect = conn.dialect
+ try:
+ if dialect.name == 'postgresql':
+ conn.execute(text('SET LOCK_TIMEOUT to :timeout'),
timeout=lock_timeout)
+ conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+ elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+ conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock),
timeout=lock_timeout)
+ elif dialect.name == 'mssql':
+ # TODO: make locking works for MSSQL
+ pass
+
+ yield None
+ finally:
+ if dialect.name == 'postgresql':
+ conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
+ conn.execute(text('SELECT pg_advisory_unlock(:id)'), id=lock.value)
Review comment:
```suggestion
conn.execute(text('SELECT PG_ADVISORY_UNLOCK(:id)'),
id=lock.value)
```
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -234,8 +234,25 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session =
:type max_tis: int
:return: list[airflow.models.TaskInstance]
"""
+ from airflow.utils.db import DBLocks
+
executable_tis: List[TI] = []
+ if session.get_bind().dialect.name == "postgresql":
+ # Optimization: to avoid littering the DB errors of "ERROR:
canceling statement due to lock
+ # timeout", try to take out a transactional advisory lock (unlocks
automatically on
+ # COMMIT/ROLLBACK)
+ lock_acquired = session.execute(
+ text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(
+ id=DBLocks.SCHEDULER_CRITICAL_SECTION
Review comment:
```suggestion
id=DBLocks.SCHEDULER_CRITICAL_SECTION.value
```
##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
"""
session.execute('select 1 as is_alive;')
log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+ """
+ Cross-db Identifiers for advisory global database locks.
+
+ Postgres uses int64 lock ids so we use the integer value, MySQL uses
names, so we use the ``_name_``
+ field.
+ """
+
+ INIT = enum.auto()
+ MIGRATIONS = enum.auto()
+ SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+ def __str__(self):
+ return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+ """Contextmanager that will create and teardown a global db lock."""
+ conn = session.connection()
+ dialect = conn.dialect
+ try:
+ if dialect.name == 'postgresql':
+ conn.execute(text('SET LOCK_TIMEOUT to :timeout'),
timeout=lock_timeout)
+ conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
Review comment:
Nit: uppercase the SQL function name here, since the SQL in the lines above and below is capitalized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]