This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d195ec4679 Implement start/end/debug for multiple executors in scheduler job (#38514)
d195ec4679 is described below
commit d195ec4679cd3fce950ac03f2b33b0be6618f40d
Author: Niko Oliveira <[email protected]>
AuthorDate: Tue Apr 9 10:53:50 2024 -0700
Implement start/end/debug for multiple executors in scheduler job (#38514)
---
airflow/jobs/job.py | 5 +-
airflow/jobs/scheduler_job_runner.py | 29 ++++----
tests/jobs/test_scheduler_job.py | 135 ++++++++++++++++++++++++++++++++++-
3 files changed, 155 insertions(+), 14 deletions(-)
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index b1e41499dc..4273f1d334 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -49,6 +49,8 @@ if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
+ from airflow.executors.base_executor import BaseExecutor
+
def _resolve_dagrun_model():
from airflow.models.dagrun import DagRun
@@ -117,12 +119,13 @@ class Job(Base, LoggingMixin):
Only makes sense for SchedulerJob and BackfillJob instances.
"""
- def __init__(self, executor=None, heartrate=None, **kwargs):
+ def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
# Save init parameters as DB fields
self.heartbeat_failed = False
self.hostname = get_hostname()
if executor:
self.executor = executor
+ self.executors = [executor]
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
self.previous_heartbeat = None
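For readers skimming the diff: the job.py change keeps the existing single `executor` attribute and additionally records it in a new `executors` list, which the scheduler code below iterates over. A minimal sketch of that pattern (a stand-in class, not the real airflow.jobs.job.Job):

    from __future__ import annotations

    class BaseExecutor:  # stand-in for airflow.executors.base_executor.BaseExecutor
        ...

    class Job:
        def __init__(self, executor: BaseExecutor | None = None):
            if executor:
                self.executor = executor     # legacy single-executor attribute
                self.executors = [executor]  # new list consumed by multi-executor code paths
            # when no executor is passed, the real class lazily resolves defaults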
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 0596e7f59f..6fab97750b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -270,8 +270,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.info("%s\n%s received, printing debug\n%s", "-" * 80,
sig_name, "-" * 80)
- self.job.executor.debug_dump()
- self.log.info("-" * 80)
+ for executor in self.job.executors:
+ self.log.info("Debug dump for the executor %s", executor)
+ executor.debug_dump()
+ self.log.info("-" * 80)
def __get_concurrency_maps(self, states: Iterable[TaskInstanceState], session: Session) -> ConcurrencyMap:
"""
@@ -819,19 +821,21 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
try:
- self.job.executor.job_id = self.job.id
+ callback_sink: PipeCallbackSink | DatabaseCallbackSink
+
if self.processor_agent:
self.log.debug("Using PipeCallbackSink as callback sink.")
- self.job.executor.callback_sink = PipeCallbackSink(
- get_sink_pipe=self.processor_agent.get_callbacks_pipe
- )
+ callback_sink = PipeCallbackSink(get_sink_pipe=self.processor_agent.get_callbacks_pipe)
else:
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
self.log.debug("Using DatabaseCallbackSink as callback sink.")
- self.job.executor.callback_sink = DatabaseCallbackSink()
+ callback_sink = DatabaseCallbackSink()
- self.job.executor.start()
+ for executor in self.job.executors:
+ executor.job_id = self.job.id
+ executor.callback_sink = callback_sink
+ executor.start()
self.register_signals()
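The startup hunk above factors the callback sink out of the per-executor assignment: one sink is chosen (pipe-based when a processor agent exists, database-backed otherwise) and shared by every executor, and each executor then gets the job id and is started. A condensed sketch of that wiring (the function name is hypothetical):

    def start_all_executors(job, callback_sink) -> None:
        # Every executor reports against the same job and shares one sink instance.
        for executor in job.executors:
            executor.job_id = job.id
            executor.callback_sink = callback_sink
            executor.start()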
@@ -860,10 +864,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.exception("Exception when executing
SchedulerJob._run_scheduler_loop")
raise
finally:
- try:
- self.job.executor.end()
- except Exception:
- self.log.exception("Exception when executing Executor.end")
+ for executor in self.job.executors:
+ try:
+ executor.end()
+ except Exception:
+ self.log.exception("Exception when executing Executor.end on %s", executor)
if self.processor_agent:
try:
self.processor_agent.end()
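Shutdown is the mirror image: each executor's end() runs inside its own try/except so that a failure in one executor cannot keep the remaining executors, or the processor agent, from being cleaned up. A sketch of that loop (helper name hypothetical):

    def end_all_executors(executors, log) -> None:
        for executor in executors:
            try:
                executor.end()
            except Exception:
                # Log and continue so the remaining executors still shut down.
                log.exception("Exception when executing Executor.end on %s", executor)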
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 2c57ca2a9f..5f3571dd6a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -25,7 +25,7 @@ from collections import deque
from datetime import timedelta
from typing import Generator
from unittest import mock
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, PropertyMock, patch
import psutil
import pytest
@@ -552,6 +552,23 @@ class TestSchedulerJob:
assert isinstance(scheduler_job.executor.callback_sink, PipeCallbackSink)
+ @mock.patch("airflow.executors.executor_loader.ExecutorLoader.init_executors")
+ @mock.patch("airflow.executors.executor_loader.ExecutorLoader.get_default_executor")
+ @conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+ def test_setup_callback_sink_not_standalone_dag_processor_multiple_executors(
+ self, get_default_executor_mock, init_executors_mock
+ ):
+ default_executor = mock.MagicMock(slots_available=8)
+ second_executor = mock.MagicMock(slots_available=8)
+ init_executors_mock.return_value = [default_executor, second_executor]
+ get_default_executor_mock.return_value = default_executor
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+ self.job_runner._execute()
+
+ for executor in scheduler_job.executors:
+ assert isinstance(executor.callback_sink, PipeCallbackSink)
+
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_setup_callback_sink_standalone_dag_processor(self):
scheduler_job = Job()
@@ -560,6 +577,68 @@ class TestSchedulerJob:
assert isinstance(scheduler_job.executor.callback_sink, DatabaseCallbackSink)
+ @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+ @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+ @conf_vars({("scheduler", "standalone_dag_processor"): "True"})
+ def test_setup_callback_sink_standalone_dag_processor_multiple_executors(
+ self, executor_mock, executors_mock
+ ):
+ default_executor = mock.MagicMock(slots_available=8)
+ second_executor = mock.MagicMock(slots_available=8)
+ executor_mock.return_value = default_executor
+ executors_mock.return_value = [default_executor, second_executor]
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+ self.job_runner._execute()
+
+ for executor in scheduler_job.executors:
+ assert isinstance(executor.callback_sink, DatabaseCallbackSink)
+
+ @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+ @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+ @conf_vars({("scheduler", "standalone_dag_processor"): "True"})
+ def test_executor_start_called(self, executor_mock, executors_mock):
+ default_executor = mock.MagicMock(slots_available=8)
+ second_executor = mock.MagicMock(slots_available=8)
+ executor_mock.return_value = default_executor
+ executors_mock.return_value = [default_executor, second_executor]
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+ self.job_runner._execute()
+
+ scheduler_job.executor.start.assert_called_once()
+ for executor in scheduler_job.executors:
+ executor.start.assert_called_once()
+
+ @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+ @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+ def test_executor_job_id_assigned(self, executor_mock, executors_mock):
+ default_executor = mock.MagicMock(slots_available=8)
+ second_executor = mock.MagicMock(slots_available=8)
+ executor_mock.return_value = default_executor
+ executors_mock.return_value = [default_executor, second_executor]
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+ self.job_runner._execute()
+
+ assert scheduler_job.executor.job_id == scheduler_job.id
+ for executor in scheduler_job.executors:
+ assert executor.job_id == scheduler_job.id
+
+ @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+ @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+ def test_executor_debug_dump(self, executor_mock, executors_mock):
+ default_executor = mock.MagicMock(slots_available=8)
+ second_executor = mock.MagicMock(slots_available=8)
+ executor_mock.return_value = default_executor
+ executors_mock.return_value = [default_executor, second_executor]
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+ self.job_runner._debug_dump(1, mock.MagicMock())
+
+ for executor in scheduler_job.executors:
+ executor.debug_dump.assert_called_once()
+
def test_find_executable_task_instances_backfill(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_backfill"
task_id_1 = "dummy"
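The new multi-executor tests patch Job.executor and Job.executors with new_callable=PropertyMock, since properties must be mocked on the class rather than on an instance. A self-contained illustration of that standard unittest.mock pattern (Widget is a hypothetical class):

    from unittest import mock
    from unittest.mock import PropertyMock

    class Widget:
        @property
        def size(self) -> int:
            return 1

    # Properties are patched on the class; return_value controls what reads see.
    with mock.patch.object(Widget, "size", new_callable=PropertyMock) as size_mock:
        size_mock.return_value = 42
        assert Widget().size == 42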
@@ -1705,6 +1784,31 @@ class TestSchedulerJob:
scheduler_job.executor.end.assert_called_once()
self.job_runner.processor_agent.end.assert_called_once()
+ @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+ @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+ @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
+ def test_executor_end_called_multiple_executors(
+ self, mock_processor_agent, executor_mock, executors_mock
+ ):
+ """
+ Test to make sure executor.end gets called on all executors with a successful scheduler loop run
+ """
+ default_executor = mock.MagicMock(slots_available=8)
+ second_executor = mock.MagicMock(slots_available=8)
+ executor_mock.return_value = default_executor
+ executors_mock.return_value = [default_executor, second_executor]
+ scheduler_job = Job()
+ assert scheduler_job.executor is default_executor
+ assert scheduler_job.executors == [default_executor, second_executor]
+
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+ run_job(scheduler_job, execute_callable=self.job_runner._execute)
+ scheduler_job.executor.end.assert_called_once()
+ for executor in scheduler_job.executors:
+ executor.end.assert_called_once()
+
+ self.job_runner.processor_agent.end.assert_called_once()
+
@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
def test_cleanup_methods_all_called(self, mock_processor_agent):
"""
@@ -1723,6 +1827,35 @@ class TestSchedulerJob:
scheduler_job.executor.end.assert_called_once()
mock_processor_agent.return_value.end.reset_mock(side_effect=True)
+ @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+ @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+ @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
+ def test_cleanup_methods_all_called_multiple_executors(
+ self, mock_processor_agent, executor_mock, executors_mock
+ ):
+ """
+ Test to make sure all cleanup methods are called when the scheduler loop has an exception
+ """
+ default_executor = mock.MagicMock(slots_available=8)
+ second_executor = mock.MagicMock(slots_available=8)
+ executor_mock.return_value = default_executor
+ executors_mock.return_value = [default_executor, second_executor]
+ scheduler_job = Job()
+ assert scheduler_job.executor is default_executor
+ assert scheduler_job.executors == [default_executor, second_executor]
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+ self.job_runner._run_scheduler_loop = mock.MagicMock(side_effect=Exception("oops"))
+ mock_processor_agent.return_value.end.side_effect = Exception("double oops")
+ scheduler_job.executor.end = mock.MagicMock(side_effect=Exception("triple oops"))
+
+ with pytest.raises(Exception):
+ run_job(scheduler_job, execute_callable=self.job_runner._execute)
+
+ self.job_runner.processor_agent.end.assert_called_once()
+ for executor in scheduler_job.executors:
+ executor.end.assert_called_once()
+ mock_processor_agent.return_value.end.reset_mock(side_effect=True)
+
def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_maker):
"""This tests that queued dagruns stops creating once max_active_runs
is reached"""
with dag_maker(max_active_runs=10) as dag: