This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 903dd8d07f Revert wrong migration revertion and revert the right one (#31429)
903dd8d07f is described below

commit 903dd8d07fb29ed96584fb6d1438dacb8e5d76f1
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri May 19 20:03:12 2023 +0200

    Revert wrong migration revertion and revert the right one (#31429)
    
    * Revert "Revert "Save scheduler execution time by caching dags (#30704)" (#31413)"
    
    This reverts commit e6f21174ab42092b4ccc9f264627aa43b48b5a02.
    
    * Revert "Save scheduler execution time by adding new Index idea for dag_run (#30827)"
    
    This reverts commit c63b7774cdba29394ec746b381f45e816dcb0830.
---
 airflow/jobs/scheduler_job_runner.py               | 18 +++++--
 ...2_7_0_add_index_on_last_scheduling_decision_.py | 56 ----------------------
 airflow/models/dagrun.py                           |  9 ----
 docs/apache-airflow/img/airflow_erd.sha256         |  2 +-
 docs/apache-airflow/img/airflow_erd.svg            |  4 +-
 docs/apache-airflow/migrations-ref.rst             |  5 +-
 6 files changed, 19 insertions(+), 75 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 128a82d7ee..775ac759ff 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -28,8 +28,9 @@ import warnings
 from collections import Counter
 from dataclasses import dataclass
 from datetime import datetime, timedelta
+from functools import lru_cache, partial
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Collection, Iterable, Iterator
+from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
 
 from sqlalchemy import and_, func, not_, or_, text
 from sqlalchemy.exc import OperationalError
@@ -1052,8 +1053,13 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
             callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, 
session)
 
         # Send the callbacks after we commit to ensure the context is up to 
date when it gets run
+        # cache saves time during scheduling of many dag_runs for same dag
+        cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
+            partial(self.dagbag.get_dag, session=session)
+        )
         for dag_run, callback_to_run in callback_tuples:
-            dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+            dag = cached_get_dag(dag_run.dag_id)
+
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", 
dag_run.dag_id)
                 continue
@@ -1317,8 +1323,14 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
                     tags={"dag_id": dag.dag_id},
                 )
 
+        # cache saves time during scheduling of many dag_runs for same dag
+        cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
+            partial(self.dagbag.get_dag, session=session)
+        )
+
         for dag_run in dag_runs:
-            dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
+            dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
+
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", 
dag_run.dag_id)
                 continue
diff --git a/airflow/migrations/versions/0126_2_7_0_add_index_on_last_scheduling_decision_.py b/airflow/migrations/versions/0126_2_7_0_add_index_on_last_scheduling_decision_.py
deleted file mode 100644
index 6f94ce27dd..0000000000
--- a/airflow/migrations/versions/0126_2_7_0_add_index_on_last_scheduling_decision_.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Add index on last_scheduling_decision NULLS FIRST, execution_date, state 
for queued dagrun
-
-Revision ID: 14db5484317e
-Revises: 937cbd173ca1
-Create Date: 2023-05-14 21:16:42.399167
-
-"""
-from __future__ import annotations
-
-from alembic import op
-from sqlalchemy import text
-
-# revision identifiers, used by Alembic.
-revision = "14db5484317e"
-down_revision = "937cbd173ca1"
-branch_labels = None
-depends_on = None
-airflow_version = "2.7.0"
-
-
-def upgrade():
-    """Apply Add index on last_scheduling_decision NULLS FIRST, 
execution_date, state for queued dagrun"""
-    conn = op.get_bind()
-    if conn.dialect.name == "postgresql":
-        with op.batch_alter_table("dag_run") as batch_op:
-            batch_op.create_index(
-                "idx_last_scheduling_decision_queued",
-                [text("last_scheduling_decision NULLS FIRST"), 
"execution_date", "state"],
-                postgresql_where=text("state='queued'"),
-            )
-
-
-def downgrade():
-    """Unapply Add index on last_scheduling_decision NULLS FIRST, 
execution_date, state for queued dagrun"""
-    conn = op.get_bind()
-    if conn.dialect.name == "postgresql":
-        with op.batch_alter_table("dag_run") as batch_op:
-            batch_op.drop_index("idx_last_scheduling_decision_queued")
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index dba6fbf748..42845b34bd 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -144,15 +144,6 @@ class DagRun(Base, LoggingMixin):
         UniqueConstraint("dag_id", "execution_date", 
name="dag_run_dag_id_execution_date_key"),
         UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
         Index("idx_last_scheduling_decision", last_scheduling_decision),
-        Index(
-            "idx_last_scheduling_decision_queued",
-            # Not possible to add .nulls_first(), because only postgresql can 
handle Index like that.
-            # Migration script which contains postgres dialect check adds 
NULLS FIST to index.
-            last_scheduling_decision,
-            execution_date,
-            _state,
-            postgresql_where=text("state='queued'"),
-        ),
         Index("idx_dag_run_dag_id", dag_id),
         Index(
             "idx_dag_run_running_dags",
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 8cde30d727..1f2c2c1f34 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-811b1c45f8fa985feacacffafc30c82a6049bb33948c33bb218c13c48f971097
\ No newline at end of file
+4987842fd67d29e194f1117e127d3291ba60d3fbc3e81cba75ce93884c263321
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index b531759ac4..8439c226f8 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1225,7 +1225,7 @@
 <g id="edge48" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1137.01,-488.65C1161.12,-465.83 1186.07,-443.06 1210,-422 1216.27,-416.48 
1222.72,-410.89 1229.27,-405.29"/>
-<text text-anchor="start" x="1198.27" y="-394.09" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1219.27" y="-394.09" font-family="Times,serif" 
font-size="14.00">1</text>
 <text text-anchor="start" x="1137.01" y="-477.45" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
@@ -1239,7 +1239,7 @@
 <g id="edge50" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1137.01,-524.15C1161.12,-501.83 1186.07,-479.06 1210,-458 1216.66,-452.14 
1223.53,-446.19 1230.5,-440.21"/>
-<text text-anchor="start" x="1220.5" y="-444.01" font-family="Times,serif" 
font-size="14.00">1</text>
+<text text-anchor="start" x="1199.5" y="-444.01" font-family="Times,serif" 
font-size="14.00">0..N</text>
 <text text-anchor="start" x="1137.01" y="-512.95" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index c29c0fbeb8..2fdc682025 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,10 +39,7 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | 
Description                                                  |
 
+=================================+===================+===================+==============================================================+
-| ``14db5484317e`` (head)         | ``937cbd173ca1``  | ``2.7.0``         | 
Add index on last_scheduling_decision NULLS FIRST,           |
-|                                 |                   |                   | 
execution_date, state for queued dagrun                      |
-+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
-| ``937cbd173ca1``                | ``98ae134e6fff``  | ``2.7.0``         | 
Add index to task_instance table                             |
+| ``937cbd173ca1`` (head)         | ``98ae134e6fff``  | ``2.7.0``         | 
Add index to task_instance table                             |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``98ae134e6fff``                | ``6abdffdd4815``  | ``2.6.0``         | 
Increase length of user identifier columns in ``ab_user``    |
 |                                 |                   |                   | 
and ``ab_register_user`` tables                              |

Reply via email to