Dev-iL commented on PR #61153:
URL: https://github.com/apache/airflow/pull/61153#issuecomment-3932378274

   Comments from Opus:
   
   ---
   
   ## Summary
   
   This PR adds `ExecuteCallback` as a new workload type alongside 
`ExecuteTask` and `RunTrigger`, enabling synchronous callbacks (e.g., 
`SyncCallback` wrapping `SlackWebhookNotifier`) to be dispatched through the 
executor pipeline. It refactors the `workloads.py` single-file module into a 
`workloads/` package, introduces a `BaseWorkload` ORM mixin for unified 
scheduler routing, and updates BaseExecutor, LocalExecutor, CeleryExecutor, the 
scheduler loop, and the deadline model to support callback workloads.
   
   ---
   
   ## Critical Issues
   
   ### 1. Layer Violation: `airflow.models` Importing from `airflow.executors`
   
   The PR introduces imports from `airflow.executors.workloads` into two model 
files:
   
   ```python
   # airflow/models/callback.py
   from airflow.executors.workloads import BaseWorkload
   from airflow.executors.workloads.callback import CallbackFetchMethod
   
   # airflow/models/taskinstance.py
   from airflow.executors.workloads import BaseWorkload
   ```
   
   And adds `BaseWorkload` as a mixin to both `Callback(Base, BaseWorkload)` 
and `TaskInstance(Base, LoggingMixin, BaseWorkload)`. This creates a **circular 
dependency risk** and violates the current layering where models are 
independent of executors.
   
   `CallbackFetchMethod` was originally defined in `airflow/models/callback.py` 
— this PR moves it to `airflow/executors/workloads/callback.py`, then the model 
imports it back. The enum definition should stay in the models layer or move to 
a shared module.
   
   **Recommendation:** Move `BaseWorkload` to `airflow.models.base` or a shared 
module like `airflow._shared.workload_protocol`. Keep `CallbackFetchMethod` in 
`airflow.models.callback` (its original location) or move to `airflow._shared`. 
The executor workloads package should depend on models, not the other way 
around.
   
   ### 2. Arbitrary Code Execution via `import_module` Without Validation
   
   `execute_callback_workload()` in `workloads/callback.py` dynamically imports 
and executes any Python callable based on a `path` string stored in the 
database:
   
   ```python
   module_path, function_name = callback_path.rsplit(".", 1)
   module = import_module(module_path)
   callback_callable = getattr(module, function_name)
   result = callback_callable(**callback_kwargs)
   ```
   
   If an attacker gains write access to the `callback` table (SQL injection, 
compromised admin, etc.), they can execute arbitrary code on executor workers. 
The current `handle_miss` path only stores paths from deserialized SDK callback 
definitions, but there is no validation at the execution boundary.
   
   **Recommendation:** Add allowlist validation or at minimum verify the path 
resolves to a subclass of `BaseNotifier` or a registered callback type before 
executing.
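
   A minimal sketch of what that execution-boundary check could look like (the allowlist contents are illustrative and the `BaseNotifier` import path may differ on this branch; none of this is the PR's code):

   ```python
   # Sketch only; _ALLOWED_CALLBACK_PREFIXES is illustrative and the BaseNotifier
   # import path is an assumption about this branch.
   import inspect
   from importlib import import_module

   from airflow.notifications.basenotifier import BaseNotifier

   _ALLOWED_CALLBACK_PREFIXES = ("airflow.providers.", "airflow.sdk.")


   def resolve_callback(callback_path: str):
       module_path, function_name = callback_path.rsplit(".", 1)
       if not module_path.startswith(_ALLOWED_CALLBACK_PREFIXES):
           raise ValueError(f"Callback module {module_path!r} is not allowlisted")
       callback_callable = getattr(import_module(module_path), function_name)
       # If the path resolves to a class, additionally require a known callback base type.
       if inspect.isclass(callback_callable) and not issubclass(callback_callable, BaseNotifier):
           raise TypeError(f"{callback_path} is not a BaseNotifier subclass")
       return callback_callable
   ```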
   
   ### 3. CeleryExecutor / BaseExecutor Queue Inconsistency
   
   `BaseExecutor.queue_workload` stores callbacks in `self.queued_callbacks`:
   
   ```python
   self.queued_callbacks[workload.callback.id] = workload
   ```
   
   But `CeleryExecutor.queue_workload` overrides this and stores callbacks in 
`self.queued_tasks`:
   
   ```python
   elif isinstance(workload, workloads.ExecuteCallback):
       self.queued_tasks[workload.callback.key] = workload
   ```
   
   This creates two problems:
   1. **`_get_workloads_to_schedule`** in `BaseExecutor` iterates 
`queued_callbacks` first (for priority), then `queued_tasks`. Since 
CeleryExecutor puts callbacks in `queued_tasks`, callback-over-task priority is 
silently broken for Celery.
   2. **Slot accounting** (`slots_available`, `slots_occupied`) counts 
`len(queued_tasks) + len(queued_callbacks)`. For CeleryExecutor, 
`queued_callbacks` is always empty, so slot math is correct by accident, but 
the semantic inconsistency is a maintenance trap.
   
   **Recommendation:** Either have CeleryExecutor also use `queued_callbacks` 
for callback workloads, or document and enforce that 
`_get_workloads_to_schedule` is not used by CeleryExecutor. A better approach 
would be to standardize on `queued_callbacks` in `BaseExecutor.queue_workload` 
(which CeleryExecutor should call via `super()`) and then override 
`_process_workloads` to handle callbacks from the correct dict.
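
   A rough sketch of that standardized shape (not the PR's code; attribute names such as `workload.callback.id` and `workload.ti.key` follow the snippets above, and the `_process_workloads` signature is an approximation of this branch):

   ```python
   from __future__ import annotations

   from airflow.executors import workloads
   from airflow.executors.base_executor import BaseExecutor


   class CeleryExecutor(BaseExecutor):
       def queue_workload(self, workload, session=None):
           # Defer to BaseExecutor so ExecuteCallback lands in queued_callbacks and
           # ExecuteTask in queued_tasks, keeping _get_workloads_to_schedule priority
           # and slot accounting consistent across executors.
           super().queue_workload(workload, session=session)

       def _process_workloads(self, workload_list):
           for workload in workload_list:
               if isinstance(workload, workloads.ExecuteCallback):
                   self.queued_callbacks.pop(workload.callback.id, None)
               else:
                   self.queued_tasks.pop(workload.ti.key, None)
               # hand the workload off to Celery here (e.g. via send_workload_to_executor)
   ```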
   
   ---
   
   ## Architectural Concerns
   
   ### 4. Callback Routing Depends on Data Dict, Not ORM Relationships
   
   The scheduler reconstructs `DagRun` for each callback in 
`_enqueue_executor_callbacks` via:
   
   ```python
   if isinstance(callback.data, dict) and "dag_run_id" in callback.data:
       dag_run = session.get(DagRun, dag_run_id)
   elif isinstance(callback.data, dict) and "dag_id" in callback.data:
       dag_run = session.scalars(
           select(DagRun).where(DagRun.dag_id == dag_id)
           .order_by(DagRun.execution_date.desc()).limit(1)
       ).first()
   ```
   
   The fallback path (lookup by `dag_id` with `ORDER BY execution_date DESC 
LIMIT 1`) is dangerous — it could match the **wrong** DagRun if a DAG has 
multiple concurrent runs. Similarly, executor routing reads 
`callback.data.get("executor")` from the JSON blob. These should be first-class 
columns or relationships, not buried in a serialized dict.
   
   **Recommendation:** This is acceptable for an initial implementation but 
should be tracked as a fast-follow. Convert the existing TODO to a GitHub issue 
and specifically flag the fallback DagRun lookup as a known correctness risk.
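
   If the fallback is kept in the meantime, a hedged sketch of making the guess explicit (reusing the names from the snippet above; the warning wording and logger attribute are illustrative):

   ```python
   # Sketch only: log loudly instead of silently guessing. DagRun, select, session,
   # and callback come from the scheduler loop context shown above.
   if isinstance(callback.data, dict) and "dag_run_id" in callback.data:
       dag_run = session.get(DagRun, callback.data["dag_run_id"])
   elif isinstance(callback.data, dict) and "dag_id" in callback.data:
       dag_id = callback.data["dag_id"]
       # Falling back to "latest run by execution_date" is a guess; surface it in the
       # logs so a wrong match is at least diagnosable.
       self.log.warning(
           "Callback %s has no dag_run_id; falling back to latest DagRun of %s",
           callback.id,
           dag_id,
       )
       dag_run = session.scalars(
           select(DagRun)
           .where(DagRun.dag_id == dag_id)
           .order_by(DagRun.execution_date.desc())
           .limit(1)
       ).first()
   ```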
   
   ### 5. Shared Parallelism Pool for Tasks and Callbacks
   
   Callbacks share the same `parallelism` slots as tasks:
   
   ```python
   num_occupied_slots = sum(executor.slots_occupied for executor in self.executors)
   max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
   ```
   
   And in `BaseExecutor`:
   ```python
   def slots_available(self):
       return self.parallelism - len(self.running) - len(self.queued_tasks) - len(self.queued_callbacks)
   ```
   
   This means a fully-loaded executor could indefinitely delay deadline alert 
callbacks — exactly the kind of alert that should have guaranteed delivery. 
Conversely, a burst of callbacks could starve task execution.
   
   **Recommendation:** Consider a reserved callback slot count (e.g., 
`max_callback_slots = min(N, parallelism * 0.1)`) or at minimum document this 
limitation. The AIP doesn't specify callback delivery guarantees, but "deadline 
missed but alert never fired" is a poor user experience.
   
   ### 6. `WorkloadKey` Union Type Pollutes Executor Interface
   
   The PR widens `BaseExecutor.running` from `set[TaskInstanceKey]` to 
`set[WorkloadKey]` (i.e., `set[TaskInstanceKey | str]`) and `event_buffer` 
similarly. `CallbackKey` is a bare `str` alias defined inside `TYPE_CHECKING`, 
so at runtime the distinction between a callback key and an arbitrary string is 
invisible. This forces `type: ignore` comments in the hybrid executors and 
weakens type safety.
   
   ```python
   # celery_kubernetes_executor.py
   return self.celery_executor.running.union(self.kubernetes_executor.running)  # type: ignore[return-value, arg-type]
   ```
   
   **Recommendation:** Consider a proper `CallbackKey` wrapper class (not just 
`str` alias) that shares a protocol with `TaskInstanceKey`, or keep callbacks 
in separate data structures in the executor layer rather than mixing them into 
`running`/`event_buffer`.
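
   A sketch of what a runtime-distinguishable key could look like (class name and field are illustrative, not from this PR):

   ```python
   from __future__ import annotations

   from typing import NamedTuple


   class CallbackKey(NamedTuple):
       """Key for a callback workload in executor bookkeeping (running, event_buffer)."""

       callback_id: str  # UUID of the Callback row

       @property
       def key(self) -> CallbackKey:
           # Mirrors TaskInstanceKey.key (which returns self) so shared executor code
           # can call `.key` on either kind of key.
           return self
   ```

   With a distinct type, `isinstance(key, CallbackKey)` checks would work at runtime, and the `type: ignore` comments in the hybrid executors could likely be dropped.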
   
   ### 7. Dead `DeadlineCallbackState` Enum
   
   The PR adds a `DeadlineCallbackState` enum to `airflow/models/deadline.py`:
   
   ```python
   class DeadlineCallbackState(str, Enum):
       """All possible states of deadline callbacks once the deadline is 
missed."""
       ...
   ```
   
   This enum is **never referenced** anywhere in the codebase — not in queries, 
not in state transitions, not in tests. It appears to be leftover from an 
earlier design iteration.
   
   **Recommendation:** Remove `DeadlineCallbackState` to avoid dead code.
   
   ---
   
   ## Code Quality Issues
   
   ### 8. Typo in Docstring
   
   `workloads/callback.py`:
   ```python
   # If the callback is a callabale, call it.  If it is a class, instantiate it.
   ```
   Should be "callable".
   
   ### 9. Fragile Callable-vs-Class Heuristic
   
   ```python
   result = callback_callable(**callback_kwargs)
   if callable(result):
       context = callback_kwargs.get("context", {})
       result = result(context)
   ```
   
   This assumes: if calling the path returns a callable, it must be a class 
instance (like `BaseNotifier`) that needs to be called again with context. But 
any function that returns a callable (e.g., a factory function) would be 
double-invoked. The `SyncCallback` / `AsyncCallback` distinction should drive 
explicit dispatch, not runtime inspection of return values.
   
   **Recommendation:** Use the `fetch_method` field or add a `callback_type` 
field to dispatch explicitly between "call this function" and "instantiate this 
class, then call the instance."
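
   A sketch of explicit dispatch (the `callback_type` values here are hypothetical, standing in for whatever field ultimately drives it):

   ```python
   def run_callback(callback_type: str, callback_callable, callback_kwargs: dict):
       """Dispatch on an explicit field instead of inspecting return values."""
       if callback_type == "notifier_class":
           # Instantiate the notifier with its kwargs, then call the instance with context.
           context = callback_kwargs.pop("context", {})
           return callback_callable(**callback_kwargs)(context)
       if callback_type == "function":
           # Plain callable: call exactly once, even if it happens to return another callable.
           return callback_callable(**callback_kwargs)
       raise ValueError(f"Unknown callback_type: {callback_type!r}")
   ```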
   
   ### 10. `BaseWorkload` Mixin Uses `NotImplementedError` Instead of `abc.ABC`
   
   ```python
   class BaseWorkload:
       def get_dag_id(self) -> str | None:
           raise NotImplementedError(f"{self.__class__.__name__} must implement 
get_dag_id()")
       def get_executor_name(self) -> str | None:
           raise NotImplementedError(f"{self.__class__.__name__} must implement 
get_executor_name()")
   ```
   
   This is a mixin added to `TaskInstance` and `Callback`, both of which are 
ORM models with hundreds of existing tests. If any code path accidentally calls 
these on an unpatched subclass, it will raise at runtime rather than being 
caught by type checking. Consider using `abc.ABC` or a `Protocol` instead.
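
   One low-friction option is a structural `Protocol` checked at the call sites, so neither ORM model needs an extra base class (names here are illustrative, not from this PR):

   ```python
   from __future__ import annotations

   from typing import Protocol


   class RoutableWorkload(Protocol):
       def get_dag_id(self) -> str | None: ...

       def get_executor_name(self) -> str | None: ...


   def pick_executor(workload: RoutableWorkload) -> str | None:
       # mypy verifies that TaskInstance / Callback structurally satisfy the protocol
       # wherever they are passed to this function.
       return workload.get_executor_name()
   ```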
   
   ### 11. `_enqueue_executor_callbacks` Marks Callbacks `RUNNING` Before the Executor Starts Them
   
   The state lifecycle for executor callbacks is:
   ```
   PENDING (Callback.__init__)
     → QUEUED (handle_miss → callback.queue())
     → RUNNING (_enqueue_executor_callbacks)
     → SUCCESS/FAILED (process_executor_events)
   ```
   
   However, `_enqueue_executor_callbacks` queries for `CallbackState.QUEUED` 
and immediately sets `CallbackState.RUNNING` before the executor has actually 
started processing. If the executor crashes between queue and execution, the 
callback is stuck in `RUNNING` with no recovery path. There is no heartbeat or 
timeout mechanism for callback execution.
   
   **Recommendation:** Consider keeping the state as `QUEUED` until the 
executor confirms it has started (via event buffer), similar to how task 
instances transition through `QUEUED → RUNNING` based on executor events. 
Alternatively, add a callback execution timeout.
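
   If a timeout is the chosen route, a hedged sketch of a scheduler-side sweep (the `updated_at` column and the config option name are assumptions about the schema and config, not things this PR defines):

   ```python
   from datetime import timedelta

   from sqlalchemy import select

   from airflow.configuration import conf
   from airflow.utils import timezone

   # Hypothetical option; Callback / CallbackState and the session come from the scheduler loop.
   timeout = timedelta(minutes=conf.getint("scheduler", "callback_execution_timeout_minutes", fallback=10))

   stale = session.scalars(
       select(Callback).where(
           Callback.state == CallbackState.RUNNING,
           Callback.updated_at < timezone.utcnow() - timeout,
       )
   ).all()
   for callback in stale:
       # Re-queue rather than fail outright; notifiers should be idempotent enough to retry.
       callback.state = CallbackState.QUEUED
   ```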
   
   ### 12. `process_executor_events` Maps Between State Enum Types
   
   The scheduler maps executor-provided `TaskInstanceState` to `CallbackState` 
for callback events:
   
   ```python
   # Callback event (key is string UUID)
   if state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS):
       callback_keys_with_events.append(key)
   ...
   if state == TaskInstanceState.SUCCESS:
       callback.state = CallbackState.SUCCESS
   elif state == TaskInstanceState.FAILED:
       callback.state = CallbackState.FAILED
   ```
   
   The code includes a comment noting this is intentional ("executor layer uses 
generic completion states, scheduler converts to proper types"). While 
functionally correct, this coupling means that if either enum changes 
independently, the mapping silently breaks. Consider defining the mapping in a 
single place (e.g., a dict) rather than scattered `if/elif` chains.
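
   For example, a single module-level mapping keeps the coupling in exactly one place (import locations here are approximate):

   ```python
   from airflow.utils.state import TaskInstanceState

   # CallbackState import location depends on where the enum lives on this branch.
   _EXECUTOR_STATE_TO_CALLBACK_STATE = {
       TaskInstanceState.SUCCESS: CallbackState.SUCCESS,
       TaskInstanceState.FAILED: CallbackState.FAILED,
   }

   new_state = _EXECUTOR_STATE_TO_CALLBACK_STATE.get(state)
   if new_state is not None:
       callback.state = new_state
   ```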
   
   ---
   
   ## AIP Alignment
   
   ### What the AIP specifies:
   - DAG-level deadlines with callbacks that work with the existing Notifier 
system
   - Callbacks must not cause extra DAG file parsing
   - OTel metrics from day one
   - Future: dedicated sidecar process for callbacks
   
   ### PR alignment:
   - **Positive:** Callbacks flow through the executor pipeline without parsing 
DAG files. The existing `deadline_alerts.deadline_missed` metric is preserved.
   - **Positive:** Extensible — `supports_callbacks` flag lets executors opt in 
incrementally.
   - **Positive:** Both `AsyncCallback` (via triggerer) and `SyncCallback` (via 
executor) are now supported.
   - **Gap:** No new OTel metric for callback execution outcome 
(`deadline_alerts_triggered` counter from AIP is not added here; only the 
existing `deadline_missed` stat is emitted).
   - **Gap:** No callback execution duration metric. (A sketch covering both metric gaps follows this list.)
   - **Deviation:** The AIP mentions a "dedicated sidecar process for 
callbacks" as future work. This PR routes callbacks through the main executor 
pool instead, which is a reasonable interim approach but should be documented 
as a known limitation.
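
   On the two metric gaps flagged above, an illustrative sketch of emitting both via the existing `Stats` facade (metric names and tags are assumptions beyond the AIP's `deadline_alerts_triggered` counter, and tag support depends on the configured metrics backend):

   ```python
   import time
   from datetime import timedelta

   from airflow.stats import Stats


   def emit_callback_metrics(dag_id: str, outcome: str, started_at: float) -> None:
       # Outcome counter named after the AIP's proposed metric.
       Stats.incr("deadline_alerts_triggered", tags={"dag_id": dag_id, "outcome": outcome})
       # Execution duration, measured from a time.monotonic() timestamp taken at dispatch.
       Stats.timing(
           "deadline_alerts.callback_duration",
           timedelta(seconds=time.monotonic() - started_at),
           tags={"dag_id": dag_id},
       )
   ```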
   
   ---
   
   ## Test Coverage Assessment
   
   ### Covered:
   - `supports_callbacks` flag on BaseExecutor and LocalExecutor
   - `queue_workload` with ExecuteCallback (with and without support)
   - Callback prioritization over tasks in `_get_workloads_to_schedule`
   - `execute_callback_workload` success, missing path, import error, execution 
error
   - LocalExecutor callback processing (including `queued_callbacks` cleanup)
   - DeadlineAlert accepts both AsyncCallback and SyncCallback
   
   ### Missing:
   - CeleryExecutor end-to-end callback processing (only task-type workloads 
are tested)
   - Scheduler `process_executor_events` handling of callback state transitions 
(`RUNNING` → `SUCCESS`/`FAILED`)
   - Multi-executor callback routing (callback specifies a particular executor)
   - Fallback DagRun lookup by `dag_id` when `dag_run_id` is missing 
(correctness risk)
   - Callback cleanup when DagRun is deleted mid-execution
   - `_executor_to_workloads` with mixed TaskInstance and ExecuteCallback inputs
   - Parallelism exhaustion scenario (all slots occupied, callbacks waiting)
   - Callback stuck in RUNNING after executor crash (no recovery test)
   
   ---
   
   ## Minor Nits
   
   1. The `workloads/callback.py` `CallbackFetchMethod` enum has a 
`DAG_ATTRIBUTE` member marked "for future use" — consider removing it until 
it's needed to avoid dead code.
   2. The edge worker API schema (`v2-edge-generated.yaml`) renames 
`TaskInstance` → `TaskInstanceDTO`. Ensure the provider version constraint 
enforces this for edge workers on older versions.
   3. `docs/spelling_wordlist.txt` adds `WorkloadKey` — also add 
`WorkloadState`, `CallbackDTO`, `SchedulerWorkload` if they appear in docs.
   4. `send_task_to_executor` is renamed to `send_workload_to_executor` in 
`celery_executor_utils.py` with a backward-compat alias. The alias is good, but 
the import in `celery_executor.py` still references the old name — verify this 
is intentional and tested.
   
   ---
   
   ## Verdict
   
   The core design — treating callbacks as first-class executor workloads — is 
sound and aligns with the AIP's goal of reliable deadline alert delivery. 
However, the PR needs attention on:
   
   1. **Must fix before merge:** Fix the layer violation (`models` → 
`executors` import direction); resolve the CeleryExecutor/BaseExecutor queue 
inconsistency (`queued_callbacks` vs `queued_tasks`); remove dead 
`DeadlineCallbackState` enum.
   2. **Should fix:** Add callback execution validation (allowlist or type 
check); fix the callable-vs-class heuristic; guard the fallback DagRun lookup; 
fix the typo.
   3. **Track as follow-up:** ORM foreign key for `dag_run_id`; dedicated 
callback parallelism pool; OTel callback execution metrics; `CallbackKey` 
wrapper type; callback execution timeout/recovery.
   

