> With your scope, it would include the `GetDagRun`, `GetTICount` for 
> instance, which are meaningful for tasks, but probably not for callbacks.

Yup, exactly.  Which is a point toward Option 2 and why this isn't such a 
cut-and-dried decision.

- ferruzzi

________________________________
From: Amogh Desai <[email protected]>
Sent: Wednesday, March 25, 2026 1:43 AM
To: [email protected] <[email protected]>
Subject: RE: [EXT] Subject: [Discussion] Restructuring supervised-process comms 
channels for reuse (callbacks & future work)

One thing worth noting though (since it isn't already mentioned): the
`ToTriggerSupervisor` in `triggerer_job_runner.py` already kind of does this.
It is a scoped-down channel for the trigger runner subprocess (15 types,
compared to 40 in `comms.py`), so the pattern isn't really new; it has just
been done ad hoc before.

That said, consider the `ToTriggerSupervisor` while refactoring to use
the composable groups.
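
To make the reuse concrete, here's a rough sketch of that composition (the
message classes below are empty stand-ins, not the real `comms.py`
definitions, and `TriggerStateSync` is a made-up placeholder for whatever
trigger-runner-specific messages exist):

```python
from typing import Union, get_args

# Stand-in message classes; the real ~40 definitions live in comms.py.
class GetConnection: ...
class GetVariable: ...
class MaskSecret: ...
class ResendLoggingFD: ...
class TriggerStateSync: ...  # hypothetical trigger-runner-specific message

# Shared building blocks, per the proposal.
ReadRequests = Union[GetConnection, GetVariable]
LoggingRequests = Union[MaskSecret, ResendLoggingFD]

# Each supervised process type composes only the groups it needs;
# nested Unions flatten, so these read as one flat channel definition.
ToTriggerSupervisor = Union[ReadRequests, LoggingRequests, TriggerStateSync]
CallbackToSupervisor = Union[ReadRequests, LoggingRequests]

# The callback channel ends up with exactly the four shared types.
print(get_args(CallbackToSupervisor))
```

The point being that the triggerer's existing ad-hoc subset and the new
callback subset would both just be different compositions of the same
groups, rather than two independently maintained flat lists.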

One thing I also noticed about the scope of `ReadRequests`: it might be
broader than it sounds. With your scope it would include `GetDagRun` and
`GetTICount`, for instance, which are meaningful for tasks but probably not
for callbacks. You may want `ReadRequests` to be the full set for tasks, but
define `CallbackToSupervisor` a bit more tightly by picking only the *gets*
it needs.
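
A tightly scoped union also gives you enforcement for free at the
deserialization boundary. A minimal sketch (simplified stand-in message
classes, not the real `comms.py` types, but using the same Pydantic
discriminated-union pattern the proposal shows):

```python
# Sketch: a narrow CallbackToSupervisor union rejects out-of-scope
# messages when the supervisor deserializes them.
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter, ValidationError


class GetConnection(BaseModel):
    type: Literal["GetConnection"] = "GetConnection"
    conn_id: str


class GetVariable(BaseModel):
    type: Literal["GetVariable"] = "GetVariable"
    key: str


class SucceedTask(BaseModel):
    type: Literal["SucceedTask"] = "SucceedTask"


# Only the specific "gets" a callback needs, not the full ReadRequests set.
CallbackToSupervisor = Annotated[
    Union[GetConnection, GetVariable],
    Field(discriminator="type"),
]

adapter = TypeAdapter(CallbackToSupervisor)

# An in-scope message deserializes normally...
msg = adapter.validate_python({"type": "GetVariable", "key": "slack_webhook"})
assert isinstance(msg, GetVariable)

# ...while a task-lifecycle message fails validation for callbacks.
try:
    adapter.validate_python({"type": "SucceedTask"})
except ValidationError:
    print("SucceedTask is not a valid callback message")
```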

Option 1 seems most reasonable to me.

Thanks & Regards,
Amogh Desai


On Wed, Mar 25, 2026 at 5:49 AM Ferruzzi, Dennis <[email protected]>
wrote:

> TL;DR: The new callback supervised process added in PR #62645 [1] needs
> some - but not all - of the same comms channels that tasks use today.
> Before I wire them up, I'd like to restructure `ToSupervisor` in `comms.py`
> [2] so we can compose the right channel set per process type instead of
> maintaining parallel and redundant flat lists.  I've sketched out a few
> approaches below, but there are likely a hundred ways to slice this so
> maybe someone will see a cleaner design.  I'm all ears.  If nobody objects,
> I'll proceed with the "security level" style grouping (Option 1).
>
> To be clear, this is purely reorganizing the existing type definitions
> into logical building blocks.  There will be no protocol changes, no new
> message types, etc.  Some subset of those building blocks will then be used
> to assemble the default callback access.
>
> ---
>
> Background
>
> `ToSupervisor` in `task-sdk/…/comms.py` [2] is currently a single flat
> union of ~40 message types; every request a task process might send to its
> supervisor.  The new `CallbackSubprocess` for executor callbacks [1] has an
> empty message set, meaning callback code can't do anything that requires
> the supervisor to proxy to the API server.  No `Connection.get()`, no
> `Variable.get()`, etc.  To complicate matters, Callbacks are arbitrary user
> code.  People will put Slack notifications, PagerDuty alerts, custom
> cleanup logic, and more in them, so we don't want to just blindly give them
> unrestricted API access.  For example, I figure they'll need at least
> Connection and Variable lookups, secret masking, logging, and possibly XCom
> access.  But they clearly should *not* get task-lifecycle channels
> (`SucceedTask`, `DeferTask`, etc.) since a callback doesn't share the task
> lifecycle.
>
> As an example of future work that might happen and why this decision
> matters now, consider a Deadline Alert with a callback configured to handle
> a stalled task.  Something like "if this DAG is still running 30 minutes
> after it was queued, fail it and restart." Today there's no way to do this
> because the callback subprocess has zero comms channels.  But even once we
> add channels, the current task lifecycle messages (`RetryTask`,
> `SucceedTask`, etc.) won't work here because those are self-referential.
> The task sends them about itself, and the supervisor infers the identity
> from the process it's managing.  A deadline callback is a different process
> from the task it's alerting about.  It would need some new
> orchestration-style messages that carry explicit identifiers like
> `FailTaskInstance(ti_id=...)` or `ClearTaskInstance(ti_id=...)`.  This is a
> natural extension of `TriggerDagRun`, which already exists and operates on
> other DAG runs.
>
> This use case is perhaps a stretch but it shows why getting the grouping
> right now matters.
>
> ---
>
> Proposed approaches
>
> Like I said, there are any number of ways to carve this up.  I picked a
> couple that I think are the most obvious, along with my proposed solution.
> If you see a better option, please speak up.  I don't want to over-engineer
> this, but I do want to get the granularity right, since more supervised
> process types are likely in the future.  If we grant too much now, clawing
> back permissions once this is out in the wild will be a big breaking mess;
> if we structure these incorrectly, future additions will be complicated.
>
> Option 0: Minimal; just unblock callbacks, punt the rest of this
> discussion to later
>
> If we just want callbacks working, the fastest way to get there would be
> to break out only the immediately shared subset and punt this entire
> discussion:
>
> ```python
> BaseRequests = GetConnection | GetVariable | MaskSecret | ResendLoggingFD
> TaskRequests = BaseRequests | …everything else…
> CallbackRequests = BaseRequests
> ```
>
> - Pro: Smallest diff, unblocks callback work immediately.
> - Con: "Base" is an arbitrary grab bag that will likely drift, it doesn't
> really help future consumers see what they're opting into, nor does it set
> us up for the orchestration use case in the example.
>
>
> Option 1: Group by access level  ((my recommendation))
>
> ```python
> ReadRequests = GetConnection | GetVariable | GetXCom | ...  # everything starting with "Get"
> WriteRequests = SetXCom | DeleteXCom | PutVariable | DeleteVariable
> LoggingRequests = MaskSecret | ResendLoggingFD
> TaskLifecycleRequests = (
>     TaskState | SucceedTask | DeferTask | RetryTask
>     | RescheduleTask | SkipDownstreamTasks
> )
> TaskMetadataRequests = SetRenderedFields | SetRenderedMapIndex | ValidateInletsAndOutlets
> OrchestrationRequests = TriggerDagRun  # plus e.g. FailTaskInstance and ClearTaskInstance as potential future work
> HITLRequests = CreateHITLDetailPayload | UpdateHITLDetail | GetHITLDetailResponse
> # HITL = Human in the Loop (I had to look up what it meant in this context)
> ```
>
> Then we assemble access per process:
>
> ```python
> # Task gets everything
> ToSupervisor = Annotated[
>     ReadRequests | WriteRequests | LoggingRequests
>     | TaskLifecycleRequests | TaskMetadataRequests
>     | OrchestrationRequests | HITLRequests,
>     Field(discriminator="type"),
> ]
>
> # Callback gets read-only + logging; we can add orchestration
> # later if we decide to go that route
> CallbackToSupervisor = Annotated[
>     ReadRequests | LoggingRequests,
>     Field(discriminator="type"),
> ]
> ```
>
> - Pro: Most of these are pretty self-documenting and maybe feels the most
> comfortable for folks used to configuring tiered permissions policies.
> - Pro: Opens the door to making access level configurable in the future.
> Some day we might be able to do a team-based config option where Team A's
> callbacks can only read and log, but Team B gets admin rights, etc.
> - Con: Some groupings are judgment calls.  Is `TriggerDagRun` a "write" or
> do we put it in "orchestration" all by itself since it is the only current
> one that affects a different entity?  What if I want to allow my users to
> write Variables but not write XComs?  Is DeleteVariable a "write" or should
> Deletes be their own tier entirely? I don't know if there is a solid "right
> answer" to a lot of these so it'll be documentation-dependent.
>
> ---
>
> Option 2: Group by resource type
>
> ```python
> ConnectionRequests = GetConnection
> VariableRequests = GetVariable | PutVariable | DeleteVariable
> XComRequests = GetXCom | … | SetXCom | DeleteXCom
> DagRunRequests = GetDagRun | … | TriggerDagRun
> TaskInstanceRequests = GetPreviousTI | …
> # …etc
> ```
>
> - Pro: This feels like a natural "REST resource" grouping as opposed to
> "permissions level" grouping
> - Con: Mixes reads and writes within each group, so when a callback needs
> `GetVariable` but not `PutVariable`, you either grant the whole
> `VariableRequests` block or split it further, at which point you're back to
> Option 1.  Also makes the self-vs-other (task-vs-orchestration) distinction
> invisible.
>
> ---
>
> Why I prefer Option 1
>
> The security-level grouping gives us a vocabulary: "read," "write,"
> "logging," "lifecycle," "orchestration." Most of the composition statements
> like `ReadRequests | LoggingRequests` are immediately understandable
> without reading the docs.  If we ever want to make callback permissions
> configurable it would look something like `[callbacks] allowed_channels =
> read,write,logging,orchestration`.  But maybe the same can be said for
> Option 2, which would look something like `[callbacks] allowed_channels =
> connections,variables,logging`.
>
> ---
>
> My plan
>
> Unless someone proposes a better structure or has objections, I want to
> proceed with Option 1.  The initial default callback set will be
> `ReadRequests | LoggingRequests` so they will have access to secrets
> masking, logging, and read-only for everything else.  We can expand it as
> specific use cases (like the deadline restarting a stalled task I mentioned
> above) come up.
>
>
> [1] https://github.com/apache/airflow/pull/62645
> [2]
> https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L1043
>
