This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 696b91fafe Speed up log template sync by avoiding ORM (#30119)
696b91fafe is described below
commit 696b91fafe4a557f179098e0609eb9d9dcb73f72
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Mar 16 19:32:47 2023 +0800
Speed up log template sync by avoiding ORM (#30119)
---
airflow/utils/db.py | 50 ++++++++++++++++++++++++++------------------------
1 file changed, 26 insertions(+), 24 deletions(-)
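
The subject line says the sync now avoids the ORM; concretely, the diff below switches the read-only checks from the ORM query API to SQLAlchemy Core selects run against a reflected Table. A minimal, self-contained sketch of that pattern follows. It is not Airflow code: the in-memory SQLite engine, the table layout, and the seeded row are illustrative assumptions only.

    # Sketch only: reflect the table, then read it with a Core select()
    # instead of the ORM query API. Names below are illustrative.
    from sqlalchemy import MetaData, create_engine, select
    from sqlalchemy.orm import Session

    engine = create_engine("sqlite://")
    with engine.begin() as conn:
        conn.exec_driver_sql(
            "CREATE TABLE log_template (id INTEGER PRIMARY KEY, filename TEXT, elasticsearch_id TEXT)"
        )
        conn.exec_driver_sql(
            "INSERT INTO log_template (filename, elasticsearch_id) VALUES ('a.log', 'x-1')"
        )

    with Session(engine) as session:
        # Reflect just the one table; no ORM model classes or mappers involved.
        metadata = MetaData()
        metadata.reflect(bind=session.connection(), only=["log_template"])
        log_template_table = metadata.tables["log_template"]

        # Core-style SELECT executed on the session; returns a Row, not an ORM object.
        stored = session.execute(
            select(
                log_template_table.c.filename,
                log_template_table.c.elasticsearch_id,
            ).order_by(log_template_table.c.id.desc())
        ).first()
        print(stored.filename, stored.elasticsearch_id)  # -> a.log x-1

The point of the switch, per the NOTE added in the diff, is that executing a Core select never has to configure ORM mappers; that configuration only happens if a row actually needs to be inserted.
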
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 7c5bcd8848..50eed1d649 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -714,8 +714,8 @@ def initdb(session: Session = NEW_SESSION, load_connections: bool = True):
     if conf.getboolean("database", "LOAD_DEFAULT_CONNECTIONS") and load_connections:
         create_default_connections(session=session)
     # Add default pool & sync log_template
-    add_default_pool_if_not_exists()
-    synchronize_log_template()
+    add_default_pool_if_not_exists(session=session)
+    synchronize_log_template(session=session)
 
 
 def _get_alembic_config():
@@ -859,24 +859,32 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
     This checks if the last row fully matches the current config values, and
     insert a new row if not.
     """
+    # NOTE: SELECT queries in this function are INTENTIONALLY written with the
+    # SQL builder style, not the ORM query API. This avoids configuring the ORM
+    # unless we need to insert something, speeding up CLI in general.
+
     from airflow.models.tasklog import LogTemplate
 
-    def log_template_exists():
-        metadata = reflect_tables([LogTemplate], session)
-        log_template_table = metadata.tables.get(LogTemplate.__tablename__)
-        return log_template_table is not None
+    metadata = reflect_tables([LogTemplate], session)
+    log_template_table: Table | None = metadata.tables.get(LogTemplate.__tablename__)
 
-    if not log_template_exists():
+    if log_template_table is None:
         log.info("Log template table does not exist (added in 2.3.0); skipping log template sync.")
         return
 
     filename = conf.get("logging", "log_filename_template")
     elasticsearch_id = conf.get("elasticsearch", "log_id_template")
 
-    # First check if we have an empty table. If so, and the default values exist,
-    # we will seed the table with the values from pre 2.3.0, so old logs will
-    # still be retrievable.
-    if not session.query(LogTemplate.id).first():
+    stored = session.execute(
+        select(
+            log_template_table.c.filename,
+            log_template_table.c.elasticsearch_id,
+        ).order_by(log_template_table.c.id.desc()),
+    ).first()
+
+    # If we have an empty table, and the default values exist, we will seed the
+    # table with values from pre 2.3.0, so old logs will still be retrievable.
+    if not stored:
         is_default_log_id = elasticsearch_id == conf.airflow_defaults.get("elasticsearch", "log_id_template")
         is_default_filename = filename == conf.airflow_defaults.get("logging", "log_filename_template")
         if is_default_log_id and is_default_filename:
@@ -886,7 +894,6 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
                     elasticsearch_id="{dag_id}-{task_id}-{execution_date}-{try_number}",
                 )
             )
-            session.flush()
 
     # Before checking if the _current_ value exists, we need to check if the old config value we upgraded in
     # place exists!
@@ -894,29 +901,24 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
     pre_upgrade_elasticsearch_id = conf.upgraded_values.get(
         ("elasticsearch", "log_id_template"), elasticsearch_id
     )
-
     if pre_upgrade_filename != filename or pre_upgrade_elasticsearch_id != elasticsearch_id:
         # The previous non-upgraded value likely won't be the _latest_ value (as after we've recorded the
         # recorded the upgraded value it will be second-to-newest), so we'll have to just search which is okay
         # as this is a table with a tiny number of rows
-        row = (
-            session.query(LogTemplate.id)
-            .filter(
+        row = session.execute(
+            select(log_template_table.c.id)
+            .where(
                 or_(
-                    LogTemplate.filename == pre_upgrade_filename,
-                    LogTemplate.elasticsearch_id == pre_upgrade_elasticsearch_id,
+                    log_template_table.c.filename == pre_upgrade_filename,
+                    log_template_table.c.elasticsearch_id == pre_upgrade_elasticsearch_id,
                 )
             )
-            .order_by(LogTemplate.id.desc())
-            .first()
-        )
+            .order_by(log_template_table.c.id.desc())
+        ).first()
         if not row:
             session.add(
                 LogTemplate(filename=pre_upgrade_filename, elasticsearch_id=pre_upgrade_elasticsearch_id)
             )
-            session.flush()
-
-    stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
     if not stored or stored.filename != filename or stored.elasticsearch_id != elasticsearch_id:
         session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))
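
Worth noting from the last hunk: stored changes from a LogTemplate ORM instance to a SQLAlchemy Core result Row, yet the final stored.filename / stored.elasticsearch_id comparison is unchanged, because Row objects expose the selected columns as attributes. The ORM model is still used, but only on the write path (session.add). A rough sketch of that read-with-Core, write-with-ORM split, under assumed names (LogTemplateDemo stands in for Airflow's LogTemplate model, and an in-memory SQLite engine for the configured metadata database):

    # Sketch only, not Airflow code; names are hypothetical stand-ins.
    from sqlalchemy import Column, Integer, String, create_engine, select
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class LogTemplateDemo(Base):  # stand-in, not airflow.models.tasklog.LogTemplate
        __tablename__ = "log_template_demo"
        id = Column(Integer, primary_key=True)
        filename = Column(String(256))


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    table = LogTemplateDemo.__table__

    current = "{{ ti.dag_id }}/{{ try_number }}.log"
    with Session(engine) as session:
        # Read path: Core select against the Table; attribute access on the
        # returned Row (stored.filename) mirrors what the diff relies on.
        stored = session.execute(
            select(table.c.filename).order_by(table.c.id.desc())
        ).first()
        if stored is None or stored.filename != current:
            # Write path: only here is the ORM class actually instantiated.
            session.add(LogTemplateDemo(filename=current))
            session.commit()
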