This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d93f965d84 Resolve backfill job  deprecations in tests (#39961)
d93f965d84 is described below

commit d93f965d84c28de1c22b105b337a8cd25d27a119
Author: Gopal Dirisala <[email protected]>
AuthorDate: Sat Jun 1 10:11:23 2024 +0530

    Resolve backfill job  deprecations in tests (#39961)
---
 tests/deprecations_ignore.yml   | 11 --------
 tests/jobs/test_backfill_job.py | 56 ++++++++++++++++++++---------------------
 2 files changed, 28 insertions(+), 39 deletions(-)

diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index a2dbf9c9cb..0bff1b3580 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -46,17 +46,6 @@
 - tests/core/test_stats.py::TestStats::test_load_allow_list_validator
 - tests/core/test_stats.py::TestStats::test_load_block_list_validator
 - tests/core/test_stats.py::TestStats::test_load_custom_statsd_client
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_depends_on_past_backwards
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_pooled_tasks
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_failed_tasks
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_failed_tasks_without_flag
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_upstream_failed_tasks
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_run_rescheduled
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_skip_active_scheduled_dagrun
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_reset_orphaned_tasks_with_orphans
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear
-- tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters
 - tests/models/test_cleartasks.py::TestClearTasks::test_dags_clear
 - tests/models/test_dag.py::TestDag::test_bulk_write_to_db_interval_save_runtime
 - tests/models/test_dag.py::TestDag::test_bulk_write_to_db_max_active_runs
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 210d62dda4..7242986cc0 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -71,6 +71,7 @@ pytestmark = pytest.mark.db_test
 logger = logging.getLogger(__name__)
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+DEFAULT_DAG_RUN_ID = "test1"
 
 
 @pytest.fixture(scope="module")
@@ -676,7 +677,7 @@ class TestBackfillJob:
         dag = self._get_dummy_dag(
             dag_maker, dag_id="test_backfill_run_rescheduled", task_id="test_backfill_run_rescheduled_task-1"
         )
-        dag_maker.create_dagrun(state=None)
+        dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
 
         executor = MockExecutor()
 
@@ -689,7 +690,7 @@ class TestBackfillJob:
         )
         run_job(job=job, execute_callable=job_runner._execute)
 
-        ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), execution_date=DEFAULT_DATE)
+        ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         ti.set_state(State.UP_FOR_RESCHEDULE)
 
@@ -702,7 +703,7 @@ class TestBackfillJob:
             rerun_failed_tasks=True,
         )
         run_job(job=job, execute_callable=job_runner._execute)
-        ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), execution_date=DEFAULT_DATE)
+        ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
@@ -741,10 +742,7 @@ class TestBackfillJob:
             dag_id="test_backfill_skip_active_scheduled_dagrun",
             task_id="test_backfill_skip_active_scheduled_dagrun-1",
         )
-        dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-        )
+        dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING, run_id=DEFAULT_DAG_RUN_ID)
 
         executor = MockExecutor()
 
@@ -760,9 +758,7 @@ class TestBackfillJob:
             run_job(job=job, execute_callable=job_runner._execute)
             assert "Backfill cannot be created for DagRun" in caplog.messages[0]
 
-        ti = TI(
-            task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"), execution_date=DEFAULT_DATE
-        )
+        ti = TI(task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         # since DAG backfill is skipped, task state should be none
         assert ti.state == State.NONE
@@ -771,7 +767,7 @@ class TestBackfillJob:
         dag = self._get_dummy_dag(
             dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
         )
-        dag_maker.create_dagrun(state=None)
+        dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
 
         executor = MockExecutor()
 
@@ -784,7 +780,7 @@ class TestBackfillJob:
         )
         run_job(job=job, execute_callable=job_runner._execute)
 
-        ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
+        ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         ti.set_state(State.FAILED)
 
@@ -797,7 +793,7 @@ class TestBackfillJob:
             rerun_failed_tasks=True,
         )
         run_job(job=job, execute_callable=job_runner._execute)
-        ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
+        ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
@@ -806,7 +802,7 @@ class TestBackfillJob:
             op1 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-1")
             op2 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-2")
             op1.set_upstream(op2)
-        dag_maker.create_dagrun(state=None)
+        dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
 
         executor = MockExecutor()
 
@@ -819,7 +815,7 @@ class TestBackfillJob:
         )
         run_job(job=job, execute_callable=job_runner._execute)
 
-        ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), execution_date=DEFAULT_DATE)
+        ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         ti.set_state(State.UPSTREAM_FAILED)
 
