This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 26d7919387d Move test of DagRun.update_state to better place (#42845)
26d7919387d is described below
commit 26d7919387d960aca91601733adbb54c1f050d16
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Oct 9 08:15:37 2024 -0700
Move test of DagRun.update_state to better place (#42845)
Previously this lived in test_scheduler_job.py
It only really tested the behavior of DagRun.update_state.
As far as I can tell, it checks that if you null out the state on a TI of a
finished dag, and then you call ``update_state``, then the DR will be set to
running.
---
tests/jobs/test_scheduler_job.py | 35 -----------------------------------
tests/models/test_dagrun.py | 37 ++++++++++++++++++++++++++++++++++++-
2 files changed, 36 insertions(+), 36 deletions(-)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d26369f5d72..bc2be0a12c7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2950,41 +2950,6 @@ class TestSchedulerJob:
dagrun_state=State.FAILED,
)
- def test_dagrun_root_fail_unfinished(self):
- """
- DagRuns with one unfinished and one failed root task -> RUNNING
- """
- # TODO: this should live in test_dagrun.py
- # Run both the failed and successful tasks
- dag_id = "test_dagrun_states_root_fail_unfinished"
- dag = self.dagbag.get_dag(dag_id)
- data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
- dr = dag.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- execution_date=DEFAULT_DATE,
- state=None,
- data_interval=data_interval,
- **triggered_by_kwargs,
- )
- self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)
-
- # todo: AIP-78 remove this test along with DAG.run()
- # this only tests the backfill job runner, not the scheduler
- with pytest.warns(RemovedInAirflow3Warning):
- for _ in _mock_executor(self.null_exec):
- with pytest.raises(AirflowException):
- dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
-
- # Mark the successful task as never having run since we want to see if the
- # dagrun will be in a running state despite having an unfinished task.
- with create_session() as session:
- ti = dr.get_task_instance("test_dagrun_unfinished", session=session)
- ti.state = State.NONE
- session.commit()
- dr.update_state()
- assert dr.state == State.RUNNING
-
def test_dagrun_root_after_dagrun_unfinished(self, mock_executor):
"""
DagRuns with one successful and one future root task -> SUCCESS
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 9184f561b3d..dac2982b0ba 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -37,7 +37,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_ta
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import ShortCircuitOperator
+from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.stats import Stats
@@ -1050,6 +1050,41 @@ class TestDagRun:
assert ti_success.state in State.success_states
assert ti_failed.state in State.failed_states
+ def test_update_state_one_unfinished(self, dag_maker, session):
+ """
+ Previously this lived in test_scheduler_job.py
+
+ It only really tested the behavior of DagRun.update_state.
+
+ As far as I can tell, it checks that if you null out the state on a TI of a finished dag,
+ and then you call ``update_state``, then the DR will be set to running.
+ """
+ with dag_maker(session=session) as dag:
+ PythonOperator(task_id="t1", python_callable=lambda: print)
+ PythonOperator(task_id="t2", python_callable=lambda: print)
+ dr = dag.create_dagrun(
+ state=DagRunState.FAILED,
+ triggered_by=DagRunTriggeredByType.TEST,
+ run_id="abc123",
+ session=session,
+ )
+ for ti in dr.get_task_instances(session=session):
+ ti.state = TaskInstanceState.FAILED
+ session.commit()
+ session.expunge_all()
+ dr = session.get(DagRun, dr.id)
+ assert dr.state == DagRunState.FAILED
+ ti = dr.get_task_instance("t1", session=session)
+ ti.state = State.NONE
+ session.commit()
+ dr = session.get(DagRun, dr.id)
+ assert dr.state == DagRunState.FAILED
+ dr.dag = dag
+ dr.update_state(session=session)
+ session.commit()
+ dr = session.get(DagRun, dr.id)
+ assert dr.state == State.RUNNING
+
@pytest.mark.parametrize(
("run_type", "expected_tis"),