This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 26d7919387d Move test of DagRun.update_state to better place (#42845)
26d7919387d is described below

commit 26d7919387d960aca91601733adbb54c1f050d16
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Oct 9 08:15:37 2024 -0700

    Move test of DagRun.update_state to better place (#42845)
    
    Previously this lived in test_scheduler_job.py
    
    It only really tested the behavior of DagRun.update_state.
    
    As far as I can tell, it checks that if you null out the state on a TI of a finished dag, and then you call ``update_state``, then the DR will be set to running.
---
 tests/jobs/test_scheduler_job.py | 35 -----------------------------------
 tests/models/test_dagrun.py      | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d26369f5d72..bc2be0a12c7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2950,41 +2950,6 @@ class TestSchedulerJob:
                 dagrun_state=State.FAILED,
             )
 
-    def test_dagrun_root_fail_unfinished(self):
-        """
-        DagRuns with one unfinished and one failed root task -> RUNNING
-        """
-        # TODO: this should live in test_dagrun.py
-        # Run both the failed and successful tasks
-        dag_id = "test_dagrun_states_root_fail_unfinished"
-        dag = self.dagbag.get_dag(dag_id)
-        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=DEFAULT_DATE,
-            state=None,
-            data_interval=data_interval,
-            **triggered_by_kwargs,
-        )
-        self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)
-
-        # todo: AIP-78 remove this test along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-        with pytest.warns(RemovedInAirflow3Warning):
-            for _ in _mock_executor(self.null_exec):
-                with pytest.raises(AirflowException):
-                    dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
-
-        # Mark the successful task as never having run since we want to see if the
-        # dagrun will be in a running state despite having an unfinished task.
-        with create_session() as session:
-            ti = dr.get_task_instance("test_dagrun_unfinished", session=session)
-            ti.state = State.NONE
-            session.commit()
-        dr.update_state()
-        assert dr.state == State.RUNNING
-
     def test_dagrun_root_after_dagrun_unfinished(self, mock_executor):
         """
         DagRuns with one successful and one future root task -> SUCCESS
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 9184f561b3d..dac2982b0ba 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -37,7 +37,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_ta
 from airflow.models.taskmap import TaskMap
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import ShortCircuitOperator
+from airflow.operators.python import PythonOperator, ShortCircuitOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.stats import Stats
@@ -1050,6 +1050,41 @@ class TestDagRun:
         assert ti_success.state in State.success_states
         assert ti_failed.state in State.failed_states
 
+    def test_update_state_one_unfinished(self, dag_maker, session):
+        """
+        Previously this lived in test_scheduler_job.py
+
+        It only really tested the behavior of DagRun.update_state.
+
+        As far as I can tell, it checks that if you null out the state on a TI of a finished dag,
+        and then you call ``update_state``, then the DR will be set to running.
+        """
+        with dag_maker(session=session) as dag:
+            PythonOperator(task_id="t1", python_callable=lambda: print)
+            PythonOperator(task_id="t2", python_callable=lambda: print)
+        dr = dag.create_dagrun(
+            state=DagRunState.FAILED,
+            triggered_by=DagRunTriggeredByType.TEST,
+            run_id="abc123",
+            session=session,
+        )
+        for ti in dr.get_task_instances(session=session):
+            ti.state = TaskInstanceState.FAILED
+        session.commit()
+        session.expunge_all()
+        dr = session.get(DagRun, dr.id)
+        assert dr.state == DagRunState.FAILED
+        ti = dr.get_task_instance("t1", session=session)
+        ti.state = State.NONE
+        session.commit()
+        dr = session.get(DagRun, dr.id)
+        assert dr.state == DagRunState.FAILED
+        dr.dag = dag
+        dr.update_state(session=session)
+        session.commit()
+        dr = session.get(DagRun, dr.id)
+        assert dr.state == State.RUNNING
+
 
 @pytest.mark.parametrize(
     ("run_type", "expected_tis"),

Reply via email to