nuclearpinguin commented on a change in pull request #7370: [AIRFLOW-6590] Use batch db operations in jobs URL: https://github.com/apache/airflow/pull/7370#discussion_r377103522
########## File path: airflow/utils/sqlalchemy.py ########## @@ -20,48 +20,73 @@ import json import logging import os +import time +import traceback import pendulum from dateutil import relativedelta from sqlalchemy import event, exc from sqlalchemy.types import DateTime, Text, TypeDecorator +from airflow.configuration import conf + log = logging.getLogger(__name__) + utc = pendulum.timezone('UTC') def setup_event_handlers(engine): """ Setups event handlers. """ + # pylint: disable=unused-argument @event.listens_for(engine, "connect") - def connect(dbapi_connection, connection_record): # pylint: disable=unused-argument + def connect(dbapi_connection, connection_record): connection_record.info['pid'] = os.getpid() if engine.dialect.name == "sqlite": @event.listens_for(engine, "connect") - def set_sqlite_pragma(dbapi_connection, connection_record): # pylint: disable=unused-argument + def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close() # this ensures sanity in mysql when storing datetimes (not required for postgres) if engine.dialect.name == "mysql": @event.listens_for(engine, "connect") - def set_mysql_timezone(dbapi_connection, connection_record): # pylint: disable=unused-argument + def set_mysql_timezone(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() cursor.execute("SET time_zone = '+00:00'") cursor.close() @event.listens_for(engine, "checkout") - def checkout(dbapi_connection, connection_record, connection_proxy): # pylint: disable=unused-argument + def checkout(dbapi_connection, connection_record, connection_proxy): pid = os.getpid() if connection_record.info['pid'] != pid: connection_record.connection = connection_proxy.connection = None raise exc.DisconnectionError( "Connection record belongs to pid {}, " "attempting to check out in pid {}".format(connection_record.info['pid'], pid) ) + if conf.getboolean('debug', 'sqlalchemy_stats', fallback=False): + @event.listens_for(engine, "before_cursor_execute") + def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): + conn.info.setdefault('query_start_time', []).append(time.time()) Review comment: Oh, I haven't known that 👌 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services