This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new b94c7993db0 [v3-0-test] Fix speed of 
test_rtif_deletion_stale_data_error test (#54542) (#54546)
b94c7993db0 is described below

commit b94c7993db0befbb49c1cad1d23fe00df5a19193
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Aug 15 14:07:23 2025 +0100

    [v3-0-test] Fix speed of test_rtif_deletion_stale_data_error test (#54542) 
(#54546)
    
    This test was previously creating and running 40 tasks, when all we actually
    need to do is to put the DB in the state we expect (RTIF rows exist) and 
then
run one task. This takes the test time from 31s down to 4s.
    
    It was verified that this change still tests the correct behaviour by
    removing the `session.flush()` that was added in the same change where this
    test was added, #42928 / ced319f.
    (cherry picked from commit 5a411d7e2181c01ac5048431823e88ba05874387)
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 .../tests/unit/models/test_renderedtifields.py     | 39 +++++++++++++---------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/airflow-core/tests/unit/models/test_renderedtifields.py 
b/airflow-core/tests/unit/models/test_renderedtifields.py
index e53582c563e..aa74710b0db 100644
--- a/airflow-core/tests/unit/models/test_renderedtifields.py
+++ b/airflow-core/tests/unit/models/test_renderedtifields.py
@@ -22,6 +22,7 @@ from __future__ import annotations
 import os
 from collections import Counter
 from datetime import date, timedelta
+from typing import TYPE_CHECKING
 from unittest import mock
 
 import pendulum
@@ -36,12 +37,16 @@ from airflow.models.taskmap import TaskMap
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.sdk import task as task_decorator
+from airflow.utils.state import TaskInstanceState
 from airflow.utils.task_instance_session import 
set_current_task_instance_session
 from airflow.utils.timezone import datetime
 
 from tests_common.test_utils.asserts import assert_queries_count
 from tests_common.test_utils.db import clear_db_dags, clear_db_runs, 
clear_rendered_ti_fields
 
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+
 pytestmark = pytest.mark.db_test
 
 
@@ -416,31 +421,33 @@ class TestRenderedTaskInstanceFields:
                 ],
             )
 
-        def run_task(date):
+        def popuate_rtif(date):
             run_id = f"abc_{date.to_date_string()}"
             dr = session.scalar(select(DagRun).where(DagRun.logical_date == 
date, DagRun.run_id == run_id))
             if not dr:
                 dr = dag_maker.create_dagrun(logical_date=date, run_id=run_id)
-            ti = dr.task_instances[0]
-            ti.state = None
-            ti.try_number += 1
-            session.commit()
-            ti.task = task
-            ti.run()
+            ti: TaskInstance = dr.task_instances[0]
+            ti.state = TaskInstanceState.SUCCESS
+
+            rtif = RTIF(ti=ti, render_templates=False, rendered_fields={"a": 
"1"})
+            session.merge(rtif)
+            session.flush()
             return dr
 
         base_date = pendulum.datetime(2021, 1, 1)
         exec_dates = [base_date.add(days=x) for x in range(40)]
-        for date_ in exec_dates:
-            run_task(date=date_)
+        for when in exec_dates:
+            popuate_rtif(date=when)
 
         session.commit()
         session.expunge_all()
 
-        # find oldest date
-        date = session.scalar(
-            
select(DagRun.logical_date).join(RTIF.dag_run).order_by(DagRun.logical_date).limit(1)
-        )
-        date = pendulum.instance(date)
-        # rerun the old date. this will fail
-        run_task(date=date)
+        # find oldest dag run
+        dr = 
session.scalar(select(DagRun).join(RTIF.dag_run).order_by(DagRun.run_after).limit(1))
+        assert dr
+        ti: TaskInstance = dr.task_instances[0]
+        ti.state = None
+        session.flush()
+        # rerun the old run. this shouldn't fail
+        ti.task = task
+        ti.run()

Reply via email to