@@ -832,7 +828,7 @@ class TestBackfillJob:
             rerun_failed_tasks=True,
         )
         run_job(job=job, execute_callable=job_runner._execute)
-        ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), execution_date=DEFAULT_DATE)
+        ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
@@ -840,7 +836,7 @@ class TestBackfillJob:
         dag = self._get_dummy_dag(
             dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
         )
-        dag_maker.create_dagrun(state=None)
+        dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)
 
         executor = MockExecutor()
 
@@ -853,7 +849,7 @@ class TestBackfillJob:
         )
         run_job(job=job, execute_callable=job_runner._execute)
 
-        ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
+        ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
         ti.refresh_from_db()
         ti.set_state(State.FAILED)
 
@@ -1012,7 +1008,8 @@ class TestBackfillJob:
                 run_job(job=job, execute_callable=job_runner._execute)
         except AirflowTaskTimeout:
             pass
-        ti = TI(task=dag.get_task("test_backfill_pooled_task"), execution_date=DEFAULT_DATE)
+        run_id = f"backfill__{DEFAULT_DATE.isoformat()}"
+        ti = TI(task=dag.get_task("test_backfill_pooled_task"), run_id=run_id)
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
@@ -1034,8 +1031,9 @@ class TestBackfillJob:
         )
         run_job(job=job, execute_callable=job_runner._execute)
 
+        run_id = f"backfill__{run_date.isoformat()}"
         # ti should have succeeded
-        ti = TI(dag.tasks[0], run_date)
+        ti = TI(dag.tasks[0], run_id=run_id)
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
@@ -1058,7 +1056,8 @@ class TestBackfillJob:
         job_runner = BackfillJobRunner(job=job, dag=dag, ignore_first_depends_on_past=True, **kwargs)
         run_job(job=job, execute_callable=job_runner._execute)
 
-        ti = TI(dag.get_task("test_dop_task"), end_date)
+        run_id = f"backfill__{end_date.isoformat()}"
+        ti = TI(dag.get_task("test_dop_task"), run_id=run_id)
         ti.refresh_from_db()
         # runs fine forwards
         assert ti.state == State.SUCCESS
@@ -1441,15 +1440,16 @@ class TestBackfillJob:
         with timeout(seconds=30):
             run_job(job=job, execute_callable=job_runner._execute)
 
-        ti_subdag = TI(task=dag.get_task("daily_job"), execution_date=DEFAULT_DATE)
+        run_id = f"backfill__{DEFAULT_DATE.isoformat()}"
+        ti_subdag = TI(task=dag.get_task("daily_job"), run_id=run_id)
         ti_subdag.refresh_from_db()
         assert ti_subdag.state == State.SUCCESS
 
-        ti_irrelevant = TI(task=dag.get_task("daily_job_irrelevant"), execution_date=DEFAULT_DATE)
+        ti_irrelevant = TI(task=dag.get_task("daily_job_irrelevant"), run_id=run_id)
         ti_irrelevant.refresh_from_db()
         assert ti_irrelevant.state == State.SUCCESS
 
-        ti_downstream = TI(task=dag.get_task("daily_job_downstream"), execution_date=DEFAULT_DATE)
+        ti_downstream = TI(task=dag.get_task("daily_job_downstream"), run_id=run_id)
         ti_downstream.refresh_from_db()
         assert ti_downstream.state == State.SUCCESS
 
@@ -1530,7 +1530,7 @@ class TestBackfillJob:
         dr = dag_maker.create_dagrun(state=None)
         job = Job()
         job_runner = BackfillJobRunner(job=job, dag=dag)
-        ti = TI(task1, dr.execution_date)
+        ti = TI(task1, run_id=dr.run_id)
         ti.refresh_from_db()
 
         ti_status = BackfillJobRunner._DagRunTaskStatus()
@@ -1742,15 +1742,15 @@ class TestBackfillJob:
         job = Job()
         job_runner = BackfillJobRunner(job=job, dag=dag)
         # create dagruns
-        dr1 = dag_maker.create_dagrun(state=State.RUNNING)
+        dr1 = dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID, state=State.RUNNING)
         dr2 = dag.create_dagrun(run_id="test2", state=State.SUCCESS)
 
         # create taskinstances and set states
         dr1_tis = []
         dr2_tis = []
         for task, state in zip(tasks, states):
-            ti1 = TI(task, dr1.execution_date)
-            ti2 = TI(task, dr2.execution_date)
+            ti1 = TI(task, run_id=dr1.run_id)
+            ti2 = TI(task, run_id=dr2.run_id)
             ti1.refresh_from_db()
             ti2.refresh_from_db()
             ti1.state = state

Reply via email to