One thing worth noting though (since it isn't already mentioned), is that
the
`ToTriggerSupervisor` in `triggerer_job_runner.py` already kind of does
this,
it is a scoped down channel (15 types) compared to 40 in `comms.py` for the
trigger runner subprocess.  So, the pattern isn't really new, it's just
been done
ad-hoc before.

That said, consider the `ToTriggerSupervisor` while refactoring to use
the composable groups.

One thing I also noticed regarding the scope of `ReadRequests` scope -- it
might
be broader than it sounds. With your scope, it would include the
`GetDagRun`,
`GetTICount` for instance, which are meaningful for tasks, but probably not
for
callbacks. You may want ReadRequests to be the full set for tasks, but
define
`CallbackToSupervisor` a bit more tightly by picking only the *gets *it
needs.

Option 1 seems most reasonable to me.

Thanks & Regards,
Amogh Desai


On Wed, Mar 25, 2026 at 5:49 AM Ferruzzi, Dennis <[email protected]>
wrote:

> TL;DR: The new callback supervised process added in PR #62645 [1] needs
> some - but not all - of the same comms channels that tasks use today.
> Before I wire them up, I'd like to restructure `ToSupervisor` in `comms.py`
> [2] so we can compose the right channel set per process type instead of
> maintaining parallel and redundant flat lists.  I've sketched out a few
> approaches below, but there are likely a hundred ways to slice this so
> maybe someone will see a cleaner design.  I'm all ears.  If nobody objects,
> I'll proceed with the "security level" style grouping (Option 1).
>
> To be clear, this is purely reorganizing the existing type definitions
> into logical building blocks.  There will be no protocol changes, no new
> message types, etc.  Some subset of those building blocks will then be used
> to assemble the default callback access.
>
> ---
>
> Background
>
> `ToSupervisor` in `task-sdk/…/comms.py` [2] is currently a single flat
> union of ~40 message types; every request a task process might send to its
> supervisor.  The new `CallbackSubprocess` for executor callbacks [1] has an
> empty message set, meaning callback code can't do anything that requires
> the supervisor to proxy to the API server.  No `Connection.get()`, no
> `Variable.get()`, etc.  To complicate matters, Callbacks are arbitrary user
> code.  People will put Slack notifications, PagerDuty alerts, custom
> cleanup logic, and more in them, so we don't want to just blindly give them
> unrestricted API access.  For example, I figure they'll need at least
> Connection and Variable lookups, secret masking, logging, and possibly XCom
> access.  But they clearly should *not* get task-lifecycle channels
> (`SucceedTask`, `DeferTask`, etc.) since a callback doesn't share the task
> lifecycle.
>
> As an example of future work that might happen and why this decision
> matters now, consider a Deadline Alert with a callback configured to handle
> a stalled task.  Something like "if this DAG is still running 30 minutes
> after it was queued, fail it and restart." Today there's no way to do this
> because the callback subprocess has zero comms channels.  But even once we
> add channels, the current task lifecycle messages (`RetryTask`,
> `SucceedTask`, etc.) won't work here because those are self-referential.
> The task sends them about itself, and the supervisor infers the identity
> from the process it's managing.  A deadline callback is a different process
> from the task it's alerting about.  It would need some new
> orchestration-style messages that carry explicit identifiers like
> `FailTaskInstance(ti_id=...)` or `ClearTaskInstance(ti_id=...)`.  This is a
> natural extension of `TriggerDagRun`, which already exists and operates on
> other DAG runs.
>
> This use case is perhaps a stretch but it shows why getting the grouping
> right now matters.
>
> ---
>
> Proposed approaches
>
> Like I said, there are any number of ways to carve this up.  I picked a
> coupel that I think are the most obivous, along with my proposed solution.
> If you see a better option, please speak up.  I don't want to over-engineer
> this, but I do want to get the granularity right since adding more
> supervised process types is likely in the future and changing this once
> it's out in the wild will be a big breaking mess if we try to claw back
> permissions, or future additions will be complicated if we structure these
> incorrectly.
>
> Option 0: Minimal; just unblock callbacks, punt the rest of this
> discussion to later
>
> If we just want callbacks working, the fastest way to get there would be
> to break out only the immediately shared subset and punt this entire
> discussion:
>
> ```python
> BaseRequests = GetConnection | GetVariable | MaskSecret | ResendLoggingFD
> TaskRequests = BaseRequests | …everything else…
> CallbackRequests = BaseRequests
> ```
>
> - Pro: Smallest diff, unblocks callback work immediately.
> - Con: "Base" is an arbitrary grab bag that will likely drift, it doesn't
> really help future consumers see what they're opting into, nor does it set
> us up for the orchestration use case in the example.
>
>
> Option 1: Group by access level  ((my recommendation))
>
> ```python
> ReadRequests = GetConnection | GetVariable | GetXCom | etc.. Everything
> starting with "Get"
> WriteRequests = SetXCom | DeleteXCom | PutVariable | DeleteVariable
> LoggingRequests = MaskSecret | ResendLoggingFD
> TaskLifecycleRequests = TaskState | SucceedTask | DeferTask | RetryTask |
> RescheduleTask | SkipDownstreamTasks
> TaskMetadataRequests = SetRenderedFields | SetRenderedMapIndex |
> ValidateInletsAndOutlets
> OrchestrationRequests = TriggerDagRun  # plus FailTaskInstance,
> ClearTaskInstance as potential future work, as an example
> HITLRequests = CreateHITLDetailPayload | UpdateHITLDetail |
> GetHITLDetailResponse # HITL = Human in the loop, since I had to look up
> what it meant in this context
> ```
>
> Then we assemble access per process:
>
> ```python
> # Task gets everything
> ToSupervisor = Annotated[
>     ReadRequests | WriteRequests | LoggingRequests
>     | TaskLifecycleRequests | TaskMetadataRequests
>     | OrchestrationRequests | HITLRequests,
>     Field(discriminator="type"),
> ]
>
> # Callback gets read-only + logging, and we can add orchestration later if
> we decide to go that route
> CallbackToSupervisor = Annotated[
>     ReadRequests | LoggingRequests,
>     Field(discriminator="type"),
> ]
> ```
>
> - Pro: Most of these are pretty self-documenting and maybe feels the most
> comfortable for folks used to configuring tiered permissions policies.
> - Pro: Opens the door to making access level configurable in the future.
> Some day we might be able to do a team-based config option where Team A's
> callbacks can only read and log, but Team B gets admin rights, etc.
> - Con: Some groupings are judgment calls.  Is `TriggerDagRun` a "write" or
> do we put it in "orchestration" all by itself since it is the only current
> one that affects a different entity?  What if I want to allow my users to
> write Variables but not write XComs?  Is DeleteVariable a "write" or should
> Deletes be their own tier entirely? I don't know if there is a solid "right
> answer" to a lot of these so it'll be documentation-dependent.
>
> ---
>
> Option 2: Group by resource type
>
> ```python
> ConnectionRequests = GetConnection
> VariableRequests = GetVariable | PutVariable | DeleteVariable
> XComRequests = GetXCom | … | SetXCom | DeleteXCom
> DagRunRequests = GetDagRun | … | TriggerDagRun
> TaskInstanceRequests = GetPreviousTI | …
> # …etc
> ```
>
> - Pro: This feels like a natural "REST resource" grouping as opposed to
> "permissions level" grouping
> - Con: Mixes reads and writes within each group, so when a callback needs
> `GetVariable` but not `PutVariable`, you either grant the whole
> `VariableRequests` block or split it further, at which point you're back to
> Option 1.  Also makes the self-vs-other (task-vs-orchestration) distinction
> invisible.
>
> ---
>
> Why I prefer Option 1
>
> The security-level grouping gives us a vocabulary: "read," "write,"
> "logging," "lifecycle," "orchestration." Most of the composition statements
> like `ReadRequests | LoggingRequests` are immediately understandable
> without reading the docs.  If we ever want to make callback permissions
> configurable it would look something like `[callbacks] allowed_channels =
> read,write,logging,orchestration`.  But maybe the same can be said for
> Option 2 which would look something like `[callbacks] allowed_channels =
> connections,variables,logging`).
>
> ---
>
> My plan
>
> Unless someone proposes a better structure or has objections, I want to
> proceed with Option 1.  The initial default callback set will be
> `ReadRequests | LoggingRequests` so they will have access to secrets
> masking, logging, and read-only for everything else.  We can expand it as
> specific use cases (like the deadline restarting a stalled task I mentioned
> above) come up.
>
>
> [1] https://github.com/apache/airflow/pull/62645
> [2]
> https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L1043
>

Reply via email to