This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa9da93a8c Handle executor events in `dag.test()` (#41625)
aa9da93a8c is described below
commit aa9da93a8cd150dd386af7d3524d79c7c201bdc9
Author: Vincent <[email protected]>
AuthorDate: Thu Aug 22 09:33:28 2024 -0400
Handle executor events in `dag.test()` (#41625)
---
airflow/jobs/scheduler_job_runner.py | 37 +++++++++++++++++++++++++++---------
airflow/models/dag.py | 10 ++++++++++
2 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index baf91dc40d..b662c3215a 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -739,9 +739,24 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
session.bulk_save_objects(objects=objects, preserve_order=False)
def _process_executor_events(self, executor: BaseExecutor, session:
Session) -> int:
- """Respond to executor events."""
if not self._standalone_dag_processor and not self.processor_agent:
raise ValueError("Processor agent is not started.")
+
+ return SchedulerJobRunner.process_executor_events(
+ executor=executor, dag_bag=self.dagbag, job_id=self.job.id,
session=session
+ )
+
+ @classmethod
+ def process_executor_events(
+ cls, executor: BaseExecutor, dag_bag: DagBag, job_id: str | None,
session: Session
+ ) -> int:
+ """
+ Respond to executor events.
+
+ This is a classmethod because this is also used in `dag.test()`.
`dag.test` executes DAGs with no scheduler, therefore it needs to
handle the events pushed by the
+ executors as well.
+ """
ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int]
= {}
event_buffer = executor.get_event_buffer()
tis_with_right_state: list[TaskInstanceKey] = []
@@ -751,7 +766,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# We create map (dag_id, task_id, execution_date) -> in-memory
try_number
ti_primary_key_to_try_number_map[ti_key.primary] =
ti_key.try_number
- self.log.info("Received executor event with state %s for task
instance %s", state, ti_key)
+ cls.logger().info("Received executor event with state %s for task
instance %s", state, ti_key)
if state in (
TaskInstanceState.FAILED,
TaskInstanceState.SUCCESS,
@@ -778,7 +793,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
ti.external_executor_id = info
- self.log.info("Setting external_id for %s to %s", ti, info)
+ cls.logger().info("Setting external_id for %s to %s", ti, info)
continue
msg = (
@@ -788,7 +803,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"job_id=%s, pool=%s, queue=%s, priority_weight=%d,
operator=%s, queued_dttm=%s, "
"queued_by_job_id=%s, pid=%s"
)
- self.log.info(
+ cls.logger().info(
msg,
ti.dag_id,
ti.task_id,
@@ -876,9 +891,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# All of this could also happen if the state is "running",
# but that is handled by the zombie detection.
- ti_queued = ti.try_number == buffer_key.try_number and ti.state ==
TaskInstanceState.QUEUED
+ ti_queued = ti.try_number == buffer_key.try_number and ti.state in
(
+ TaskInstanceState.SCHEDULED,
+ TaskInstanceState.QUEUED,
+ TaskInstanceState.RUNNING,
+ )
ti_requeued = (
- ti.queued_by_job_id != self.job.id # Another scheduler has
queued this task again
+ ti.queued_by_job_id != job_id # Another scheduler has queued
this task again
or executor.has_task(ti) # This scheduler has this task
already
)
@@ -894,15 +913,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
if info is not None:
msg += " Extra info: %s" % info # noqa: RUF100, UP031,
flynt
- self.log.error(msg)
+ cls.logger().error(msg)
session.add(Log(event="state mismatch", extra=msg,
task_instance=ti.key))
# Get task from the Serialized DAG
try:
- dag = self.dagbag.get_dag(ti.dag_id)
+ dag = dag_bag.get_dag(ti.dag_id)
task = dag.get_task(ti.task_id)
except Exception:
- self.log.exception("Marking task instance %s as %s", ti,
state)
+ cls.logger().exception("Marking task instance %s as %s",
ti, state)
ti.set_state(state)
continue
ti.task = task
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b685b28343..bdc508951a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2695,6 +2695,11 @@ class DAG(LoggingMixin):
# - if ``use_executor`` is True, sends the task instances to the
executor with
# ``BaseExecutor.queue_task_instance``
if use_executor:
+ from airflow.models.dagbag import DagBag
+
+ dag_bag = DagBag()
+ dag_bag.bag_dag(self)
+
executor = ExecutorLoader.get_default_executor()
executor.start()
@@ -2743,6 +2748,11 @@ class DAG(LoggingMixin):
self.log.exception("Task failed; ti=%s", ti)
if use_executor:
executor.heartbeat()
+ from airflow.jobs.scheduler_job_runner import
SchedulerJobRunner
+
+ SchedulerJobRunner.process_executor_events(
+ executor=executor, dag_bag=dag_bag, job_id=None,
session=session
+ )
if use_executor:
executor.end()
return dr