This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new b94c7993db0 [v3-0-test] Fix speed of
test_rtif_deletion_stale_data_error test (#54542) (#54546)
b94c7993db0 is described below
commit b94c7993db0befbb49c1cad1d23fe00df5a19193
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Aug 15 14:07:23 2025 +0100
[v3-0-test] Fix speed of test_rtif_deletion_stale_data_error test (#54542)
(#54546)
This test was previously creating and running 40 tasks, when all we actually
need to do is to put the DB in the state we expect (RTIF rows exist) and
then
run one task. This takes the test time form 31s down to 4s.
This was tested that it still correctly tests the correct behaviour by
removing the `session.flush()` that was added in the same change where this
test was added, #42928 / ced319f.
(cherry picked from commit 5a411d7e2181c01ac5048431823e88ba05874387)
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
.../tests/unit/models/test_renderedtifields.py | 39 +++++++++++++---------
1 file changed, 23 insertions(+), 16 deletions(-)
diff --git a/airflow-core/tests/unit/models/test_renderedtifields.py
b/airflow-core/tests/unit/models/test_renderedtifields.py
index e53582c563e..aa74710b0db 100644
--- a/airflow-core/tests/unit/models/test_renderedtifields.py
+++ b/airflow-core/tests/unit/models/test_renderedtifields.py
@@ -22,6 +22,7 @@ from __future__ import annotations
import os
from collections import Counter
from datetime import date, timedelta
+from typing import TYPE_CHECKING
from unittest import mock
import pendulum
@@ -36,12 +37,16 @@ from airflow.models.taskmap import TaskMap
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import task as task_decorator
+from airflow.utils.state import TaskInstanceState
from airflow.utils.task_instance_session import
set_current_task_instance_session
from airflow.utils.timezone import datetime
from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.db import clear_db_dags, clear_db_runs,
clear_rendered_ti_fields
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+
pytestmark = pytest.mark.db_test
@@ -416,31 +421,33 @@ class TestRenderedTaskInstanceFields:
],
)
- def run_task(date):
+ def popuate_rtif(date):
run_id = f"abc_{date.to_date_string()}"
dr = session.scalar(select(DagRun).where(DagRun.logical_date ==
date, DagRun.run_id == run_id))
if not dr:
dr = dag_maker.create_dagrun(logical_date=date, run_id=run_id)
- ti = dr.task_instances[0]
- ti.state = None
- ti.try_number += 1
- session.commit()
- ti.task = task
- ti.run()
+ ti: TaskInstance = dr.task_instances[0]
+ ti.state = TaskInstanceState.SUCCESS
+
+ rtif = RTIF(ti=ti, render_templates=False, rendered_fields={"a":
"1"})
+ session.merge(rtif)
+ session.flush()
return dr
base_date = pendulum.datetime(2021, 1, 1)
exec_dates = [base_date.add(days=x) for x in range(40)]
- for date_ in exec_dates:
- run_task(date=date_)
+ for when in exec_dates:
+ popuate_rtif(date=when)
session.commit()
session.expunge_all()
- # find oldest date
- date = session.scalar(
-
select(DagRun.logical_date).join(RTIF.dag_run).order_by(DagRun.logical_date).limit(1)
- )
- date = pendulum.instance(date)
- # rerun the old date. this will fail
- run_task(date=date)
+ # find oldest dag run
+ dr =
session.scalar(select(DagRun).join(RTIF.dag_run).order_by(DagRun.run_after).limit(1))
+ assert dr
+ ti: TaskInstance = dr.task_instances[0]
+ ti.state = None
+ session.flush()
+ # rerun the old run. this will shouldn't fail
+ ti.task = task
+ ti.run()