Copilot commented on code in PR #67679:
URL: https://github.com/apache/airflow/pull/67679#discussion_r3323352577
##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -422,7 +422,7 @@ def _get_state(self) -> EdgeWorkerState:
return EdgeWorkerState.MAINTENANCE_MODE
return EdgeWorkerState.IDLE
- def _run_job_via_supervisor(self, workload: ExecuteTask, error_file_path:
Path) -> int:
+ def _run_job_via_supervisor(self, workload: ExecuteTypeBody,
error_file_path: Path) -> int:
"""Run a task by calling the supervisor directly (executes inside a
forked child process)."""
_reset_parent_signal_state()
Review Comment:
In the Airflow 3.3+ path, `_run_job_via_supervisor` calls
`BaseExecutor.run_workload()` but ignores its integer return code and later
returns 0. When using the fork/multiprocessing path, this makes the child
process exit with code 0 even if the workload failed (returning from a
`Process` target does not set a non-zero exit status). Propagate the returned
exit code via `SystemExit` (or similar) so the parent correctly marks the job
failed.
##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -102,44 +103,72 @@ def queue_workload(
session: Session = NEW_SESSION,
) -> None:
"""Put new workload to queue. Airflow 3 entry point to execute a
task."""
- if not isinstance(workload, workloads.ExecuteTask):
- raise TypeError(f"Don't know how to queue workload of type
{type(workload).__name__}")
-
- task_instance = workload.ti
- key = task_instance.key
-
- # Check if job already exists with same dag_id, task_id, run_id,
map_index, try_number
- existing_job = session.scalars(
- select(EdgeJobModel).where(
- EdgeJobModel.dag_id == key.dag_id,
- EdgeJobModel.task_id == key.task_id,
- EdgeJobModel.run_id == key.run_id,
- EdgeJobModel.map_index == key.map_index,
- EdgeJobModel.try_number == key.try_number,
- )
- ).first()
-
- if existing_job:
- existing_job.state = TaskInstanceState.QUEUED
- existing_job.queue = task_instance.queue
- existing_job.concurrency_slots = task_instance.pool_slots
- existing_job.command = workload.model_dump_json()
- existing_job.team_name = self.team_name
- else:
- session.add(
- EdgeJobModel(
- dag_id=key.dag_id,
- task_id=key.task_id,
- run_id=key.run_id,
- map_index=key.map_index,
- try_number=key.try_number,
- state=TaskInstanceState.QUEUED,
- queue=task_instance.queue,
- concurrency_slots=task_instance.pool_slots,
- command=workload.model_dump_json(),
- team_name=self.team_name,
+ if is_callback_execute(workload):
+ from airflow.providers.edge3.models.types import
EXECUTE_CALLBACK_TAG
+
+ existing_job = session.scalars(
+ select(EdgeJobModel).where(
+ EdgeJobModel.dag_id == EXECUTE_CALLBACK_TAG,
+ EdgeJobModel.task_id == workload.callback.id,
+ EdgeJobModel.run_id ==
f"{EXECUTE_CALLBACK_TAG}-{workload.callback.id}",
)
Review Comment:
Callback jobs are inserted/updated without `team_name`, but `/jobs/fetch`
always filters by `EdgeJobModel.team_name == body.team_name`. This means
ExecuteCallback workloads queued by a non-default team executor will never be
fetched by that team's workers (and may be picked up by default-team workers).
Include `team_name` (and the fixed `map_index`/`try_number`) in the lookup to avoid
cross-team collisions and to match the fetch filter.
##########
providers/edge3/docs/edge_executor.rst:
##########
@@ -169,6 +169,39 @@ Here is an example setting pool_slots for a task:
task_with_template()
+
+.. _edge_executor:execute_callback:
+
+Support ExecuteCallback in Worker
+---------------------------------
+
+In addition to executing tasks, the EdgeExecutor can also dispatch
executor-level
+callbacks (``ExecuteCallback`` workloads, e.g. deadline callbacks) to edge
workers.
+When the scheduler hands an ``ExecuteCallback`` to
``EdgeExecutor.queue_workload``,
+it is enqueued into the same job queue (``EdgeJobModel``) that is used for task
+workloads, so an edge worker picks it up alongside regular tasks without any
+additional configuration.
+
+Callback jobs share the ``EdgeJobModel`` table with task jobs. They are
+distinguished by reserved values in the identifier columns:
+
+- ``dag_id`` is set to the constant tag ``ExecuteCallback``.
+- ``task_id`` is set to the callback key (the callback ID).
+- ``run_id`` is set to ``ExecuteCallback-<callback_key>``.
+- ``map_index`` is fixed to ``-1`` and ``try_number`` to ``0``.
+
+When the worker fetches such a job through the worker API, the command payload
is
+deserialized back into an ``ExecuteCallback`` workload (instead of an
+``ExecuteTask``) based on these identifiers. The worker then runs the callback
+through ``BaseExecutor.run_workload`` rather than the task supervisor flow
used for
+normal tasks.
Review Comment:
The wording here implies only callbacks use `BaseExecutor.run_workload`
while tasks use a different supervisor flow. In Airflow 3.3+ the worker
executes *both* ExecuteTask and ExecuteCallback via `BaseExecutor.run_workload`
(or `airflow.sdk.execution_time.execute_workload` in the subprocess path), so
the documentation should reflect that to avoid confusion.
##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -504,7 +504,7 @@ def _launch_job_subprocess(self, workload: ExecuteTask) ->
tuple[subprocess.Pope
)
return process, stderr_file_path
Review Comment:
`_launch_job_subprocess()` now accepts `ExecuteTypeBody`, which can be an
`ExecuteCallback` workload on Airflow 3.3+. `ExecuteCallback` does not have
`.ti`, so logging `workload.ti.id` will raise `AttributeError` and prevent
callback jobs from launching. Use `workload.display_name` (available on both
ExecuteTask and ExecuteCallback) for logging.
##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -102,44 +103,72 @@ def queue_workload(
session: Session = NEW_SESSION,
) -> None:
"""Put new workload to queue. Airflow 3 entry point to execute a
task."""
- if not isinstance(workload, workloads.ExecuteTask):
- raise TypeError(f"Don't know how to queue workload of type
{type(workload).__name__}")
-
- task_instance = workload.ti
- key = task_instance.key
-
- # Check if job already exists with same dag_id, task_id, run_id,
map_index, try_number
- existing_job = session.scalars(
- select(EdgeJobModel).where(
- EdgeJobModel.dag_id == key.dag_id,
- EdgeJobModel.task_id == key.task_id,
- EdgeJobModel.run_id == key.run_id,
- EdgeJobModel.map_index == key.map_index,
- EdgeJobModel.try_number == key.try_number,
- )
- ).first()
-
- if existing_job:
- existing_job.state = TaskInstanceState.QUEUED
- existing_job.queue = task_instance.queue
- existing_job.concurrency_slots = task_instance.pool_slots
- existing_job.command = workload.model_dump_json()
- existing_job.team_name = self.team_name
- else:
- session.add(
- EdgeJobModel(
- dag_id=key.dag_id,
- task_id=key.task_id,
- run_id=key.run_id,
- map_index=key.map_index,
- try_number=key.try_number,
- state=TaskInstanceState.QUEUED,
- queue=task_instance.queue,
- concurrency_slots=task_instance.pool_slots,
- command=workload.model_dump_json(),
- team_name=self.team_name,
+ if is_callback_execute(workload):
+ from airflow.providers.edge3.models.types import
EXECUTE_CALLBACK_TAG
+
+ existing_job = session.scalars(
+ select(EdgeJobModel).where(
+ EdgeJobModel.dag_id == EXECUTE_CALLBACK_TAG,
+ EdgeJobModel.task_id == workload.callback.id,
+ EdgeJobModel.run_id ==
f"{EXECUTE_CALLBACK_TAG}-{workload.callback.id}",
)
- )
+ ).first()
+
+ if existing_job:
+ existing_job.state = TaskInstanceState.QUEUED
+ existing_job.command = workload.model_dump_json()
+ else:
+ session.add(
+ EdgeJobModel(
+ dag_id=EXECUTE_CALLBACK_TAG,
+ task_id=str(workload.callback.id),
+
run_id=f"{EXECUTE_CALLBACK_TAG}-{workload.callback.id}",
+ map_index=-1,
+ try_number=0,
+ queue=self.conf.get_mandatory_value("operators",
"default_queue"),
+ concurrency_slots=1,
+ state=TaskInstanceState.QUEUED,
+ command=workload.model_dump_json(),
+ )
+ )
Review Comment:
When creating an ExecuteCallback job row, `team_name` is not set. Because
the fetch endpoint filters on `team_name`, this can strand callback jobs (or
route them to default-team workers). Set `team_name=self.team_name` when
inserting callback jobs (and consider setting it when updating existing jobs as
well).
##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -518,7 +518,7 @@ def _launch_job_fork(self, workload: ExecuteTask) ->
tuple[Process, Path]:
logger.info("Launched task fork pid=%d for %s", process.pid,
workload.ti.id)
return process, error_file_path
Review Comment:
`_launch_job_fork()` logs `workload.ti.id`, but `ExecuteCallback` workloads
(Airflow 3.3+) don't have `.ti`, causing an `AttributeError` and preventing
callback jobs from launching. Use `workload.display_name` for a
workload-agnostic identifier.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org