This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 903dd8d07f Revert wrong migration revertion and revert the right one (#31429)
903dd8d07f is described below
commit 903dd8d07fb29ed96584fb6d1438dacb8e5d76f1
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri May 19 20:03:12 2023 +0200
Revert wrong migration revertion and revert the right one (#31429)
* Revert "Revert "Save scheduler execution time by caching dags (#30704)"
(#31413)"
This reverts commit e6f21174ab42092b4ccc9f264627aa43b48b5a02.
* Revert "Save scheduler execution time by adding new Index idea for
dag_run (#30827)"
This reverts commit c63b7774cdba29394ec746b381f45e816dcb0830.
---
airflow/jobs/scheduler_job_runner.py | 18 +++++--
...2_7_0_add_index_on_last_scheduling_decision_.py | 56 ----------------------
airflow/models/dagrun.py | 9 ----
docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
docs/apache-airflow/img/airflow_erd.svg | 4 +-
docs/apache-airflow/migrations-ref.rst | 5 +-
6 files changed, 19 insertions(+), 75 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 128a82d7ee..775ac759ff 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -28,8 +28,9 @@ import warnings
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
+from functools import lru_cache, partial
from pathlib import Path
-from typing import TYPE_CHECKING, Any, Collection, Iterable, Iterator
+from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
from sqlalchemy import and_, func, not_, or_, text
from sqlalchemy.exc import OperationalError
@@ -1052,8 +1053,13 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs,
session)
# Send the callbacks after we commit to ensure the context is up to
date when it gets run
+ # cache saves time during scheduling of many dag_runs for same dag
+ cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
+ partial(self.dagbag.get_dag, session=session)
+ )
for dag_run, callback_to_run in callback_tuples:
- dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+ dag = cached_get_dag(dag_run.dag_id)
+
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table",
dag_run.dag_id)
continue
@@ -1317,8 +1323,14 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
tags={"dag_id": dag.dag_id},
)
+ # cache saves time during scheduling of many dag_runs for same dag
+ cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
+ partial(self.dagbag.get_dag, session=session)
+ )
+
for dag_run in dag_runs:
- dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
+ dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
+
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table",
dag_run.dag_id)
continue
diff --git a/airflow/migrations/versions/0126_2_7_0_add_index_on_last_scheduling_decision_.py b/airflow/migrations/versions/0126_2_7_0_add_index_on_last_scheduling_decision_.py
deleted file mode 100644
index 6f94ce27dd..0000000000
--- a/airflow/migrations/versions/0126_2_7_0_add_index_on_last_scheduling_decision_.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Add index on last_scheduling_decision NULLS FIRST, execution_date, state
for queued dagrun
-
-Revision ID: 14db5484317e
-Revises: 937cbd173ca1
-Create Date: 2023-05-14 21:16:42.399167
-
-"""
-from __future__ import annotations
-
-from alembic import op
-from sqlalchemy import text
-
-# revision identifiers, used by Alembic.
-revision = "14db5484317e"
-down_revision = "937cbd173ca1"
-branch_labels = None
-depends_on = None
-airflow_version = "2.7.0"
-
-
-def upgrade():
- """Apply Add index on last_scheduling_decision NULLS FIRST,
execution_date, state for queued dagrun"""
- conn = op.get_bind()
- if conn.dialect.name == "postgresql":
- with op.batch_alter_table("dag_run") as batch_op:
- batch_op.create_index(
- "idx_last_scheduling_decision_queued",
- [text("last_scheduling_decision NULLS FIRST"),
"execution_date", "state"],
- postgresql_where=text("state='queued'"),
- )
-
-
-def downgrade():
- """Unapply Add index on last_scheduling_decision NULLS FIRST,
execution_date, state for queued dagrun"""
- conn = op.get_bind()
- if conn.dialect.name == "postgresql":
- with op.batch_alter_table("dag_run") as batch_op:
- batch_op.drop_index("idx_last_scheduling_decision_queued")
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index dba6fbf748..42845b34bd 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -144,15 +144,6 @@ class DagRun(Base, LoggingMixin):
UniqueConstraint("dag_id", "execution_date",
name="dag_run_dag_id_execution_date_key"),
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
Index("idx_last_scheduling_decision", last_scheduling_decision),
- Index(
- "idx_last_scheduling_decision_queued",
- # Not possible to add .nulls_first(), because only postgresql can
handle Index like that.
- # Migration script which contains postgres dialect check adds
NULLS FIST to index.
- last_scheduling_decision,
- execution_date,
- _state,
- postgresql_where=text("state='queued'"),
- ),
Index("idx_dag_run_dag_id", dag_id),
Index(
"idx_dag_run_running_dags",
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 8cde30d727..1f2c2c1f34 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-811b1c45f8fa985feacacffafc30c82a6049bb33948c33bb218c13c48f971097
\ No newline at end of file
+4987842fd67d29e194f1117e127d3291ba60d3fbc3e81cba75ce93884c263321
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index b531759ac4..8439c226f8 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1225,7 +1225,7 @@
<g id="edge48" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1137.01,-488.65C1161.12,-465.83 1186.07,-443.06 1210,-422 1216.27,-416.48
1222.72,-410.89 1229.27,-405.29"/>
-<text text-anchor="start" x="1198.27" y="-394.09" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1219.27" y="-394.09" font-family="Times,serif"
font-size="14.00">1</text>
<text text-anchor="start" x="1137.01" y="-477.45" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--xcom -->
@@ -1239,7 +1239,7 @@
<g id="edge50" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1137.01,-524.15C1161.12,-501.83 1186.07,-479.06 1210,-458 1216.66,-452.14
1223.53,-446.19 1230.5,-440.21"/>
-<text text-anchor="start" x="1220.5" y="-444.01" font-family="Times,serif"
font-size="14.00">1</text>
+<text text-anchor="start" x="1199.5" y="-444.01" font-family="Times,serif"
font-size="14.00">0..N</text>
<text text-anchor="start" x="1137.01" y="-512.95" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--xcom -->
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index c29c0fbeb8..2fdc682025 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,10 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version |
Description |
+=================================+===================+===================+==============================================================+
-| ``14db5484317e`` (head) | ``937cbd173ca1`` | ``2.7.0`` |
Add index on last_scheduling_decision NULLS FIRST, |
-| | | |
execution_date, state for queued dagrun |
-+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
-| ``937cbd173ca1`` | ``98ae134e6fff`` | ``2.7.0`` |
Add index to task_instance table |
+| ``937cbd173ca1`` (head) | ``98ae134e6fff`` | ``2.7.0`` |
Add index to task_instance table |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``98ae134e6fff`` | ``6abdffdd4815`` | ``2.6.0`` |
Increase length of user identifier columns in ``ab_user`` |
| | | |
and ``ab_register_user`` tables |