This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 696b91fafe Speed up log template sync by avoiding ORM (#30119)
696b91fafe is described below

commit 696b91fafe4a557f179098e0609eb9d9dcb73f72
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Mar 16 19:32:47 2023 +0800

    Speed up log template sync by avoiding ORM (#30119)
---
 airflow/utils/db.py | 50 ++++++++++++++++++++++++++------------------------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 7c5bcd8848..50eed1d649 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -714,8 +714,8 @@ def initdb(session: Session = NEW_SESSION, load_connections: bool = True):
     if conf.getboolean("database", "LOAD_DEFAULT_CONNECTIONS") and load_connections:
         create_default_connections(session=session)
     # Add default pool & sync log_template
-    add_default_pool_if_not_exists()
-    synchronize_log_template()
+    add_default_pool_if_not_exists(session=session)
+    synchronize_log_template(session=session)
 
 
 def _get_alembic_config():
@@ -859,24 +859,32 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
     This checks if the last row fully matches the current config values, and
     inserts a new row if not.
     """
+    # NOTE: SELECT queries in this function are INTENTIONALLY written with the
+    # SQL builder style, not the ORM query API. This avoids configuring the ORM
+    # unless we need to insert something, speeding up CLI in general.
+
     from airflow.models.tasklog import LogTemplate
 
-    def log_template_exists():
-        metadata = reflect_tables([LogTemplate], session)
-        log_template_table = metadata.tables.get(LogTemplate.__tablename__)
-        return log_template_table is not None
+    metadata = reflect_tables([LogTemplate], session)
+    log_template_table: Table | None = metadata.tables.get(LogTemplate.__tablename__)
 
-    if not log_template_exists():
+    if log_template_table is None:
         log.info("Log template table does not exist (added in 2.3.0); skipping log template sync.")
         return
 
     filename = conf.get("logging", "log_filename_template")
     elasticsearch_id = conf.get("elasticsearch", "log_id_template")
 
-    # First check if we have an empty table. If so, and the default values exist,
-    # we will seed the table with the values from pre 2.3.0, so old logs will
-    # still be retrievable.
-    if not session.query(LogTemplate.id).first():
+    stored = session.execute(
+        select(
+            log_template_table.c.filename,
+            log_template_table.c.elasticsearch_id,
+        ).order_by(log_template_table.c.id.desc()),
+    ).first()
+
+    # If we have an empty table, and the default values exist, we will seed the
+    # table with values from pre 2.3.0, so old logs will still be retrievable.
+    if not stored:
         is_default_log_id = elasticsearch_id == conf.airflow_defaults.get("elasticsearch", "log_id_template")
         is_default_filename = filename == conf.airflow_defaults.get("logging", "log_filename_template")
         if is_default_log_id and is_default_filename:
@@ -886,7 +894,6 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
                     elasticsearch_id="{dag_id}-{task_id}-{execution_date}-{try_number}",
                 )
             )
-            session.flush()
 
     # Before checking if the _current_ value exists, we need to check if the old config value we upgraded in
     # place exists!
@@ -894,29 +901,24 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
     pre_upgrade_elasticsearch_id = conf.upgraded_values.get(
         ("elasticsearch", "log_id_template"), elasticsearch_id
     )
-
     if pre_upgrade_filename != filename or pre_upgrade_elasticsearch_id != elasticsearch_id:
         # The previous non-upgraded value likely won't be the _latest_ value (as after we've recorded the
         # upgraded value it will be second-to-newest), so we'll have to just search, which is okay
         # as this is a table with a tiny number of rows
-        row = (
-            session.query(LogTemplate.id)
-            .filter(
+        row = session.execute(
+            select(log_template_table.c.id)
+            .where(
                 or_(
-                    LogTemplate.filename == pre_upgrade_filename,
-                    LogTemplate.elasticsearch_id == pre_upgrade_elasticsearch_id,
+                    log_template_table.c.filename == pre_upgrade_filename,
+                    log_template_table.c.elasticsearch_id == pre_upgrade_elasticsearch_id,
                 )
             )
-            .order_by(LogTemplate.id.desc())
-            .first()
-        )
+            .order_by(log_template_table.c.id.desc())
+        ).first()
         if not row:
             session.add(
                 LogTemplate(filename=pre_upgrade_filename, elasticsearch_id=pre_upgrade_elasticsearch_id)
             )
-            session.flush()
-
-    stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
 
     if not stored or stored.filename != filename or stored.elasticsearch_id != elasticsearch_id:
         session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))
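
For readers outside the Airflow codebase, a minimal sketch of the Core-style pattern the commit adopts, assuming a database that already contains a log_template table. The SQLite URL and the inline Table reflection here are illustrative stand-ins for Airflow's reflect_tables() helper and the injected NEW_SESSION:

    from sqlalchemy import MetaData, Table, create_engine, select
    from sqlalchemy.orm import Session

    engine = create_engine("sqlite:///example.db")  # assumed URL, for illustration only

    with Session(engine) as session:
        # Reflect only the table we need. Unlike touching a mapped class,
        # reflection does not trigger SQLAlchemy's mapper configuration step.
        metadata = MetaData()
        log_template_table = Table(
            "log_template", metadata, autoload_with=session.connection()
        )

        # Core-style SELECT, mirroring the diff: the newest row comes back as
        # a plain Row, not a LogTemplate ORM instance.
        stored = session.execute(
            select(
                log_template_table.c.filename,
                log_template_table.c.elasticsearch_id,
            ).order_by(log_template_table.c.id.desc())
        ).first()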

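By contrast, the ORM form the commit deletes (visible in the removed lines above) reads, with session as in the previous sketch:

    from airflow.models.tasklog import LogTemplate  # the mapped model, as imported in the diff

    # Merely building this query makes SQLAlchemy configure LogTemplate's
    # mapper (and with it the rest of the ORM registry), the startup cost the
    # NOTE in the diff refers to.
    stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()

The write path still goes through session.add(LogTemplate(...)), so the ORM is only configured when a row actually has to be inserted.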