o-nikolas commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2830069032
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3201,22 +3313,28 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) ->
                 # No executor found for that team, fall back to global default
                 executor = self.executor
         else:
-            # An executor is specified on the TaskInstance (as a str), so we need to find it in the list of executors
+            # An executor is specified on the workload (as a str), so we need to find it in the list of executors
             for _executor in self.executors:
-                if _executor.name and ti.executor in (_executor.name.alias, _executor.name.module_path):
+                if _executor.name and workload.get_executor_name() in (
+                    _executor.name.alias,
+                    _executor.name.module_path,
+                ):
                     # The executor must either match the team or be global (i.e. team_name is None)
                     if team_name and _executor.team_name == team_name or _executor.team_name is None:
                         executor = _executor
         if executor is not None:
-            self.log.debug("Found executor %s for task %s (team: %s)", executor.name, ti, team_name)
+            self.log.debug("Found executor %s for workload %s (team: %s)", executor.name, workload, team_name)
         else:
             # This case should not happen unless some (as of now unknown) edge case occurs or direct DB
             # modification, since the DAG parser will validate the tasks in the DAG and ensure the executor
             # they request is available and if not, disallow the DAG to be scheduled.
             # Keeping this exception handling because this is a critical issue if we do somehow find
             # ourselves here and the user should get some feedback about that.
-            self.log.warning("Executor, %s, was not found but a Task was configured to use it", ti.executor)
+            self.log.warning(
+                "Executor, %s, was not found but a workload was configured to use it",
Review Comment:
Maybe "Task or Callback" instead of "workload"? Will users know what a
workload is? Here and elsewhere, anytime it's user facing I'm not sure we want
to use "workload", at least not without a considered marketing push to make
users aware of what it is? Or maybe they already are and I'm being too cautious
😆
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -979,6 +985,70 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
         return len(queued_tis)
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the appropriate executor.
+
+        :param session: The database session
+        """
+        num_occupied_slots = sum(executor.slots_occupied for executor in self.job.executors)
+        max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+
+        if max_callbacks <= 0:
+            self.log.debug("No available slots for callbacks; all executors at capacity")
+            return
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(max_callbacks)
+        ).all()
+
+        if not queued_callbacks:
+            return
+
+        # Route callbacks to executors using the generalized routing method
+        executor_to_callbacks = self._executor_to_workloads(queued_callbacks, session)
+
+        # Enqueue callbacks for each executor
+        for executor, callbacks in executor_to_callbacks.items():
+            for callback in callbacks:
+                if not isinstance(callback, ExecutorCallback):
+                    # Can't happen since we queried ExecutorCallback, but satisfies mypy.
+                    continue
+                dag_run = None
+                if isinstance(callback.data, dict) and "dag_run_id" in callback.data:
+                    dag_run_id = callback.data["dag_run_id"]
+                    dag_run = session.get(DagRun, dag_run_id)
+                elif isinstance(callback.data, dict) and "dag_id" in callback.data:
+                    # Fallback: try to find the latest dag_run for the dag_id
+                    dag_id = callback.data["dag_id"]
+                    dag_run = session.scalars(
+                        select(DagRun)
+                        .where(DagRun.dag_id == dag_id)
+                        .order_by(DagRun.execution_date.desc())
+                        .limit(1)
+                    ).first()
+
+                if dag_run is None:
+                    self.log.warning("Could not find DagRun for callback %s", callback.id)
+                    continue
+
+                workload = workloads.ExecuteCallback.make(
+                    callback=callback,
+                    dag_run=dag_run,
+                    generator=executor.jwt_generator,
+                )
+
+                executor.queue_workload(workload, session=session)
+                callback.state = CallbackState.RUNNING
Review Comment:
   Looks better now: the callback goes to QUEUED once we queue it with the executor, instead of directly to RUNNING. This is more correct because there is no guarantee that the executor is able to get the workload running.
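   For what it's worth, here is a rough sketch of the lifecycle I mean (illustration only, not code from this PR; `enqueue_callback` and `mark_running` are made-up names, and the CallbackState values are assumed from the states referenced in the diff):

       from enum import Enum


       class CallbackState(str, Enum):
           # Values assumed for illustration; only QUEUED and RUNNING matter here.
           QUEUED = "queued"
           RUNNING = "running"


       def enqueue_callback(executor, callback, workload, session):
           """Scheduler side: hand the workload to the executor but leave the state as QUEUED."""
           executor.queue_workload(workload, session=session)
           # Do not set RUNNING here; the executor may not be able to start the workload right away.
           callback.state = CallbackState.QUEUED


       def mark_running(callback, session):
           """Executor side (hypothetical hook): called only once the workload actually starts."""
           callback.state = CallbackState.RUNNING
           session.merge(callback)

   The point being that RUNNING should only be set by whichever component can actually observe the workload starting, so a callback that never gets picked up stays visibly QUEUED.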
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]