TL;DR: The new supervised callback process added in PR #62645 [1] needs some,
but not all, of the same comms channels that tasks use today. Before I wire
them up, I'd like to restructure `ToSupervisor` in `comms.py` [2] so we can
compose the right channel set per process type instead of maintaining parallel
and redundant flat lists. I've sketched out a few approaches below, but there
are likely a hundred ways to slice this, so maybe someone will see a cleaner
design. I'm all ears. If nobody objects, I'll proceed with the "security
level" style grouping (Option 1).
To be clear, this is purely reorganizing the existing type definitions into
logical building blocks. There will be no protocol changes, no new message
types, etc. Some subset of those building blocks will then be used to assemble
the default set of channels that callbacks get.
---
Background
`ToSupervisor` in `task-sdk/…/comms.py` [2] is currently a single flat union of
~40 message types, covering every request a task process might send to its supervisor.
The new `CallbackSubprocess` for executor callbacks [1] has an empty message
set, meaning callback code can't do anything that requires the supervisor to
proxy to the API server. No `Connection.get()`, no `Variable.get()`, etc. To
complicate matters, callbacks are arbitrary user code. People will put Slack
notifications, PagerDuty alerts, custom cleanup logic, and more in them, so we
don't want to just blindly give them unrestricted API access. For example, I
figure they'll need at least Connection and Variable lookups, secret masking,
logging, and possibly XCom access. But they clearly should *not* get
task-lifecycle channels (`SucceedTask`, `DeferTask`, etc.) since a callback
doesn't share the task lifecycle.
As an example of possible future work, and of why this decision matters
now, consider a Deadline Alert with a callback configured to handle a stalled
task. Something like "if this DAG is still running 30 minutes after it was
queued, fail it and restart." Today there's no way to do this because the
callback subprocess has zero comms channels. But even once we add channels,
the current task lifecycle messages (`RetryTask`, `SucceedTask`, etc.) won't
work here because those are self-referential. The task sends them about
itself, and the supervisor infers the identity from the process it's managing.
A deadline callback is a different process from the task it's alerting about.
It would need some new orchestration-style messages that carry explicit
identifiers like `FailTaskInstance(ti_id=...)` or
`ClearTaskInstance(ti_id=...)`. This is a natural extension of
`TriggerDagRun`, which already exists and operates on other DAG runs.
This use case is perhaps a stretch, but it shows why getting the grouping right
now matters.
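The sketch below uses dataclasses to make the self-vs-other distinction concrete. `FailTaskInstance` and `resolve_target` are hypothetical (nothing like them exists today). `SucceedTask` is a stripped-down stand-in for the real model, which carries more fields:

```python
from dataclasses import dataclass
from typing import Literal, Optional
from uuid import UUID, uuid4


@dataclass
class SucceedTask:
    # Simplified stand-in: a self-referential lifecycle message with no target id.
    type: Literal["SucceedTask"] = "SucceedTask"


@dataclass
class FailTaskInstance:
    # Hypothetical orchestration message: it names the task instance it acts on.
    ti_id: UUID
    type: Literal["FailTaskInstance"] = "FailTaskInstance"


def resolve_target(msg, own_ti_id: Optional[UUID]) -> UUID:
    """Hypothetical helper: which task instance is this message about?"""
    if isinstance(msg, FailTaskInstance):
        return msg.ti_id  # explicit target carried in the message itself
    if own_ti_id is None:
        # A callback process has no task instance of its own to infer from.
        raise ValueError(f"{msg.type} is self-referential but the sender has no task instance")
    return own_ti_id  # infer the target from the process being supervised


# A deadline callback (no task instance of its own) targeting a stalled task.
stalled_ti = uuid4()
assert resolve_target(FailTaskInstance(ti_id=stalled_ti), own_ti_id=None) == stalled_ti
```

The key case is the first branch. A process with no task instance of its own can act on an orchestration message, but a self-referential one has nothing to resolve against.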
---
Proposed approaches
Like I said, there are any number of ways to carve this up. I picked a couple
that I think are the most obvious, along with my proposed solution. If you see
a better option, please speak up. I don't want to over-engineer this, but I do
want to get the granularity right. More supervised process types are likely in
the future, and once this is out in the wild, changing it will be messy either
way: clawing back permissions would be a breaking change, and a poorly chosen
structure would make future additions complicated.
Option 0: Minimal (just unblock callbacks and punt the rest of this discussion to later)
If we just want callbacks working, the fastest way to get there would be to
break out only the immediately shared subset and punt this entire discussion:
```python
BaseRequests = GetConnection | GetVariable | MaskSecret | ResendLoggingFD
TaskRequests = BaseRequests | …everything else…
CallbackRequests = BaseRequests
```
- Pro: Smallest diff, unblocks callback work immediately.
- Con: "Base" is an arbitrary grab bag that will likely drift. It doesn't
really help future consumers see what they're opting into, nor does it set us
up for the orchestration use case in the example.
Option 1: Group by access level (my recommendation)
```python
# Everything whose name starts with "Get" (list abbreviated here).
ReadRequests = GetConnection | GetVariable | GetXCom | …
WriteRequests = SetXCom | DeleteXCom | PutVariable | DeleteVariable
LoggingRequests = MaskSecret | ResendLoggingFD
TaskLifecycleRequests = (
    TaskState | SucceedTask | DeferTask | RetryTask | RescheduleTask | SkipDownstreamTasks
)
TaskMetadataRequests = (
    SetRenderedFields | SetRenderedMapIndex | ValidateInletsAndOutlets
)
# FailTaskInstance and ClearTaskInstance are examples of potential future work here.
OrchestrationRequests = TriggerDagRun
# HITL = Human in the loop (I had to look up what it meant in this context).
HITLRequests = CreateHITLDetailPayload | UpdateHITLDetail | GetHITLDetailResponse
```
Then we assemble access per process:
```python
from typing import Annotated

from pydantic import Field

# Task gets everything
ToSupervisor = Annotated[
    ReadRequests
    | WriteRequests
    | LoggingRequests
    | TaskLifecycleRequests
    | TaskMetadataRequests
    | OrchestrationRequests
    | HITLRequests,
    Field(discriminator="type"),
]

# Callback gets read-only + logging; we can add orchestration later if we
# decide to go that route.
CallbackToSupervisor = Annotated[
    ReadRequests | LoggingRequests,
    Field(discriminator="type"),
]
```
- Pro: Most of these groups are self-documenting, and this may feel the most
comfortable for folks used to configuring tiered permissions policies.
- Pro: Opens the door to making access level configurable in the future. Some
day we might be able to do a team-based config option where Team A's callbacks
can only read and log, but Team B gets admin rights, etc.
- Con: Some groupings are judgment calls. Is `TriggerDagRun` a "write," or do
we put it in "orchestration" all by itself since it is the only current message
that affects a different entity? What if I want to allow my users to write
Variables but not write XComs? Is `DeleteVariable` a "write," or should deletes
be their own tier entirely? I don't know if there is a solid "right answer" to
a lot of these, so we'll be leaning on documentation to make each group's scope
clear.
---
Option 2: Group by resource type
```python
ConnectionRequests = GetConnection
VariableRequests = GetVariable | PutVariable | DeleteVariable
XComRequests = GetXCom | … | SetXCom | DeleteXCom
DagRunRequests = GetDagRun | … | TriggerDagRun
TaskInstanceRequests = GetPreviousTI | …
# …etc
```
- Pro: This feels like a natural "REST resource" grouping, as opposed to a
"permissions level" grouping.
- Con: Mixes reads and writes within each group, so when a callback needs
`GetVariable` but not `PutVariable`, you either grant the whole
`VariableRequests` block or split it further, at which point you're back to
Option 1. Also makes the self-vs-other (task-vs-orchestration) distinction
invisible.
---
Why I prefer Option 1
The security-level grouping gives us a vocabulary: "read," "write," "logging,"
"lifecycle," "orchestration." Composition statements like
`ReadRequests | LoggingRequests` are immediately understandable without reading
the docs. If we ever want to make callback permissions configurable, it would
look something like `[callbacks] allowed_channels = read,write,logging,orchestration`.
But maybe the same can be said for Option 2, which would look something like
`[callbacks] allowed_channels = connections,variables,logging`.
---
My plan
Unless someone proposes a better structure or has objections, I want to proceed
with Option 1. The initial default callback set will be
`ReadRequests | LoggingRequests`, so callbacks will have secret masking, logging,
and read-only access to everything else. We can expand it as specific use cases
(like the deadline callback restarting a stalled task I mentioned above) come up.
[1] https://github.com/apache/airflow/pull/62645
[2] https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L1043