This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b3a8bfaaee Resolve testSchedulerJob internal warning (#39090)
b3a8bfaaee is described below
commit b3a8bfaaee72d6e90cdc29f1b559e06df81a320a
Author: Owen Leung <[email protected]>
AuthorDate: Sat May 4 22:53:38 2024 +0800
Resolve testSchedulerJob internal warning (#39090)
* Resolve testSchedulerJob internal warning
* Fix test_start_queued_dagruns_do_follow_execution_date_order
* raise warnings when concurrency param is used
* replace concurrency with max_active_tasks, remove the deprecation
handling in conftest.py
* Fix the extra new line
---
tests/deprecations_ignore.yml | 16 ----------
tests/jobs/test_scheduler_job.py | 67 +++++++++++++++++++++++++++-------------
2 files changed, 45 insertions(+), 38 deletions(-)
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 131ea00c67..f1fafcca73 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -79,22 +79,6 @@
-
tests/jobs/test_backfill_job.py::TestBackfillJob::test_reset_orphaned_tasks_with_orphans
-
tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_adopt_or_reset_orphaned_tasks
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_fail
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_root_fail
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_root_fail_unfinished
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_success
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_do_schedule_max_active_runs_dag_timed_out
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_find_zombies
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_infinite_pool
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_no_dagruns_would_stuck_in_running
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_not_enough_pool_slots
--
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_start_queued_dagruns_do_follow_execution_date_order
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_zombie_message
-
tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_fallback_task
-
tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_not_file_task
-
tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_old_file_task
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 389172bed1..f122f12265 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -94,6 +94,7 @@ ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER,
"elastic_dag.py")
TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
TRY_NUMBER = 1
@@ -967,7 +968,7 @@ class TestSchedulerJob:
def test_infinite_pool(self, dag_maker):
dag_id = "SchedulerJobTest.test_infinite_pool"
- with dag_maker(dag_id=dag_id, concurrency=16):
+ with dag_maker(dag_id=dag_id, max_active_tasks=16):
EmptyOperator(task_id="dummy", pool="infinite_pool")
scheduler_job = Job()
@@ -994,7 +995,7 @@ class TestSchedulerJob:
def test_not_enough_pool_slots(self, caplog, dag_maker):
dag_id = "SchedulerJobTest.test_test_not_enough_pool_slots"
- with dag_maker(dag_id=dag_id, concurrency=16):
+ with dag_maker(dag_id=dag_id, max_active_tasks=16):
EmptyOperator(task_id="cannot_run", pool="some_pool", pool_slots=4)
EmptyOperator(task_id="can_run", pool="some_pool", pool_slots=1)
@@ -1704,6 +1705,7 @@ class TestSchedulerJob:
with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag:
op1 = EmptyOperator(task_id="op1")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag_maker.create_dagrun()
dr2 = dag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
@@ -1711,6 +1713,7 @@ class TestSchedulerJob:
execution_date=DEFAULT_DATE + datetime.timedelta(1),
start_date=DEFAULT_DATE,
session=session,
+ data_interval=data_interval,
)
scheduler_job = Job()
session.add(scheduler_job)
@@ -2401,12 +2404,13 @@ class TestSchedulerJob:
dag = self.dagbag.get_dag(dag_id)
dagrun_info = dag.next_dagrun_info(None)
assert dagrun_info is not None
-
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dagrun_info.logical_date,
state=None,
session=session,
+ data_interval=data_interval,
)
if advance_execution_date:
@@ -2416,6 +2420,7 @@ class TestSchedulerJob:
execution_date=dr.data_interval_end,
state=None,
session=session,
+ data_interval=data_interval,
)
ex_date = dr.execution_date
@@ -2492,10 +2497,12 @@ class TestSchedulerJob:
# Run both the failed and successful tasks
dag_id = "test_dagrun_states_root_fail_unfinished"
dag = self.dagbag.get_dag(dag_id)
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=None,
+ data_interval=data_interval,
)
self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)
@@ -3977,12 +3984,14 @@ class TestSchedulerJob:
assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE +
timedelta(minutes=2)
# Trigger the Dag externally
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=timezone.utcnow(),
run_type=DagRunType.MANUAL,
session=session,
external_trigger=True,
+ data_interval=data_interval,
)
assert dr is not None
# Run DAG.bulk_write_to_db -- this is run when in
DagFileProcessor.process_file
@@ -4060,13 +4069,14 @@ class TestSchedulerJob:
)
session = settings.Session()
-
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
run1 = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
start_date=timezone.utcnow() - timedelta(seconds=2),
session=session,
+ data_interval=data_interval,
)
run1_ti = run1.get_task_instance(task1.task_id, session)
@@ -4077,6 +4087,7 @@ class TestSchedulerJob:
execution_date=DEFAULT_DATE + timedelta(seconds=10),
state=State.QUEUED,
session=session,
+ data_interval=data_interval,
)
scheduler_job = Job()
@@ -4349,9 +4360,9 @@ class TestSchedulerJob:
def test_start_queued_dagruns_do_follow_execution_date_order(self,
dag_maker):
session = settings.Session()
- with dag_maker("test_dag1", max_active_runs=1) as dag:
+ with dag_maker("test_dag1", max_active_runs=1):
EmptyOperator(task_id="mytask")
- date = dag.following_schedule(DEFAULT_DATE)
+ date = DEFAULT_DATE
for i in range(30):
dr = dag_maker.create_dagrun(
run_id=f"dagrun_{i}", run_type=DagRunType.SCHEDULED,
state=State.QUEUED, execution_date=date
@@ -4393,14 +4404,18 @@ class TestSchedulerJob:
session = settings.Session()
# first dag and dagruns
date = timezone.datetime(2016, 1, 1)
+ logical_date = timezone.coerce_datetime(date)
with dag_maker("test_dagrun_states_are_correct_1", max_active_runs=1,
start_date=date) as dag:
task1 = EmptyOperator(task_id="dummy_task")
dr1_running = dag_maker.create_dagrun(run_id="dr1_run_1",
execution_date=date)
+ data_interval = dag.infer_automated_data_interval(logical_date)
dag_maker.create_dagrun(
run_id="dr1_run_2",
state=State.QUEUED,
- execution_date=dag.following_schedule(dr1_running.execution_date),
+ execution_date=dag.next_dagrun_info(
+ last_automated_dagrun=data_interval, restricted=False
+ ).data_interval.start,
)
# second dag and dagruns
date = timezone.datetime(2020, 1, 1)
@@ -4434,7 +4449,7 @@ class TestSchedulerJob:
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent =
mock.MagicMock(spec=DagFileProcessorAgent)
- ti = TaskInstance(task=task1, execution_date=DEFAULT_DATE)
+ ti = TaskInstance(task=task1, run_id=dr1_running.run_id)
ti.refresh_from_db()
ti.state = State.SUCCESS
session.merge(ti)
@@ -4783,12 +4798,13 @@ class TestSchedulerJob:
session.query(Job).delete()
dag = dagbag.get_dag("example_branch_operator")
dag.sync_to_db()
-
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
scheduler_job = Job()
@@ -4849,11 +4865,13 @@ class TestSchedulerJob:
dag = dagbag.get_dag("example_branch_operator")
dag.sync_to_db()
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
scheduler_job = Job(executor=MockExecutor())
@@ -4917,12 +4935,13 @@ class TestSchedulerJob:
session.query(Job).delete()
dag = dagbag.get_dag("test_example_bash_operator")
dag.sync_to_db(processor_subdir=TEST_DAG_FOLDER)
-
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
task = dag.get_task(task_id="run_this_last")
@@ -4998,7 +5017,7 @@ class TestSchedulerJob:
assert active_dag_count == 1
@mock.patch.object(settings, "USE_JOB_SCHEDULE", False)
- def run_scheduler_until_dagrun_terminal(self, job_runner: SchedulerJobRunner):
+ def run_scheduler_until_dagrun_terminal(self):
"""
Run a scheduler until any dag run reaches a terminal state, or the
scheduler becomes "idle".
@@ -5025,23 +5044,23 @@ class TestSchedulerJob:
num_finished_events: deque[int] = deque([], 3)
do_scheduling_spy = mock.patch.object(
- job_runner,
+ self.job_runner,
"_do_scheduling",
- side_effect=spy_on_return(job_runner._do_scheduling,
num_queued_tis),
+ side_effect=spy_on_return(self.job_runner._do_scheduling,
num_queued_tis),
)
executor_events_spy = mock.patch.object(
- job_runner,
+ self.job_runner,
"_process_executor_events",
- side_effect=spy_on_return(job_runner._process_executor_events,
num_finished_events),
+
side_effect=spy_on_return(self.job_runner._process_executor_events,
num_finished_events),
)
orig_set_state = DagRun.set_state
- def watch_set_state(self: DagRun, state, **kwargs):
+ def watch_set_state(dr: DagRun, state, **kwargs):
if state in (DagRunState.SUCCESS, DagRunState.FAILED):
# Stop the scheduler
- job_runner.num_runs = 1
- orig_set_state(self, state, **kwargs) # type: ignore[call-arg]
+ self.job_runner.num_runs = 1 # type: ignore[attr-defined]
+ orig_set_state(dr, state, **kwargs) # type: ignore[call-arg]
def watch_heartbeat(*args, **kwargs):
if len(num_queued_tis) < 3 or len(num_finished_events) < 3:
@@ -5053,10 +5072,11 @@ class TestSchedulerJob:
), "Scheduler has stalled without setting the DagRun state!"
set_state_spy = mock.patch.object(DagRun, "set_state",
new=watch_set_state)
- heartbeat_spy = mock.patch.object(job_runner, "heartbeat",
new=watch_heartbeat)
+ heartbeat_spy = mock.patch.object(self.job_runner.job, "heartbeat",
new=watch_heartbeat)
+ # with heartbeat_spy, set_state_spy, do_scheduling_spy,
executor_events_spy:
with heartbeat_spy, set_state_spy, do_scheduling_spy,
executor_events_spy:
- run_job(job_runner.job, execute_callable=job_runner._execute)
+ run_job(self.job_runner.job,
execute_callable=self.job_runner._execute)
@pytest.mark.long_running
@pytest.mark.parametrize("dag_id", ["test_mapped_classic",
"test_mapped_taskflow"])
@@ -5068,19 +5088,22 @@ class TestSchedulerJob:
self.dagbag.process_file(str(TEST_DAGS_FOLDER / f"{dag_id}.py"))
dag = self.dagbag.get_dag(dag_id)
assert dag
+ logical_date = timezone.coerce_datetime(timezone.utcnow() -
datetime.timedelta(days=2))
+ data_interval = dag.infer_automated_data_interval(logical_date)
dr = dag.create_dagrun(
+ run_id=f"{dag_id}_1",
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
state=State.RUNNING,
- execution_date=timezone.utcnow() - datetime.timedelta(days=2),
session=session,
+ data_interval=data_interval,
)
executor = SequentialExecutor()
job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(job=job, subdir=dag.fileloc)
- self.run_scheduler_until_dagrun_terminal(job)
+ self.run_scheduler_until_dagrun_terminal()
dr.refresh_from_db(session)
assert dr.state == DagRunState.SUCCESS