uranusjr commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r758351332



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
[email protected]
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses 
names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
[email protected]
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), 
timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), 
timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL
+            pass
+
+        yield None
+    finally:
+        # The session may have been "closed" (which is fine, the lock lasts 
more than a transaction) -- ensure
+        # we get a usable connection
+        conn = session.connection()
+        if dialect.name == 'postgresql':
+            conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
+            conn.execute(text('SELECT pg_advisory_unlock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("select RELEASE_LOCK(:id)"), id=str(lock))
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL

Review comment:
       ```suggestion
               # TODO: make locking work for MSSQL
   ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
[email protected]
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses 
names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
[email protected]
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), 
timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), 
timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL

Review comment:
       ```suggestion
               # TODO: make locking work for MSSQL
   ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
[email protected]
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses 
names, so we use the ``_name_``
+    field.

Review comment:
       ```suggestion
       Postgres uses int64 lock ids so we use the integer value, MySQL uses 
names, so we
       call ``str()`, which is implemented using the ``_name_`` field.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to