This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d195ec4679 Implement start/end/debug for multiple executors in scheduler job (#38514)
d195ec4679 is described below

commit d195ec4679cd3fce950ac03f2b33b0be6618f40d
Author: Niko Oliveira <[email protected]>
AuthorDate: Tue Apr 9 10:53:50 2024 -0700

    Implement start/end/debug for multiple executors in scheduler job (#38514)
---
 airflow/jobs/job.py                  |   5 +-
 airflow/jobs/scheduler_job_runner.py |  29 ++++----
 tests/jobs/test_scheduler_job.py     | 135 ++++++++++++++++++++++++++++++++++-
 3 files changed, 155 insertions(+), 14 deletions(-)

diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index b1e41499dc..4273f1d334 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -49,6 +49,8 @@ if TYPE_CHECKING:
 
     from sqlalchemy.orm.session import Session
 
+    from airflow.executors.base_executor import BaseExecutor
+
 
 def _resolve_dagrun_model():
     from airflow.models.dagrun import DagRun
@@ -117,12 +119,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]
         self.start_date = timezone.utcnow()
         self.latest_heartbeat = timezone.utcnow()
         self.previous_heartbeat = None
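 
    (For illustration only, not part of this commit: the Job constructor change above means an
    explicitly passed executor now seeds both the singular attribute and the new executors list.
    A minimal sketch, using LocalExecutor as a stand-in for any BaseExecutor subclass:
 
        # Illustrative sketch, not Airflow source; assumes LocalExecutor
        # as a stand-in for any BaseExecutor subclass.
        from airflow.executors.local_executor import LocalExecutor
        from airflow.jobs.job import Job
 
        job = Job(executor=LocalExecutor())
        # Both attributes point at the same instance after __init__.
        assert job.executor is job.executors[0]
    )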
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 0596e7f59f..6fab97750b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -270,8 +270,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, 
sig_name, "-" * 80)
 
-        self.job.executor.debug_dump()
-        self.log.info("-" * 80)
+        for executor in self.job.executors:
+            self.log.info("Debug dump for the executor %s", executor)
+            executor.debug_dump()
+            self.log.info("-" * 80)
 
     def __get_concurrency_maps(self, states: Iterable[TaskInstanceState], session: Session) -> ConcurrencyMap:
         """
@@ -819,19 +821,21 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
 
         try:
-            self.job.executor.job_id = self.job.id
+            callback_sink: PipeCallbackSink | DatabaseCallbackSink
+
             if self.processor_agent:
                 self.log.debug("Using PipeCallbackSink as callback sink.")
-                self.job.executor.callback_sink = PipeCallbackSink(
-                    get_sink_pipe=self.processor_agent.get_callbacks_pipe
-                )
+                callback_sink = PipeCallbackSink(get_sink_pipe=self.processor_agent.get_callbacks_pipe)
             else:
                 from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
 
                 self.log.debug("Using DatabaseCallbackSink as callback sink.")
-                self.job.executor.callback_sink = DatabaseCallbackSink()
+                callback_sink = DatabaseCallbackSink()
 
-            self.job.executor.start()
+            for executor in self.job.executors:
+                executor.job_id = self.job.id
+                executor.callback_sink = callback_sink
+                executor.start()
 
             self.register_signals()
 
@@ -860,10 +864,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             self.log.exception("Exception when executing 
SchedulerJob._run_scheduler_loop")
             raise
         finally:
-            try:
-                self.job.executor.end()
-            except Exception:
-                self.log.exception("Exception when executing Executor.end")
+            for executor in self.job.executors:
+                try:
+                    executor.end()
+                except Exception:
+                    self.log.exception("Exception when executing Executor.end on %s", executor)
             if self.processor_agent:
                 try:
                     self.processor_agent.end()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 2c57ca2a9f..5f3571dd6a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -25,7 +25,7 @@ from collections import deque
 from datetime import timedelta
 from typing import Generator
 from unittest import mock
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, PropertyMock, patch
 
 import psutil
 import pytest
@@ -552,6 +552,23 @@ class TestSchedulerJob:
 
         assert isinstance(scheduler_job.executor.callback_sink, PipeCallbackSink)
 
+    @mock.patch("airflow.executors.executor_loader.ExecutorLoader.init_executors")
+    @mock.patch("airflow.executors.executor_loader.ExecutorLoader.get_default_executor")
+    @conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+    def test_setup_callback_sink_not_standalone_dag_processor_multiple_executors(
+        self, get_default_executor_mock, init_executors_mock
+    ):
+        default_executor = mock.MagicMock(slots_available=8)
+        second_executor = mock.MagicMock(slots_available=8)
+        init_executors_mock.return_value = [default_executor, second_executor]
+        get_default_executor_mock.return_value = default_executor
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+        self.job_runner._execute()
+
+        for executor in scheduler_job.executors:
+            assert isinstance(executor.callback_sink, PipeCallbackSink)
+
     @conf_vars({("scheduler", "standalone_dag_processor"): "True"})
     def test_setup_callback_sink_standalone_dag_processor(self):
         scheduler_job = Job()
@@ -560,6 +577,68 @@ class TestSchedulerJob:
 
         assert isinstance(scheduler_job.executor.callback_sink, DatabaseCallbackSink)
 
+    @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+    @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+    @conf_vars({("scheduler", "standalone_dag_processor"): "True"})
+    def test_setup_callback_sink_standalone_dag_processor_multiple_executors(
+        self, executor_mock, executors_mock
+    ):
+        default_executor = mock.MagicMock(slots_available=8)
+        second_executor = mock.MagicMock(slots_available=8)
+        executor_mock.return_value = default_executor
+        executors_mock.return_value = [default_executor, second_executor]
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+        self.job_runner._execute()
+
+        for executor in scheduler_job.executors:
+            assert isinstance(executor.callback_sink, DatabaseCallbackSink)
+
+    @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+    @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+    @conf_vars({("scheduler", "standalone_dag_processor"): "True"})
+    def test_executor_start_called(self, executor_mock, executors_mock):
+        default_executor = mock.MagicMock(slots_available=8)
+        second_executor = mock.MagicMock(slots_available=8)
+        executor_mock.return_value = default_executor
+        executors_mock.return_value = [default_executor, second_executor]
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+        self.job_runner._execute()
+
+        scheduler_job.executor.start.assert_called_once()
+        for executor in scheduler_job.executors:
+            executor.start.assert_called_once()
+
+    @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+    @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+    def test_executor_job_id_assigned(self, executor_mock, executors_mock):
+        default_executor = mock.MagicMock(slots_available=8)
+        second_executor = mock.MagicMock(slots_available=8)
+        executor_mock.return_value = default_executor
+        executors_mock.return_value = [default_executor, second_executor]
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+        self.job_runner._execute()
+
+        assert scheduler_job.executor.job_id == scheduler_job.id
+        for executor in scheduler_job.executors:
+            assert executor.job_id == scheduler_job.id
+
+    @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+    @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+    def test_executor_debug_dump(self, executor_mock, executors_mock):
+        default_executor = mock.MagicMock(slots_available=8)
+        second_executor = mock.MagicMock(slots_available=8)
+        executor_mock.return_value = default_executor
+        executors_mock.return_value = [default_executor, second_executor]
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+        self.job_runner._debug_dump(1, mock.MagicMock())
+
+        for executor in scheduler_job.executors:
+            executor.debug_dump.assert_called_once()
+
     def test_find_executable_task_instances_backfill(self, dag_maker):
         dag_id = "SchedulerJobTest.test_find_executable_task_instances_backfill"
         task_id_1 = "dummy"
@@ -1705,6 +1784,31 @@ class TestSchedulerJob:
         scheduler_job.executor.end.assert_called_once()
         self.job_runner.processor_agent.end.assert_called_once()
 
+    @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+    @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+    @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
+    def test_executor_end_called_multiple_executors(
+        self, mock_processor_agent, executor_mock, executors_mock
+    ):
+        """
+        Test to make sure executor.end gets called on all executors with a successful scheduler loop run
+        """
+        default_executor = mock.MagicMock(slots_available=8)
+        second_executor = mock.MagicMock(slots_available=8)
+        executor_mock.return_value = default_executor
+        executors_mock.return_value = [default_executor, second_executor]
+        scheduler_job = Job()
+        assert scheduler_job.executor is default_executor
+        assert scheduler_job.executors == [default_executor, second_executor]
+
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+        run_job(scheduler_job, execute_callable=self.job_runner._execute)
+        scheduler_job.executor.end.assert_called_once()
+        for executor in scheduler_job.executors:
+            executor.end.assert_called_once()
+
+        self.job_runner.processor_agent.end.assert_called_once()
+
     @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
     def test_cleanup_methods_all_called(self, mock_processor_agent):
         """
@@ -1723,6 +1827,35 @@ class TestSchedulerJob:
         scheduler_job.executor.end.assert_called_once()
         mock_processor_agent.return_value.end.reset_mock(side_effect=True)
 
+    @mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
+    @mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
+    @mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
+    def test_cleanup_methods_all_called_multiple_executors(
+        self, mock_processor_agent, executor_mock, executors_mock
+    ):
+        """
+        Test to make sure all cleanup methods are called when the scheduler loop has an exception
+        """
+        default_executor = mock.MagicMock(slots_available=8)
+        second_executor = mock.MagicMock(slots_available=8)
+        executor_mock.return_value = default_executor
+        executors_mock.return_value = [default_executor, second_executor]
+        scheduler_job = Job()
+        assert scheduler_job.executor is default_executor
+        assert scheduler_job.executors == [default_executor, second_executor]
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
+        self.job_runner._run_scheduler_loop = mock.MagicMock(side_effect=Exception("oops"))
+        mock_processor_agent.return_value.end.side_effect = Exception("double oops")
+        scheduler_job.executor.end = mock.MagicMock(side_effect=Exception("triple oops"))
+
+        with pytest.raises(Exception):
+            run_job(scheduler_job, execute_callable=self.job_runner._execute)
+
+        self.job_runner.processor_agent.end.assert_called_once()
+        for executor in scheduler_job.executors:
+            executor.end.assert_called_once()
+        mock_processor_agent.return_value.end.reset_mock(side_effect=True)
+
     def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_maker):
         """This tests that queued dagruns stops creating once max_active_runs is reached"""
         with dag_maker(max_active_runs=10) as dag:
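 
(For illustration only, not part of this commit: the new tests patch Job.executor and
Job.executors as properties via unittest.mock.PropertyMock. A minimal standalone example of
that standard pattern, using a hypothetical Thing class rather than Airflow code:

    # Standalone illustration of the PropertyMock pattern used in the
    # tests above; Thing is a hypothetical class, not Airflow code.
    from unittest import mock
    from unittest.mock import PropertyMock

    class Thing:
        @property
        def value(self):
            return "real"

    with mock.patch.object(Thing, "value", new_callable=PropertyMock) as value_mock:
        value_mock.return_value = "mocked"
        assert Thing().value == "mocked"
    assert Thing().value == "real"  # patch is undone on exit
)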
