Copilot commented on code in PR #67679:
URL: https://github.com/apache/airflow/pull/67679#discussion_r3323352577


##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -422,7 +422,7 @@ def _get_state(self) -> EdgeWorkerState:
             return EdgeWorkerState.MAINTENANCE_MODE
         return EdgeWorkerState.IDLE
 
-    def _run_job_via_supervisor(self, workload: ExecuteTask, error_file_path: Path) -> int:
+    def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Path) -> int:
         """Run a task by calling the supervisor directly (executes inside a forked child process)."""
         _reset_parent_signal_state()
 

Review Comment:
   In the Airflow 3.3+ path, `_run_job_via_supervisor` calls 
`BaseExecutor.run_workload()` but discards its integer return code and then 
returns 0. In the fork/multiprocessing path, this makes the child process exit 
with code 0 even if the workload failed; the return value of a `Process` target 
is discarded in any case, so returning a code can never set a non-zero exit 
status. Propagate the returned exit code via `SystemExit` (or similar) so the 
parent correctly marks the job as failed.
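
A minimal sketch of the behaviour described above, using only the standard library. `run_workload_stub` and the two target functions are hypothetical stand-ins, not the worker's actual code, and the `fork` start method assumes a POSIX platform.

```python
import multiprocessing as mp


def run_workload_stub(should_fail: bool) -> int:
    """Hypothetical stand-in for a workload runner that returns an exit code."""
    return 1 if should_fail else 0


def target_ignoring_code(should_fail: bool) -> int:
    # The return value of a Process target is discarded, so the child exits with 0.
    return run_workload_stub(should_fail)


def target_propagating_code(should_fail: bool) -> None:
    code = run_workload_stub(should_fail)
    # Raising SystemExit makes the child process exit with this status.
    raise SystemExit(code)


def child_exit_code(target, should_fail: bool) -> int:
    """Run `target` in a forked child and return the child's exit code."""
    ctx = mp.get_context("fork")  # assumption: POSIX platform where fork is available
    proc = ctx.Process(target=target, args=(should_fail,))
    proc.start()
    proc.join()
    return proc.exitcode
```

With this sketch, a failing workload is only visible to the parent through `exitcode` when the target raises `SystemExit`.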



##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -102,44 +103,72 @@ def queue_workload(
         session: Session = NEW_SESSION,
     ) -> None:
         """Put new workload to queue. Airflow 3 entry point to execute a task."""
-        if not isinstance(workload, workloads.ExecuteTask):
-            raise TypeError(f"Don't know how to queue workload of type {type(workload).__name__}")
-
-        task_instance = workload.ti
-        key = task_instance.key
-
-        # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number
-        existing_job = session.scalars(
-            select(EdgeJobModel).where(
-                EdgeJobModel.dag_id == key.dag_id,
-                EdgeJobModel.task_id == key.task_id,
-                EdgeJobModel.run_id == key.run_id,
-                EdgeJobModel.map_index == key.map_index,
-                EdgeJobModel.try_number == key.try_number,
-            )
-        ).first()
-
-        if existing_job:
-            existing_job.state = TaskInstanceState.QUEUED
-            existing_job.queue = task_instance.queue
-            existing_job.concurrency_slots = task_instance.pool_slots
-            existing_job.command = workload.model_dump_json()
-            existing_job.team_name = self.team_name
-        else:
-            session.add(
-                EdgeJobModel(
-                    dag_id=key.dag_id,
-                    task_id=key.task_id,
-                    run_id=key.run_id,
-                    map_index=key.map_index,
-                    try_number=key.try_number,
-                    state=TaskInstanceState.QUEUED,
-                    queue=task_instance.queue,
-                    concurrency_slots=task_instance.pool_slots,
-                    command=workload.model_dump_json(),
-                    team_name=self.team_name,
+        if is_callback_execute(workload):
+            from airflow.providers.edge3.models.types import EXECUTE_CALLBACK_TAG
+
+            existing_job = session.scalars(
+                select(EdgeJobModel).where(
+                    EdgeJobModel.dag_id == EXECUTE_CALLBACK_TAG,
+                    EdgeJobModel.task_id == workload.callback.id,
+                    EdgeJobModel.run_id == f"{EXECUTE_CALLBACK_TAG}-{workload.callback.id}",
                 )

Review Comment:
   Callback jobs are inserted and updated without `team_name`, but `/jobs/fetch` 
always filters by `EdgeJobModel.team_name == body.team_name`. As a result, 
`ExecuteCallback` workloads queued by a non-default team executor will never be 
fetched by that team's workers (and may be picked up by default-team workers). 
Include `team_name` (and the fixed `map_index`/`try_number`) in the lookup to 
avoid cross-team collisions and to match the fetch filter.
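
To illustrate the routing problem, here is a small in-memory sketch. `JobRow` and `fetch_for_team` are hypothetical stand-ins for `EdgeJobModel` and the fetch endpoint's filter, and the sketch assumes the default team is represented by `None`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class JobRow:
    """Hypothetical stand-in for an EdgeJobModel row (only the fields used here)."""

    dag_id: str
    task_id: str
    team_name: str | None = None


def fetch_for_team(jobs: list[JobRow], team_name: str | None) -> list[JobRow]:
    # Mirrors a fetch filter of the form `team_name == body.team_name`.
    return [job for job in jobs if job.team_name == team_name]


# Two callback rows queued by a "team_a" executor: one without team_name, one with it.
stranded = JobRow(dag_id="ExecuteCallback", task_id="cb-1")
routed = JobRow(dag_id="ExecuteCallback", task_id="cb-2", team_name="team_a")
jobs = [stranded, routed]

team_a_jobs = fetch_for_team(jobs, "team_a")  # only the row that carries team_name
default_jobs = fetch_for_team(jobs, None)  # the row without team_name lands here
```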



##########
providers/edge3/docs/edge_executor.rst:
##########
@@ -169,6 +169,39 @@ Here is an example setting pool_slots for a task:
 
         task_with_template()
 
+
+.. _edge_executor:execute_callback:
+
+Support ExecuteCallback in Worker
+---------------------------------
+
+In addition to executing tasks, the EdgeExecutor can also dispatch executor-level
+callbacks (``ExecuteCallback`` workloads, e.g. deadline callbacks) to edge workers.
+When the scheduler hands an ``ExecuteCallback`` to ``EdgeExecutor.queue_workload``,
+it is enqueued into the same job queue (``EdgeJobModel``) that is used for task
+workloads, so an edge worker picks it up alongside regular tasks without any
+additional configuration.
+
+Callback jobs share the ``EdgeJobModel`` table with task jobs. They are
+distinguished by reserved values in the identifier columns:
+
+- ``dag_id`` is set to the constant tag ``ExecuteCallback``.
+- ``task_id`` is set to the callback key (the callback ID).
+- ``run_id`` is set to ``ExecuteCallback-<callback_key>``.
+- ``map_index`` is fixed to ``-1`` and ``try_number`` to ``0``.
+
+When the worker fetches such a job through the worker API, the command payload is
+deserialized back into an ``ExecuteCallback`` workload (instead of an
+``ExecuteTask``) based on these identifiers. The worker then runs the callback
+through ``BaseExecutor.run_workload`` rather than the task supervisor flow used for
+normal tasks.

Review Comment:
   The wording here implies only callbacks use `BaseExecutor.run_workload` 
while tasks use a different supervisor flow. In Airflow 3.3+ the worker 
executes *both* ExecuteTask and ExecuteCallback via `BaseExecutor.run_workload` 
(or `airflow.sdk.execution_time.execute_workload` in the subprocess path), so 
the documentation should reflect that to avoid confusion.



##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -504,7 +504,7 @@ def _launch_job_subprocess(self, workload: ExecuteTask) -> tuple[subprocess.Pope
         )
         return process, stderr_file_path

Review Comment:
   `_launch_job_subprocess()` now accepts `ExecuteTypeBody`, which can be an 
`ExecuteCallback` workload on Airflow 3.3+. `ExecuteCallback` does not have 
`.ti`, so logging `workload.ti.id` will raise `AttributeError` and prevent 
callback jobs from launching. Use `workload.display_name` (available on both 
ExecuteTask and ExecuteCallback) for logging.
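
A short sketch of why the `.ti`-based log call breaks for callbacks. The `Fake*` classes are hypothetical stand-ins for the real workload models; the only property taken from the comment above is that both workload types expose `display_name` while only `ExecuteTask` has `.ti`.

```python
from dataclasses import dataclass


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance."""

    id: str


@dataclass
class FakeExecuteTask:
    """Hypothetical stand-in for ExecuteTask: has both `.ti` and `.display_name`."""

    ti: FakeTI
    display_name: str


@dataclass
class FakeExecuteCallback:
    """Hypothetical stand-in for ExecuteCallback: no `.ti` attribute."""

    display_name: str


def describe_via_ti(workload) -> str:
    # Works only for task workloads; raises AttributeError for callbacks.
    return f"Launched job for {workload.ti.id}"


def describe_via_display_name(workload) -> str:
    # Works for any workload that exposes `display_name`.
    return f"Launched job for {workload.display_name}"


task = FakeExecuteTask(ti=FakeTI(id="ti-1"), display_name="task-1")
callback = FakeExecuteCallback(display_name="callback-1")
```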



##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -102,44 +103,72 @@ def queue_workload(
         session: Session = NEW_SESSION,
     ) -> None:
         """Put new workload to queue. Airflow 3 entry point to execute a task."""
-        if not isinstance(workload, workloads.ExecuteTask):
-            raise TypeError(f"Don't know how to queue workload of type {type(workload).__name__}")
-
-        task_instance = workload.ti
-        key = task_instance.key
-
-        # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number
-        existing_job = session.scalars(
-            select(EdgeJobModel).where(
-                EdgeJobModel.dag_id == key.dag_id,
-                EdgeJobModel.task_id == key.task_id,
-                EdgeJobModel.run_id == key.run_id,
-                EdgeJobModel.map_index == key.map_index,
-                EdgeJobModel.try_number == key.try_number,
-            )
-        ).first()
-
-        if existing_job:
-            existing_job.state = TaskInstanceState.QUEUED
-            existing_job.queue = task_instance.queue
-            existing_job.concurrency_slots = task_instance.pool_slots
-            existing_job.command = workload.model_dump_json()
-            existing_job.team_name = self.team_name
-        else:
-            session.add(
-                EdgeJobModel(
-                    dag_id=key.dag_id,
-                    task_id=key.task_id,
-                    run_id=key.run_id,
-                    map_index=key.map_index,
-                    try_number=key.try_number,
-                    state=TaskInstanceState.QUEUED,
-                    queue=task_instance.queue,
-                    concurrency_slots=task_instance.pool_slots,
-                    command=workload.model_dump_json(),
-                    team_name=self.team_name,
+        if is_callback_execute(workload):
+            from airflow.providers.edge3.models.types import EXECUTE_CALLBACK_TAG
+
+            existing_job = session.scalars(
+                select(EdgeJobModel).where(
+                    EdgeJobModel.dag_id == EXECUTE_CALLBACK_TAG,
+                    EdgeJobModel.task_id == workload.callback.id,
+                    EdgeJobModel.run_id == f"{EXECUTE_CALLBACK_TAG}-{workload.callback.id}",
                 )
-            )
+            ).first()
+
+            if existing_job:
+                existing_job.state = TaskInstanceState.QUEUED
+                existing_job.command = workload.model_dump_json()
+            else:
+                session.add(
+                    EdgeJobModel(
+                        dag_id=EXECUTE_CALLBACK_TAG,
+                        task_id=str(workload.callback.id),
+                        run_id=f"{EXECUTE_CALLBACK_TAG}-{workload.callback.id}",
+                        map_index=-1,
+                        try_number=0,
+                        queue=self.conf.get_mandatory_value("operators", "default_queue"),
+                        concurrency_slots=1,
+                        state=TaskInstanceState.QUEUED,
+                        command=workload.model_dump_json(),
+                    )
+                )

Review Comment:
   When creating an ExecuteCallback job row, `team_name` is not set. Because 
the fetch endpoint filters on `team_name`, this can strand callback jobs (or 
route them to default-team workers). Set `team_name=self.team_name` when 
inserting callback jobs (and consider setting it when updating existing jobs as 
well).
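
One way to picture the suggested fix is an upsert keyed on the callback identifiers plus `team_name`. This is an in-memory sketch only: `CallbackJob`, `upsert_callback_job`, and the dict-based store are hypothetical, and the tag value `"ExecuteCallback"` is taken from the docs hunk in this PR.

```python
from __future__ import annotations

from dataclasses import dataclass

EXECUTE_CALLBACK_TAG = "ExecuteCallback"  # assumed value, per the docs hunk in this PR


@dataclass
class CallbackJob:
    """Hypothetical stand-in for a callback row in the job table."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int
    try_number: int
    team_name: str | None
    state: str
    command: str


def upsert_callback_job(
    store: dict[tuple, CallbackJob],
    callback_id: str,
    team_name: str | None,
    command: str,
) -> CallbackJob:
    """Insert or re-queue a callback job; team_name is part of the lookup key."""
    key = (
        EXECUTE_CALLBACK_TAG,
        callback_id,
        f"{EXECUTE_CALLBACK_TAG}-{callback_id}",
        -1,  # fixed map_index
        0,  # fixed try_number
        team_name,
    )
    job = store.get(key)
    if job is None:
        # New row: team_name is set at insert time so a team-filtered fetch can match it.
        job = CallbackJob(*key, state="queued", command=command)
        store[key] = job
    else:
        # Existing row for the same team: re-queue it and refresh the payload.
        job.state = "queued"
        job.command = command
    return job
```

Because `team_name` is part of the key, the same callback ID queued by two teams yields two separate rows rather than one row being overwritten.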



##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -518,7 +518,7 @@ def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Path]:
         logger.info("Launched task fork pid=%d for %s", process.pid, workload.ti.id)
         return process, error_file_path

Review Comment:
   `_launch_job_fork()` logs `workload.ti.id`, but `ExecuteCallback` workloads 
(Airflow 3.3+) don't have `.ti`, causing an `AttributeError` and preventing 
callback jobs from launching. Use `workload.display_name` for a 
workload-agnostic identifier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
