nuclearpinguin commented on a change in pull request #7370: [AIRFLOW-6590] Use 
batch db operations in jobs
URL: https://github.com/apache/airflow/pull/7370#discussion_r377103522
 
 

 ##########
 File path: airflow/utils/sqlalchemy.py
 ##########
 @@ -20,48 +20,73 @@
 import json
 import logging
 import os
+import time
+import traceback
 
 import pendulum
 from dateutil import relativedelta
 from sqlalchemy import event, exc
 from sqlalchemy.types import DateTime, Text, TypeDecorator
 
+from airflow.configuration import conf
+
 log = logging.getLogger(__name__)
+
 utc = pendulum.timezone('UTC')
 
 
 def setup_event_handlers(engine):
     """
     Setups event handlers.
     """
+    # pylint: disable=unused-argument
     @event.listens_for(engine, "connect")
-    def connect(dbapi_connection, connection_record):  # pylint: 
disable=unused-argument
+    def connect(dbapi_connection, connection_record):
         connection_record.info['pid'] = os.getpid()
 
     if engine.dialect.name == "sqlite":
         @event.listens_for(engine, "connect")
-        def set_sqlite_pragma(dbapi_connection, connection_record):  # pylint: 
disable=unused-argument
+        def set_sqlite_pragma(dbapi_connection, connection_record):
             cursor = dbapi_connection.cursor()
             cursor.execute("PRAGMA foreign_keys=ON")
             cursor.close()
 
     # this ensures sanity in mysql when storing datetimes (not required for 
postgres)
     if engine.dialect.name == "mysql":
         @event.listens_for(engine, "connect")
-        def set_mysql_timezone(dbapi_connection, connection_record):  # 
pylint: disable=unused-argument
+        def set_mysql_timezone(dbapi_connection, connection_record):
             cursor = dbapi_connection.cursor()
             cursor.execute("SET time_zone = '+00:00'")
             cursor.close()
 
     @event.listens_for(engine, "checkout")
-    def checkout(dbapi_connection, connection_record, connection_proxy):  # 
pylint: disable=unused-argument
+    def checkout(dbapi_connection, connection_record, connection_proxy):
         pid = os.getpid()
         if connection_record.info['pid'] != pid:
             connection_record.connection = connection_proxy.connection = None
             raise exc.DisconnectionError(
                 "Connection record belongs to pid {}, "
                 "attempting to check out in pid 
{}".format(connection_record.info['pid'], pid)
             )
+    if conf.getboolean('debug', 'sqlalchemy_stats', fallback=False):
+        @event.listens_for(engine, "before_cursor_execute")
+        def before_cursor_execute(conn, cursor, statement, parameters, 
context, executemany):
+            conn.info.setdefault('query_start_time', []).append(time.time())
 
 Review comment:
   Oh, I didn't know that 👌

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to