Dev-iL commented on PR #61153:
URL: https://github.com/apache/airflow/pull/61153#issuecomment-3932378274
Comments from Opus:
---
## Summary
This PR adds `ExecuteCallback` as a new workload type alongside
`ExecuteTask` and `RunTrigger`, enabling synchronous callbacks (e.g.,
`SyncCallback` wrapping `SlackWebhookNotifier`) to be dispatched through the
executor pipeline. It refactors the `workloads.py` single-file module into a
`workloads/` package, introduces a `BaseWorkload` ORM mixin for unified
scheduler routing, and updates BaseExecutor, LocalExecutor, CeleryExecutor, the
scheduler loop, and the deadline model to support callback workloads.
---
## Critical Issues
### 1. Layer Violation: `airflow.models` Importing from `airflow.executors`
The PR introduces imports from `airflow.executors.workloads` into two model
files:
```python
# airflow/models/callback.py
from airflow.executors.workloads import BaseWorkload
from airflow.executors.workloads.callback import CallbackFetchMethod
# airflow/models/taskinstance.py
from airflow.executors.workloads import BaseWorkload
```
And adds `BaseWorkload` as a mixin to both `Callback(Base, BaseWorkload)`
and `TaskInstance(Base, LoggingMixin, BaseWorkload)`. This creates a **circular
dependency risk** and violates the current layering where models are
independent of executors.
`CallbackFetchMethod` was originally defined in `airflow/models/callback.py`
— this PR moves it to `airflow/executors/workloads/callback.py`, then the model
imports it back. The enum definition should stay in the models layer or move to
a shared module.
**Recommendation:** Move `BaseWorkload` to `airflow.models.base` or a shared
module like `airflow._shared.workload_protocol`. Keep `CallbackFetchMethod` in
`airflow.models.callback` (its original location) or move to `airflow._shared`.
The executor workloads package should depend on models, not the other way
around.
### 2. Arbitrary Code Execution via `import_module` Without Validation
`execute_callback_workload()` in `workloads/callback.py` dynamically imports
and executes any Python callable based on a `path` string stored in the
database:
```python
module_path, function_name = callback_path.rsplit(".", 1)
module = import_module(module_path)
callback_callable = getattr(module, function_name)
result = callback_callable(**callback_kwargs)
```
If an attacker gains write access to the `callback` table (SQL injection,
compromised admin, etc.), they can execute arbitrary code on executor workers.
The current `handle_miss` path only stores paths from deserialized SDK callback
definitions, but there is no validation at the execution boundary.
**Recommendation:** Add allowlist validation or at minimum verify the path
resolves to a subclass of `BaseNotifier` or a registered callback type before
executing.
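A minimal sketch of what such a boundary check could look like (the allowlist prefixes and the helper name are assumptions for illustration, not the PR's actual API):

```python
from importlib import import_module

# Illustrative allowlist; real values would come from config or a registry.
ALLOWED_MODULE_PREFIXES = ("airflow.providers.", "airflow.sdk.")


def resolve_callback_callable(callback_path: str):
    """Reject paths outside the allowlist before importing anything."""
    module_path, _, attr_name = callback_path.rpartition(".")
    if not module_path.startswith(ALLOWED_MODULE_PREFIXES):
        raise ValueError(f"callback path not allowed: {callback_path!r}")
    return getattr(import_module(module_path), attr_name)
```

The same resolver could additionally verify that the resolved object is a `BaseNotifier` subclass (or a registered callback type) before returning it.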
### 3. CeleryExecutor / BaseExecutor Queue Inconsistency
`BaseExecutor.queue_workload` stores callbacks in `self.queued_callbacks`:
```python
self.queued_callbacks[workload.callback.id] = workload
```
But `CeleryExecutor.queue_workload` overrides this and stores callbacks in
`self.queued_tasks`:
```python
elif isinstance(workload, workloads.ExecuteCallback):
    self.queued_tasks[workload.callback.key] = workload
```
This creates two problems:
1. **`_get_workloads_to_schedule`** in `BaseExecutor` iterates
`queued_callbacks` first (for priority), then `queued_tasks`. Since
CeleryExecutor puts callbacks in `queued_tasks`, callback-over-task priority is
silently broken for Celery.
2. **Slot accounting** (`slots_available`, `slots_occupied`) counts
`len(queued_tasks) + len(queued_callbacks)`. For CeleryExecutor,
`queued_callbacks` is always empty, so slot math is correct by accident, but
the semantic inconsistency is a maintenance trap.
**Recommendation:** Either have CeleryExecutor also use `queued_callbacks`
for callback workloads, or document and enforce that
`_get_workloads_to_schedule` is not used by CeleryExecutor. A better approach
would be to standardize on `queued_callbacks` in `BaseExecutor.queue_workload`
(which CeleryExecutor should call via `super()`) and then override
`_process_workloads` to handle callbacks from the correct dict.
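A sketch of the suggested shape (these class bodies are simplified stand-ins, not the PR's actual executor code):

```python
class BaseExecutor:
    def __init__(self):
        self.queued_tasks = {}
        self.queued_callbacks = {}

    def queue_workload(self, workload):
        # Single source of truth: callbacks always land in queued_callbacks.
        if getattr(workload, "callback", None) is not None:
            self.queued_callbacks[workload.callback.id] = workload
        else:
            self.queued_tasks[workload.ti.key] = workload


class CeleryExecutor(BaseExecutor):
    def queue_workload(self, workload):
        super().queue_workload(workload)  # no divergent bookkeeping

    def _process_workloads(self):
        # Drain callbacks first so callback-over-task priority holds here too.
        batch = list(self.queued_callbacks.values()) + list(self.queued_tasks.values())
        self.queued_callbacks.clear()
        self.queued_tasks.clear()
        return batch
```

With this shape, `_get_workloads_to_schedule`, slot accounting, and callback priority all read the same dicts for every executor.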
---
## Architectural Concerns
### 4. Callback Routing Depends on Data Dict, Not ORM Relationships
The scheduler reconstructs `DagRun` for each callback in
`_enqueue_executor_callbacks` via:
```python
if isinstance(callback.data, dict) and "dag_run_id" in callback.data:
    dag_run = session.get(DagRun, dag_run_id)
elif isinstance(callback.data, dict) and "dag_id" in callback.data:
    dag_run = session.scalars(
        select(DagRun)
        .where(DagRun.dag_id == dag_id)
        .order_by(DagRun.execution_date.desc())
        .limit(1)
    ).first()
```
The fallback path (lookup by `dag_id` with `ORDER BY execution_date DESC
LIMIT 1`) is dangerous — it could match the **wrong** DagRun if a DAG has
multiple concurrent runs. Similarly, executor routing reads
`callback.data.get("executor")` from the JSON blob. These should be first-class
columns or relationships, not buried in a serialized dict.
**Recommendation:** This is acceptable for an initial implementation but
should be tracked as a fast-follow. Convert the existing TODO to a GitHub issue
and specifically flag the fallback DagRun lookup as a known correctness risk.
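One way to make the fallback fail loudly instead of guessing (a pure-Python sketch; the real code would run this as a session query, and the function name and row shape are invented):

```python
def resolve_dag_run(candidate_runs, data):
    """Pick the DagRun for a callback; refuse an ambiguous dag_id fallback.

    `candidate_runs` is a list of dicts standing in for DagRun rows.
    """
    if "dag_run_id" in data:
        for run in candidate_runs:
            if run["id"] == data["dag_run_id"]:
                return run
        return None
    matches = [r for r in candidate_runs if r["dag_id"] == data.get("dag_id")]
    if len(matches) > 1:
        # Multiple concurrent runs: "latest" is a guess, not an answer.
        raise ValueError(f"ambiguous DagRun for dag_id={data.get('dag_id')!r}")
    return matches[0] if matches else None
```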
### 5. Shared Parallelism Pool for Tasks and Callbacks
Callbacks share the same `parallelism` slots as tasks:
```python
num_occupied_slots = sum(executor.slots_occupied for executor in self.executors)
max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
```
And in `BaseExecutor`:
```python
def slots_available(self):
    return (
        self.parallelism
        - len(self.running)
        - len(self.queued_tasks)
        - len(self.queued_callbacks)
    )
```
This means a fully-loaded executor could indefinitely delay deadline alert
callbacks — exactly the kind of alert that should have guaranteed delivery.
Conversely, a burst of callbacks could starve task execution.
**Recommendation:** Consider a reserved callback slot count (e.g.,
`max_callback_slots = min(N, parallelism * 0.1)`) or at minimum document this
limitation. The AIP doesn't specify callback delivery guarantees, but "deadline
missed but alert never fired" is a poor user experience.
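Back-of-the-envelope version of the reservation (the function, the split, and the 10% default are all hypothetical, not existing Airflow settings):

```python
def split_slots(parallelism: int, running_tasks: int, reserve_frac: float = 0.1):
    """Reserve a slice of parallelism so callbacks cannot be starved.

    Returns (task_slots, callback_slots). Callbacks may borrow idle task
    capacity; tasks may never touch the reserved slice.
    """
    reserved = max(1, int(parallelism * reserve_frac))
    task_capacity = parallelism - reserved
    task_slots = max(0, task_capacity - running_tasks)
    return task_slots, reserved + task_slots
```

With `parallelism=32` and every task slot busy, this still leaves 3 callback slots instead of 0.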
### 6. `WorkloadKey` Union Type Pollutes Executor Interface
The PR widens `BaseExecutor.running` from `set[TaskInstanceKey]` to
`set[WorkloadKey]` (i.e., `set[TaskInstanceKey | str]`) and `event_buffer`
similarly. `CallbackKey` is a bare `str` alias defined inside `TYPE_CHECKING`,
so at runtime the distinction between a callback key and an arbitrary string is
invisible. This forces `type: ignore` comments in the hybrid executors and
weakens type safety.
```python
# celery_kubernetes_executor.py
return self.celery_executor.running.union(self.kubernetes_executor.running)  # type: ignore[return-value, arg-type]
```
**Recommendation:** Consider a proper `CallbackKey` wrapper class (not just
`str` alias) that shares a protocol with `TaskInstanceKey`, or keep callbacks
in separate data structures in the executor layer rather than mixing them into
`running`/`event_buffer`.
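A possible shape for the wrapper (the class name matches the PR's alias; the field is assumed):

```python
from typing import NamedTuple


class CallbackKey(NamedTuple):
    """Runtime-distinguishable key, unlike a TYPE_CHECKING-only str alias."""

    callback_id: str

    def __str__(self) -> str:
        return self.callback_id
```

With this, `isinstance(key, CallbackKey)` works in executor code paths, and `set[TaskInstanceKey | CallbackKey]` stays precise without `type: ignore` in the hybrid executors.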
### 7. Dead `DeadlineCallbackState` Enum
The PR adds a `DeadlineCallbackState` enum to `airflow/models/deadline.py`:
```python
class DeadlineCallbackState(str, Enum):
    """All possible states of deadline callbacks once the deadline is missed."""

    ...
```
This enum is **never referenced** anywhere in the codebase — not in queries,
not in state transitions, not in tests. It appears to be leftover from an
earlier design iteration.
**Recommendation:** Remove `DeadlineCallbackState` to avoid dead code.
---
## Code Quality Issues
### 8. Typo in Docstring
`workloads/callback.py`:
```python
# If the callback is a callabale, call it. If it is a class, instantiate it.
```
Should be "callable".
### 9. Fragile Callable-vs-Class Heuristic
```python
result = callback_callable(**callback_kwargs)
if callable(result):
    context = callback_kwargs.get("context", {})
    result = result(context)
```
This assumes: if calling the path returns a callable, it must be a class
instance (like `BaseNotifier`) that needs to be called again with context. But
any function that returns a callable (e.g., a factory function) would be
double-invoked. The `SyncCallback` / `AsyncCallback` distinction should drive
explicit dispatch, not runtime inspection of return values.
**Recommendation:** Use the `fetch_method` field or add a `callback_type`
field to dispatch explicitly between "call this function" and "instantiate this
class, then call the instance."
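Sketch of explicit dispatch (the `callback_type` field and its values are assumptions; today the PR infers this from the return value):

```python
from enum import Enum


class CallbackType(str, Enum):
    FUNCTION = "function"  # call the target once, directly
    NOTIFIER = "notifier"  # instantiate first, then call the instance with context


def run_callback(target, callback_type: CallbackType, kwargs: dict):
    if callback_type is CallbackType.FUNCTION:
        # A factory returning a callable is NOT invoked a second time here.
        return target(**kwargs)
    instance = target(**kwargs)
    return instance(kwargs.get("context", {}))
```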
### 10. `BaseWorkload` Mixin Uses `NotImplementedError` Instead of `abc.ABC`
```python
class BaseWorkload:
    def get_dag_id(self) -> str | None:
        raise NotImplementedError(f"{self.__class__.__name__} must implement get_dag_id()")

    def get_executor_name(self) -> str | None:
        raise NotImplementedError(f"{self.__class__.__name__} must implement get_executor_name()")
```
This is a mixin added to `TaskInstance` and `Callback`, both of which are
ORM models with hundreds of existing tests. If any code path accidentally calls
these on an unpatched subclass, it will raise at runtime rather than being
caught by type checking. Consider using `abc.ABC` or a `Protocol` instead.
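A `Protocol`-based sketch of the same interface (method names follow the PR; the model stand-in is illustrative):

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class WorkloadProtocol(Protocol):
    """Static contract for scheduler routing; mypy flags missing methods."""

    def get_dag_id(self) -> str | None: ...

    def get_executor_name(self) -> str | None: ...


class Callback:
    """Stand-in model: satisfies the protocol without inheriting anything."""

    def __init__(self, dag_id: str | None = None, executor: str | None = None):
        self.dag_id = dag_id
        self.executor = executor

    def get_dag_id(self) -> str | None:
        return self.dag_id

    def get_executor_name(self) -> str | None:
        return self.executor
```

A missing method then surfaces at type-check time rather than as a runtime `NotImplementedError` deep in the scheduler loop, and the ORM models gain no extra base class.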
### 11. `_enqueue_executor_callbacks` Sets Callbacks to `RUNNING` Before the Executor Confirms Execution
The state lifecycle for executor callbacks is:
```
PENDING (Callback.__init__) → QUEUED (handle_miss → callback.queue()) →
RUNNING (_enqueue_executor_callbacks) → SUCCESS/FAILED (process_executor_events)
```
However, `_enqueue_executor_callbacks` queries for `CallbackState.QUEUED`
and immediately sets `CallbackState.RUNNING` before the executor has actually
started processing. If the executor crashes between queue and execution, the
callback is stuck in `RUNNING` with no recovery path. There is no heartbeat or
timeout mechanism for callback execution.
**Recommendation:** Consider keeping the state as `QUEUED` until the
executor confirms it has started (via event buffer), similar to how task
instances transition through `QUEUED → RUNNING` based on executor events.
Alternatively, add a callback execution timeout.
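A sketch of what a recovery sweep could look like (the row shape, state strings, and timeout are invented for illustration; no such mechanism exists in the PR):

```python
from datetime import datetime, timedelta, timezone


def find_stuck_callbacks(rows, timeout=timedelta(minutes=10), now=None):
    """Return callback rows stuck in RUNNING longer than `timeout`.

    `rows` are dicts standing in for Callback ORM rows; the scheduler would
    run this as a query and requeue (or fail) the matches.
    """
    now = now or datetime.now(timezone.utc)
    return [
        row for row in rows
        if row["state"] == "running" and now - row["started_at"] > timeout
    ]
```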
### 12. `process_executor_events` Maps Between State Enum Types
The scheduler maps executor-provided `TaskInstanceState` to `CallbackState`
for callback events:
```python
# Callback event (key is string UUID)
if state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS):
    callback_keys_with_events.append(key)
...
if state == TaskInstanceState.SUCCESS:
    callback.state = CallbackState.SUCCESS
elif state == TaskInstanceState.FAILED:
    callback.state = CallbackState.FAILED
```
The code includes a comment noting this is intentional ("executor layer uses
generic completion states, scheduler converts to proper types"). While
functionally correct, this coupling means that if either enum changes
independently, the mapping silently breaks. Consider defining the mapping in a
single place (e.g., a dict) rather than scattered `if/elif` chains.
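The single-place mapping could be as small as this (the enums here are minimal stand-ins for the real `TaskInstanceState` / `CallbackState`):

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


class CallbackState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


# One table to maintain; a missing entry raises KeyError loudly instead of
# an if/elif chain silently falling through.
EXECUTOR_TO_CALLBACK_STATE = {
    TaskInstanceState.SUCCESS: CallbackState.SUCCESS,
    TaskInstanceState.FAILED: CallbackState.FAILED,
}
```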
---
## AIP Alignment
### What the AIP specifies:
- DAG-level deadlines with callbacks that work with the existing Notifier
system
- Callbacks must not cause extra DAG file parsing
- OTel metrics from day one
- Future: dedicated sidecar process for callbacks
### PR alignment:
- **Positive:** Callbacks flow through the executor pipeline without parsing
DAG files. The existing `deadline_alerts.deadline_missed` metric is preserved.
- **Positive:** Extensible — `supports_callbacks` flag lets executors opt in
incrementally.
- **Positive:** Both `AsyncCallback` (via triggerer) and `SyncCallback` (via
executor) are now supported.
- **Gap:** No new OTel metric for callback execution outcome
(`deadline_alerts_triggered` counter from AIP is not added here; only the
existing `deadline_missed` stat is emitted).
- **Gap:** No callback execution duration metric.
- **Deviation:** The AIP mentions a "dedicated sidecar process for
callbacks" as future work. This PR routes callbacks through the main executor
pool instead, which is a reasonable interim approach but should be documented
as a known limitation.
---
## Test Coverage Assessment
### Covered:
- `supports_callbacks` flag on BaseExecutor and LocalExecutor
- `queue_workload` with ExecuteCallback (with and without support)
- Callback prioritization over tasks in `_get_workloads_to_schedule`
- `execute_callback_workload` success, missing path, import error, execution
error
- LocalExecutor callback processing (including `queued_callbacks` cleanup)
- DeadlineAlert accepts both AsyncCallback and SyncCallback
### Missing:
- CeleryExecutor end-to-end callback processing (only task-type workloads
are tested)
- Scheduler `process_executor_events` handling of callback state transitions
(`RUNNING` → `SUCCESS`/`FAILED`)
- Multi-executor callback routing (callback specifies a particular executor)
- Fallback DagRun lookup by `dag_id` when `dag_run_id` is missing
(correctness risk)
- Callback cleanup when DagRun is deleted mid-execution
- `_executor_to_workloads` with mixed TaskInstance and ExecuteCallback
inputs
- Parallelism exhaustion scenario (all slots occupied, callbacks waiting)
- Callback stuck in RUNNING after executor crash (no recovery test)
---
## Minor Nits
1. The `workloads/callback.py` `CallbackFetchMethod` enum has a
`DAG_ATTRIBUTE` member marked "for future use" — consider removing it until
it's needed to avoid dead code.
2. The edge worker API schema (`v2-edge-generated.yaml`) renames
`TaskInstance` → `TaskInstanceDTO`. Ensure the provider version constraints
prevent edge workers on older provider versions from breaking against the
renamed schema.
3. `docs/spelling_wordlist.txt` adds `WorkloadKey` — also add
`WorkloadState`, `CallbackDTO`, `SchedulerWorkload` if they appear in docs.
4. `send_task_to_executor` is renamed to `send_workload_to_executor` in
`celery_executor_utils.py` with a backward-compat alias. The alias is good, but
the import in `celery_executor.py` still references the old name — verify this
is intentional and tested.
---
## Verdict
The core design — treating callbacks as first-class executor workloads — is
sound and aligns with the AIP's goal of reliable deadline alert delivery.
However, the PR needs attention on:
1. **Must fix before merge:** Fix the layer violation (`models` →
`executors` import direction); resolve the CeleryExecutor/BaseExecutor queue
inconsistency (`queued_callbacks` vs `queued_tasks`); remove dead
`DeadlineCallbackState` enum.
2. **Should fix:** Add callback execution validation (allowlist or type
check); fix the callable-vs-class heuristic; guard the fallback DagRun lookup;
fix the typo.
3. **Track as follow-up:** ORM foreign key for `dag_run_id`; dedicated
callback parallelism pool; OTel callback execution metrics; `CallbackKey`
wrapper type; callback execution timeout/recovery.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]