This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa9da93a8c Handle executor events in `dag.test()` (#41625)
aa9da93a8c is described below

commit aa9da93a8cd150dd386af7d3524d79c7c201bdc9
Author: Vincent <[email protected]>
AuthorDate: Thu Aug 22 09:33:28 2024 -0400

    Handle executor events in `dag.test()` (#41625)
---
 airflow/jobs/scheduler_job_runner.py | 37 +++++++++++++++++++++++++++---------
 airflow/models/dag.py                | 10 ++++++++++
 2 files changed, 38 insertions(+), 9 deletions(-)
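
For context, `dag.test(use_executor=True)` sends task instances to the configured
executor instead of running them in-process, so the state changes the executor
reports back must be drained somewhere; that is what this commit wires up. A
minimal sketch of the usage this enables (the DAG and task names are
illustrative, not part of the commit):

    import datetime

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=datetime.datetime(2024, 1, 1))
    def example_dag():
        @task
        def hello():
            print("hello")

        hello()


    # With use_executor=True, dag.test() queues the task instances on the
    # default executor and, after this commit, also processes the executor's
    # event buffer via SchedulerJobRunner.process_executor_events.
    example_dag().test(use_executor=True)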

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index baf91dc40d..b662c3215a 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -739,9 +739,24 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         session.bulk_save_objects(objects=objects, preserve_order=False)
 
     def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int:
-        """Respond to executor events."""
         if not self._standalone_dag_processor and not self.processor_agent:
             raise ValueError("Processor agent is not started.")
+
+        return SchedulerJobRunner.process_executor_events(
+            executor=executor, dag_bag=self.dagbag, job_id=self.job.id, session=session
+        )
+
+    @classmethod
+    def process_executor_events(
+        cls, executor: BaseExecutor, dag_bag: DagBag, job_id: str | None, session: Session
+    ) -> int:
+        """
+        Respond to executor events.
+
+        This is a classmethod because it is also used in `dag.test()`.
+        `dag.test` executes DAGs with no scheduler, so it needs to handle the events pushed by the
+        executors as well.
+        """
         ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] = {}
         event_buffer = executor.get_event_buffer()
         tis_with_right_state: list[TaskInstanceKey] = []
@@ -751,7 +766,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             # We create map (dag_id, task_id, execution_date) -> in-memory try_number
             ti_primary_key_to_try_number_map[ti_key.primary] = ti_key.try_number
 
-            self.log.info("Received executor event with state %s for task 
instance %s", state, ti_key)
+            cls.logger().info("Received executor event with state %s for task 
instance %s", state, ti_key)
             if state in (
                 TaskInstanceState.FAILED,
                 TaskInstanceState.SUCCESS,
@@ -778,7 +793,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
             if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
                 ti.external_executor_id = info
-                self.log.info("Setting external_id for %s to %s", ti, info)
+                cls.logger().info("Setting external_id for %s to %s", ti, info)
                 continue
 
             msg = (
@@ -788,7 +803,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 "job_id=%s, pool=%s, queue=%s, priority_weight=%d, 
operator=%s, queued_dttm=%s, "
                 "queued_by_job_id=%s, pid=%s"
             )
-            self.log.info(
+            cls.logger().info(
                 msg,
                 ti.dag_id,
                 ti.task_id,
@@ -876,9 +891,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             # All of this could also happen if the state is "running",
             # but that is handled by the zombie detection.
 
-            ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
+            ti_queued = ti.try_number == buffer_key.try_number and ti.state in (
+                TaskInstanceState.SCHEDULED,
+                TaskInstanceState.QUEUED,
+                TaskInstanceState.RUNNING,
+            )
             ti_requeued = (
-                ti.queued_by_job_id != self.job.id  # Another scheduler has queued this task again
+                ti.queued_by_job_id != job_id  # Another scheduler has queued this task again
                 or executor.has_task(ti)  # This scheduler has this task already
             )
 
@@ -894,15 +913,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 )
                 if info is not None:
                     msg += " Extra info: %s" % info  # noqa: RUF100, UP031, 
flynt
-                self.log.error(msg)
+                cls.logger().error(msg)
                 session.add(Log(event="state mismatch", extra=msg, task_instance=ti.key))
 
                 # Get task from the Serialized DAG
                 try:
-                    dag = self.dagbag.get_dag(ti.dag_id)
+                    dag = dag_bag.get_dag(ti.dag_id)
                     task = dag.get_task(ti.task_id)
                 except Exception:
-                    self.log.exception("Marking task instance %s as %s", ti, state)
+                    cls.logger().exception("Marking task instance %s as %s", ti, state)
                     ti.set_state(state)
                     continue
                 ti.task = task
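
The net effect in this file: the instance method keeps its guard on the
processor agent and delegates to a new classmethod, and the `self.log` calls
become `cls.logger()`, the class-level logger that works without an instance.
A hedged sketch of the new call shape from outside a scheduler (the `executor`
and `dag_bag` objects are assumed to exist; this is not code from the commit):

    from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
    from airflow.utils.session import create_session

    with create_session() as session:
        # job_id=None signals that no scheduler job queued these task
        # instances, so the "another scheduler has queued this task again"
        # check compares ti.queued_by_job_id against None.
        num_events = SchedulerJobRunner.process_executor_events(
            executor=executor, dag_bag=dag_bag, job_id=None, session=session
        )
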
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b685b28343..bdc508951a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2695,6 +2695,11 @@ class DAG(LoggingMixin):
             # - if ``use_executor`` is True, sends the task instances to the executor with
             #   ``BaseExecutor.queue_task_instance``
             if use_executor:
+                from airflow.models.dagbag import DagBag
+
+                dag_bag = DagBag()
+                dag_bag.bag_dag(self)
+
                 executor = ExecutorLoader.get_default_executor()
                 executor.start()
 
@@ -2743,6 +2748,11 @@ class DAG(LoggingMixin):
                             self.log.exception("Task failed; ti=%s", ti)
                 if use_executor:
                     executor.heartbeat()
+                    from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
+
+                    SchedulerJobRunner.process_executor_events(
+                        executor=executor, dag_bag=dag_bag, job_id=None, session=session
+                    )
             if use_executor:
                 executor.end()
         return dr
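
On the `dag.test()` side, the freshly bagged `DagBag` is what lets
`process_executor_events` resolve the in-memory DAG on a state mismatch:
`dag_bag.get_dag(ti.dag_id)` would otherwise find nothing and the mismatch
would fall through to the bare `ti.set_state(state)` branch. A small sketch of
that lookup chain (`my_dag` and the task id are illustrative):

    from airflow.models.dagbag import DagBag

    dag_bag = DagBag()
    dag_bag.bag_dag(my_dag)               # what dag.test() now does up front
    dag = dag_bag.get_dag(my_dag.dag_id)  # the lookup used on state mismatch
    task = dag.get_task("my_task")        # lets failure handling use the real task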
