TL;DR: The new callback supervised process added in PR #62645 [1] needs some - 
but not all - of the same comms channels that tasks use today.  Before I wire 
them up, I'd like to restructure `ToSupervisor` in `comms.py` [2] so we can 
compose the right channel set per process type instead of maintaining parallel 
and redundant flat lists.  I've sketched out a few approaches below, but there 
are likely a hundred ways to slice this so maybe someone will see a cleaner 
design.  I'm all ears.  If nobody objects, I'll proceed with the "security 
level" style grouping (Option 1).

To be clear, this is purely reorganizing the existing type definitions into 
logical building blocks.  There will be no protocol changes, no new message 
types, etc.  Some subset of those building blocks will then be used to assemble 
the default callback access.

---

Background

`ToSupervisor` in `task-sdk/…/comms.py` [2] is currently a single flat union of 
~40 message types: every request a task process might send to its supervisor.  
The new `CallbackSubprocess` for executor callbacks [1] has an empty message 
set, meaning callback code can't do anything that requires the supervisor to 
proxy to the API server.  No `Connection.get()`, no `Variable.get()`, etc.  To 
complicate matters, callbacks are arbitrary user code.  People will put Slack 
notifications, PagerDuty alerts, custom cleanup logic, and more in them, so we 
don't want to just blindly give them unrestricted API access.  For example, I 
figure they'll need at least Connection and Variable lookups, secret masking, 
logging, and possibly XCom access.  But they clearly should *not* get 
task-lifecycle channels (`SucceedTask`, `DeferTask`, etc.) since a callback 
doesn't share the task lifecycle.

As an example of future work that might happen and why this decision matters 
now, consider a Deadline Alert with a callback configured to handle a stalled 
task.  Something like "if this DAG is still running 30 minutes after it was 
queued, fail it and restart." Today there's no way to do this because the 
callback subprocess has zero comms channels.  But even once we add channels, 
the current task lifecycle messages (`RetryTask`, `SucceedTask`, etc.) won't 
work here because those are self-referential.  The task sends them about 
itself, and the supervisor infers the identity from the process it's managing.  
A deadline callback is a different process from the task it's alerting about.  
It would need some new orchestration-style messages that carry explicit 
identifiers like `FailTaskInstance(ti_id=...)` or 
`ClearTaskInstance(ti_id=...)`.  This is a natural extension of 
`TriggerDagRun`, which already exists and operates on other DAG runs.
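To make the shape of such a message concrete, here is a stdlib-only sketch.  To 
be clear: neither message exists yet, the names are hypothetical, and the real 
comms messages are pydantic models rather than dataclasses; this just shows the 
"explicit identifier" difference from the self-referential lifecycle messages.

```python
# Hypothetical future orchestration messages -- nothing here exists today.
# Real comms messages are pydantic models; dataclasses keep the sketch stdlib-only.
from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True)
class FailTaskInstance:
    """Ask the supervisor to fail a *different* task instance, named explicitly."""

    ti_id: str
    type: Literal["FailTaskInstance"] = "FailTaskInstance"


@dataclass(frozen=True)
class ClearTaskInstance:
    """Ask the supervisor to clear a task instance so it can be re-run."""

    ti_id: str
    type: Literal["ClearTaskInstance"] = "ClearTaskInstance"


# A deadline callback would name the stalled task it is alerting about:
msg = FailTaskInstance(ti_id="ti-123")  # illustrative id, not a real one
```

The key contrast with `SucceedTask` and friends is that `ti_id` is carried in 
the message instead of being inferred from the managed process.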

This use case is perhaps a stretch but it shows why getting the grouping right 
now matters.

---

Proposed approaches

Like I said, there are any number of ways to carve this up.  I picked a couple 
that I think are the most obvious, along with my proposed solution.  If you see 
a better option, please speak up.  I don't want to over-engineer this, but I do 
want to get the granularity right, since adding more supervised process types is 
likely in the future.  Once this is out in the wild, clawing back permissions 
will be a big breaking mess, and future additions will be complicated if we 
structure these incorrectly.

Option 0: Minimal; just unblock callbacks, punt the rest of this discussion to 
later

If we just want callbacks working, the fastest way to get there would be to 
break out only the immediately shared subset and punt this entire discussion:

```python
BaseRequests = GetConnection | GetVariable | MaskSecret | ResendLoggingFD
TaskRequests = BaseRequests | …everything else…
CallbackRequests = BaseRequests
```

- Pro: Smallest diff, unblocks callback work immediately.
- Con: "Base" is an arbitrary grab bag that will likely drift; it doesn't 
really help future consumers see what they're opting into, nor does it set us 
up for the orchestration use case in the example.


Option 1: Group by access level  (my recommendation)

```python
ReadRequests = GetConnection | GetVariable | GetXCom | ...  # everything starting with "Get"
WriteRequests = SetXCom | DeleteXCom | PutVariable | DeleteVariable
LoggingRequests = MaskSecret | ResendLoggingFD
TaskLifecycleRequests = TaskState | SucceedTask | DeferTask | RetryTask | 
RescheduleTask | SkipDownstreamTasks
TaskMetadataRequests = SetRenderedFields | SetRenderedMapIndex | 
ValidateInletsAndOutlets
OrchestrationRequests = TriggerDagRun
# plus, e.g., FailTaskInstance and ClearTaskInstance as potential future work
HITLRequests = CreateHITLDetailPayload | UpdateHITLDetail | GetHITLDetailResponse
# HITL = Human in the loop, since I had to look up what it meant in this context
```

Then we assemble access per process:

```python
# Task gets everything
ToSupervisor = Annotated[
    ReadRequests | WriteRequests | LoggingRequests
    | TaskLifecycleRequests | TaskMetadataRequests
    | OrchestrationRequests | HITLRequests,
    Field(discriminator="type"),
]

# Callback gets read-only + logging; we can add orchestration later if we
# decide to go that route
CallbackToSupervisor = Annotated[
    ReadRequests | LoggingRequests,
    Field(discriminator="type"),
]
```
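As a stdlib-only illustration of the gating this buys us: the supervisor ends 
up accepting or rejecting a message purely based on which groups a process type 
was assembled from.  Group contents are abbreviated, and `supervisor_accepts` 
is a hypothetical helper, not real supervisor code.

```python
# Abbreviated stand-ins for the groups above (message names from the real union).
READ = {"GetConnection", "GetVariable", "GetXCom"}
LOGGING = {"MaskSecret", "ResendLoggingFD"}
LIFECYCLE = {"SucceedTask", "DeferTask", "RetryTask"}

# Per-process assembly, mirroring ToSupervisor vs. CallbackToSupervisor:
TASK_ALLOWED = READ | LOGGING | LIFECYCLE   # tasks get everything
CALLBACK_ALLOWED = READ | LOGGING           # callbacks: read + logging only


def supervisor_accepts(allowed: set, msg_type: str) -> bool:
    """Hypothetical check: would this process type's union include msg_type?"""
    return msg_type in allowed
```

In the real code the "check" is simply that `CallbackToSupervisor` fails 
validation for any message type not in its union.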

- Pro: Most of these are pretty self-documenting, and this may feel the most 
comfortable for folks used to configuring tiered permission policies.
- Pro: Opens the door to making access level configurable in the future.  Some 
day we might be able to do a team-based config option where Team A's callbacks 
can only read and log, but Team B gets admin rights, etc.
- Con: Some groupings are judgment calls.  Is `TriggerDagRun` a "write," or do 
we put it in "orchestration" all by itself since it is the only current one 
that affects a different entity?  What if I want to allow my users to write 
Variables but not write XComs?  Is `DeleteVariable` a "write," or should Deletes 
be their own tier entirely?  I don't know if there is a solid "right answer" to 
a lot of these, so it'll come down to documentation.

---

Option 2: Group by resource type

```python
ConnectionRequests = GetConnection
VariableRequests = GetVariable | PutVariable | DeleteVariable
XComRequests = GetXCom | … | SetXCom | DeleteXCom
DagRunRequests = GetDagRun | … | TriggerDagRun
TaskInstanceRequests = GetPreviousTI | …
# …etc
```

- Pro: This feels like a natural "REST resource" grouping as opposed to a 
"permissions level" grouping.
- Con: Mixes reads and writes within each group, so when a callback needs 
`GetVariable` but not `PutVariable`, you either grant the whole 
`VariableRequests` block or split it further, at which point you're back to 
Option 1.  Also makes the self-vs-other (task-vs-orchestration) distinction 
invisible.

---

Why I prefer Option 1

The security-level grouping gives us a vocabulary: "read," "write," "logging," 
"lifecycle," "orchestration." Most of the composition statements like 
`ReadRequests | LoggingRequests` are immediately understandable without reading 
the docs.  If we ever want to make callback permissions configurable it would 
look something like `[callbacks] allowed_channels = 
read,write,logging,orchestration`.  But maybe the same can be said for Option 2, 
which would look something like `[callbacks] allowed_channels = 
connections,variables,logging`.
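If we ever did go the config route, the wiring could be as thin as this sketch.  
The option name, the group contents, and `resolve_allowed` are all hypothetical; 
no such config exists today.

```python
# Hypothetical mapping from config tokens to message-type groups
# (group contents abbreviated; none of this exists in Airflow today).
GROUPS = {
    "read": {"GetConnection", "GetVariable", "GetXCom"},
    "logging": {"MaskSecret", "ResendLoggingFD"},
    "orchestration": {"TriggerDagRun"},
}


def resolve_allowed(channels: str) -> set:
    """Turn `allowed_channels = read,logging` into a set of message types."""
    allowed: set = set()
    for name in channels.split(","):
        allowed |= GROUPS[name.strip()]
    return allowed
```

Either option's vocabulary drops into this shape equally well; the difference 
is only which tokens the operator gets to write.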

---

My plan

Unless someone proposes a better structure or has objections, I want to proceed 
with Option 1.  The initial default callback set will be `ReadRequests | 
LoggingRequests`, so callbacks get secret masking, logging, and read-only 
access to everything else.  We can expand it as specific use cases (like the 
deadline callback restarting a stalled task I mentioned above) come up.


[1] https://github.com/apache/airflow/pull/62645
[2] 
https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L1043
