This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d93f965d84 Resolve backfill job deprecations in tests (#39961)
d93f965d84 is described below
commit d93f965d84c28de1c22b105b337a8cd25d27a119
Author: Gopal Dirisala <[email protected]>
AuthorDate: Sat Jun 1 10:11:23 2024 +0530
Resolve backfill job deprecations in tests (#39961)
---
tests/deprecations_ignore.yml | 11 --------
tests/jobs/test_backfill_job.py | 56 ++++++++++++++++++++---------------------
2 files changed, 28 insertions(+), 39 deletions(-)
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index a2dbf9c9cb..0bff1b3580 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -46,17 +46,6 @@
- tests/core/test_stats.py::TestStats::test_load_allow_list_validator
- tests/core/test_stats.py::TestStats::test_load_block_list_validator
- tests/core/test_stats.py::TestStats::test_load_custom_statsd_client
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_depends_on_past_backwards
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_pooled_tasks
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_failed_tasks
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_failed_tasks_without_flag
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_upstream_failed_tasks
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_run_rescheduled
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_skip_active_scheduled_dagrun
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_reset_orphaned_tasks_with_orphans
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters
- tests/models/test_cleartasks.py::TestClearTasks::test_dags_clear
- tests/models/test_dag.py::TestDag::test_bulk_write_to_db_interval_save_runtime
- tests/models/test_dag.py::TestDag::test_bulk_write_to_db_max_active_runs
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 210d62dda4..7242986cc0 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -71,6 +71,7 @@ pytestmark = pytest.mark.db_test
logger = logging.getLogger(__name__)
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+DEFAULT_DAG_RUN_ID = "test1"
@pytest.fixture(scope="module")
@@ -676,7 +677,7 @@ class TestBackfillJob:
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_run_rescheduled",
task_id="test_backfill_run_rescheduled_task-1"
)
- dag_maker.create_dagrun(state=None)
+ dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
executor = MockExecutor()
@@ -689,7 +690,7 @@ class TestBackfillJob:
)
run_job(job=job, execute_callable=job_runner._execute)
- ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"),
execution_date=DEFAULT_DATE)
+ ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.UP_FOR_RESCHEDULE)
@@ -702,7 +703,7 @@ class TestBackfillJob:
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
- ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"),
execution_date=DEFAULT_DATE)
+ ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
@@ -741,10 +742,7 @@ class TestBackfillJob:
dag_id="test_backfill_skip_active_scheduled_dagrun",
task_id="test_backfill_skip_active_scheduled_dagrun-1",
)
- dag_maker.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- state=State.RUNNING,
- )
+ dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED,
state=State.RUNNING, run_id=DEFAULT_DAG_RUN_ID)
executor = MockExecutor()
@@ -760,9 +758,7 @@ class TestBackfillJob:
run_job(job=job, execute_callable=job_runner._execute)
assert "Backfill cannot be created for DagRun" in
caplog.messages[0]
- ti = TI(
- task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"),
execution_date=DEFAULT_DATE
- )
+ ti =
TI(task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
# since DAG backfill is skipped, task state should be none
assert ti.state == State.NONE
@@ -771,7 +767,7 @@ class TestBackfillJob:
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_rerun_failed",
task_id="test_backfill_rerun_failed_task-1"
)
- dag_maker.create_dagrun(state=None)
+ dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
executor = MockExecutor()
@@ -784,7 +780,7 @@ class TestBackfillJob:
)
run_job(job=job, execute_callable=job_runner._execute)
- ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"),
execution_date=DEFAULT_DATE)
+ ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.FAILED)
@@ -797,7 +793,7 @@ class TestBackfillJob:
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
- ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"),
execution_date=DEFAULT_DATE)
+ ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
@@ -806,7 +802,7 @@ class TestBackfillJob:
op1 =
EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-1")
op2 =
EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-2")
op1.set_upstream(op2)
- dag_maker.create_dagrun(state=None)
+ dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
executor = MockExecutor()
@@ -819,7 +815,7 @@ class TestBackfillJob:
)
run_job(job=job, execute_callable=job_runner._execute)
- ti =
TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"),
execution_date=DEFAULT_DATE)
+ ti =
TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.UPSTREAM_FAILED)
@@ -832,7 +828,7 @@ class TestBackfillJob:
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
- ti =
TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"),
execution_date=DEFAULT_DATE)
+ ti =
TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
@@ -840,7 +836,7 @@ class TestBackfillJob:
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_rerun_failed",
task_id="test_backfill_rerun_failed_task-1"
)
- dag_maker.create_dagrun(state=None)
+ dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
executor = MockExecutor()
@@ -853,7 +849,7 @@ class TestBackfillJob:
)
run_job(job=job, execute_callable=job_runner._execute)
- ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"),
execution_date=DEFAULT_DATE)
+ ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"),
run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.FAILED)
@@ -1012,7 +1008,8 @@ class TestBackfillJob:
run_job(job=job, execute_callable=job_runner._execute)
except AirflowTaskTimeout:
pass
- ti = TI(task=dag.get_task("test_backfill_pooled_task"),
execution_date=DEFAULT_DATE)
+ run_id = f"backfill__{DEFAULT_DATE.isoformat()}"
+ ti = TI(task=dag.get_task("test_backfill_pooled_task"), run_id=run_id)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
@@ -1034,8 +1031,9 @@ class TestBackfillJob:
)
run_job(job=job, execute_callable=job_runner._execute)
+ run_id = f"backfill__{run_date.isoformat()}"
# ti should have succeeded
- ti = TI(dag.tasks[0], run_date)
+ ti = TI(dag.tasks[0], run_id=run_id)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
@@ -1058,7 +1056,8 @@ class TestBackfillJob:
job_runner = BackfillJobRunner(job=job, dag=dag,
ignore_first_depends_on_past=True, **kwargs)
run_job(job=job, execute_callable=job_runner._execute)
- ti = TI(dag.get_task("test_dop_task"), end_date)
+ run_id = f"backfill__{end_date.isoformat()}"
+ ti = TI(dag.get_task("test_dop_task"), run_id=run_id)
ti.refresh_from_db()
# runs fine forwards
assert ti.state == State.SUCCESS
@@ -1441,15 +1440,16 @@ class TestBackfillJob:
with timeout(seconds=30):
run_job(job=job, execute_callable=job_runner._execute)
- ti_subdag = TI(task=dag.get_task("daily_job"),
execution_date=DEFAULT_DATE)
+ run_id = f"backfill__{DEFAULT_DATE.isoformat()}"
+ ti_subdag = TI(task=dag.get_task("daily_job"), run_id=run_id)
ti_subdag.refresh_from_db()
assert ti_subdag.state == State.SUCCESS
- ti_irrelevant = TI(task=dag.get_task("daily_job_irrelevant"),
execution_date=DEFAULT_DATE)
+ ti_irrelevant = TI(task=dag.get_task("daily_job_irrelevant"),
run_id=run_id)
ti_irrelevant.refresh_from_db()
assert ti_irrelevant.state == State.SUCCESS
- ti_downstream = TI(task=dag.get_task("daily_job_downstream"),
execution_date=DEFAULT_DATE)
+ ti_downstream = TI(task=dag.get_task("daily_job_downstream"),
run_id=run_id)
ti_downstream.refresh_from_db()
assert ti_downstream.state == State.SUCCESS
@@ -1530,7 +1530,7 @@ class TestBackfillJob:
dr = dag_maker.create_dagrun(state=None)
job = Job()
job_runner = BackfillJobRunner(job=job, dag=dag)
- ti = TI(task1, dr.execution_date)
+ ti = TI(task1, run_id=dr.run_id)
ti.refresh_from_db()
ti_status = BackfillJobRunner._DagRunTaskStatus()
@@ -1742,15 +1742,15 @@ class TestBackfillJob:
job = Job()
job_runner = BackfillJobRunner(job=job, dag=dag)
# create dagruns
- dr1 = dag_maker.create_dagrun(state=State.RUNNING)
+ dr1 = dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID,
state=State.RUNNING)
dr2 = dag.create_dagrun(run_id="test2", state=State.SUCCESS)
# create taskinstances and set states
dr1_tis = []
dr2_tis = []
for task, state in zip(tasks, states):
- ti1 = TI(task, dr1.execution_date)
- ti2 = TI(task, dr2.execution_date)
+ ti1 = TI(task, run_id=dr1.run_id)
+ ti2 = TI(task, run_id=dr2.run_id)
ti1.refresh_from_db()
ti2.refresh_from_db()
ti1.state = state