Fair point, but isn't it the same trap from the other direction too? Option
2's resource groups mix
reads and writes, so VariableRequests gives callbacks delete access
alongside get.

The actual issue with Option 1 is not the read/write/lifecycle separation,
it's that we assume
"ReadRequests = everything starting with Get" and it gets too coarse. Maybe
you can consider
splitting it into SharedReads (Connection, Variable, XCom, Asset (things
that subprocess needs))
and TaskContextReads (GetPreviousTI, GetTaskBreadcrumbs etc (things that
only make sense
for a process that is a task). Callbacks can get SharedReads |
LoggingRequests, and the tasks
get the entirel set.

Thanks & Regards,
Amogh Desai


On Wed, Mar 25, 2026 at 10:33 PM Ferruzzi, Dennis <[email protected]>
wrote:

> > ‎With your scope, it would include the `GetDagRun`, `GetTICount` for
> instance, which are meaningful for tasks, but probably not for callbacks.
>
> Yup, exactly.  Which is a point toward Option 2 and why this isn't such a
> cut and dry decision.
>
> - ferruzzi
>
> ________________________________
> From: Amogh Desai <[email protected]>
> Sent: Wednesday, March 25, 2026 1:43 AM
> To: [email protected] <[email protected]>
> Subject: RE: [EXT] Subject: [Discussion] Restructuring supervised-process
> comms channels for reuse (callbacks & future work)
>
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> AVERTISSEMENT: Ce courrier électronique provient d’un expéditeur externe.
> Ne cliquez sur aucun lien et n’ouvrez aucune pièce jointe si vous ne pouvez
> pas confirmer l’identité de l’expéditeur et si vous n’êtes pas certain que
> le contenu ne présente aucun risque.
>
>
>
> One thing worth noting though (since it isn't already mentioned), is that
> the
> `ToTriggerSupervisor` in `triggerer_job_runner.py` already kind of does
> this,
> it is a scoped down channel (15 types) compared to 40 in `comms.py` for the
> trigger runner subprocess.  So, the pattern isn't really new, it's just
> been done
> ad-hoc before.
>
> That said, consider the `ToTriggerSupervisor` while refactoring to use
> the composable groups.
>
> One thing I also noticed regarding the scope of `ReadRequests` scope -- it
> might
> be broader than it sounds. With your scope, it would include the
> `GetDagRun`,
> `GetTICount` for instance, which are meaningful for tasks, but probably not
> for
> callbacks. You may want ReadRequests to be the full set for tasks, but
> define
> `CallbackToSupervisor` a bit more tightly by picking only the *gets *it
> needs.
>
> Option 1 seems most reasonable to me.
>
> Thanks & Regards,
> Amogh Desai
>
>
> On Wed, Mar 25, 2026 at 5:49 AM Ferruzzi, Dennis <[email protected]>
> wrote:
>
> > TL;DR: The new callback supervised process added in PR #62645 [1] needs
> > some - but not all - of the same comms channels that tasks use today.
> > Before I wire them up, I'd like to restructure `ToSupervisor` in
> `comms.py`
> > [2] so we can compose the right channel set per process type instead of
> > maintaining parallel and redundant flat lists.  I've sketched out a few
> > approaches below, but there are likely a hundred ways to slice this so
> > maybe someone will see a cleaner design.  I'm all ears.  If nobody
> objects,
> > I'll proceed with the "security level" style grouping (Option 1).
> >
> > To be clear, this is purely reorganizing the existing type definitions
> > into logical building blocks.  There will be no protocol changes, no new
> > message types, etc.  Some subset of those building blocks will then be
> used
> > to assemble the default callback access.
> >
> > ---
> >
> > Background
> >
> > `ToSupervisor` in `task-sdk/…/comms.py` [2] is currently a single flat
> > union of ~40 message types; every request a task process might send to
> its
> > supervisor.  The new `CallbackSubprocess` for executor callbacks [1] has
> an
> > empty message set, meaning callback code can't do anything that requires
> > the supervisor to proxy to the API server.  No `Connection.get()`, no
> > `Variable.get()`, etc.  To complicate matters, Callbacks are arbitrary
> user
> > code.  People will put Slack notifications, PagerDuty alerts, custom
> > cleanup logic, and more in them, so we don't want to just blindly give
> them
> > unrestricted API access.  For example, I figure they'll need at least
> > Connection and Variable lookups, secret masking, logging, and possibly
> XCom
> > access.  But they clearly should *not* get task-lifecycle channels
> > (`SucceedTask`, `DeferTask`, etc.) since a callback doesn't share the
> task
> > lifecycle.
> >
> > As an example of future work that might happen and why this decision
> > matters now, consider a Deadline Alert with a callback configured to
> handle
> > a stalled task.  Something like "if this DAG is still running 30 minutes
> > after it was queued, fail it and restart." Today there's no way to do
> this
> > because the callback subprocess has zero comms channels.  But even once
> we
> > add channels, the current task lifecycle messages (`RetryTask`,
> > `SucceedTask`, etc.) won't work here because those are self-referential.
> > The task sends them about itself, and the supervisor infers the identity
> > from the process it's managing.  A deadline callback is a different
> process
> > from the task it's alerting about.  It would need some new
> > orchestration-style messages that carry explicit identifiers like
> > `FailTaskInstance(ti_id=...)` or `ClearTaskInstance(ti_id=...)`.  This
> is a
> > natural extension of `TriggerDagRun`, which already exists and operates
> on
> > other DAG runs.
> >
> > This use case is perhaps a stretch but it shows why getting the grouping
> > right now matters.
> >
> > ---
> >
> > Proposed approaches
> >
> > Like I said, there are any number of ways to carve this up.  I picked a
> > coupel that I think are the most obivous, along with my proposed
> solution.
> > If you see a better option, please speak up.  I don't want to
> over-engineer
> > this, but I do want to get the granularity right since adding more
> > supervised process types is likely in the future and changing this once
> > it's out in the wild will be a big breaking mess if we try to claw back
> > permissions, or future additions will be complicated if we structure
> these
> > incorrectly.
> >
> > Option 0: Minimal; just unblock callbacks, punt the rest of this
> > discussion to later
> >
> > If we just want callbacks working, the fastest way to get there would be
> > to break out only the immediately shared subset and punt this entire
> > discussion:
> >
> > ```python
> > BaseRequests = GetConnection | GetVariable | MaskSecret | ResendLoggingFD
> > TaskRequests = BaseRequests | …everything else…
> > CallbackRequests = BaseRequests
> > ```
> >
> > - Pro: Smallest diff, unblocks callback work immediately.
> > - Con: "Base" is an arbitrary grab bag that will likely drift, it doesn't
> > really help future consumers see what they're opting into, nor does it
> set
> > us up for the orchestration use case in the example.
> >
> >
> > Option 1: Group by access level  ((my recommendation))
> >
> > ```python
> > ReadRequests = GetConnection | GetVariable | GetXCom | etc.. Everything
> > starting with "Get"
> > WriteRequests = SetXCom | DeleteXCom | PutVariable | DeleteVariable
> > LoggingRequests = MaskSecret | ResendLoggingFD
> > TaskLifecycleRequests = TaskState | SucceedTask | DeferTask | RetryTask |
> > RescheduleTask | SkipDownstreamTasks
> > TaskMetadataRequests = SetRenderedFields | SetRenderedMapIndex |
> > ValidateInletsAndOutlets
> > OrchestrationRequests = TriggerDagRun  # plus FailTaskInstance,
> > ClearTaskInstance as potential future work, as an example
> > HITLRequests = CreateHITLDetailPayload | UpdateHITLDetail |
> > GetHITLDetailResponse # HITL = Human in the loop, since I had to look up
> > what it meant in this context
> > ```
> >
> > Then we assemble access per process:
> >
> > ```python
> > # Task gets everything
> > ToSupervisor = Annotated[
> >     ReadRequests | WriteRequests | LoggingRequests
> >     | TaskLifecycleRequests | TaskMetadataRequests
> >     | OrchestrationRequests | HITLRequests,
> >     Field(discriminator="type"),
> > ]
> >
> > # Callback gets read-only + logging, and we can add orchestration later
> if
> > we decide to go that route
> > CallbackToSupervisor = Annotated[
> >     ReadRequests | LoggingRequests,
> >     Field(discriminator="type"),
> > ]
> > ```
> >
> > - Pro: Most of these are pretty self-documenting and maybe feels the most
> > comfortable for folks used to configuring tiered permissions policies.
> > - Pro: Opens the door to making access level configurable in the future.
> > Some day we might be able to do a team-based config option where Team A's
> > callbacks can only read and log, but Team B gets admin rights, etc.
> > - Con: Some groupings are judgment calls.  Is `TriggerDagRun` a "write"
> or
> > do we put it in "orchestration" all by itself since it is the only
> current
> > one that affects a different entity?  What if I want to allow my users to
> > write Variables but not write XComs?  Is DeleteVariable a "write" or
> should
> > Deletes be their own tier entirely? I don't know if there is a solid
> "right
> > answer" to a lot of these so it'll be documentation-dependent.
> >
> > ---
> >
> > Option 2: Group by resource type
> >
> > ```python
> > ConnectionRequests = GetConnection
> > VariableRequests = GetVariable | PutVariable | DeleteVariable
> > XComRequests = GetXCom | … | SetXCom | DeleteXCom
> > DagRunRequests = GetDagRun | … | TriggerDagRun
> > TaskInstanceRequests = GetPreviousTI | …
> > # …etc
> > ```
> >
> > - Pro: This feels like a natural "REST resource" grouping as opposed to
> > "permissions level" grouping
> > - Con: Mixes reads and writes within each group, so when a callback needs
> > `GetVariable` but not `PutVariable`, you either grant the whole
> > `VariableRequests` block or split it further, at which point you're back
> to
> > Option 1.  Also makes the self-vs-other (task-vs-orchestration)
> distinction
> > invisible.
> >
> > ---
> >
> > Why I prefer Option 1
> >
> > The security-level grouping gives us a vocabulary: "read," "write,"
> > "logging," "lifecycle," "orchestration." Most of the composition
> statements
> > like `ReadRequests | LoggingRequests` are immediately understandable
> > without reading the docs.  If we ever want to make callback permissions
> > configurable it would look something like `[callbacks] allowed_channels =
> > read,write,logging,orchestration`.  But maybe the same can be said for
> > Option 2 which would look something like `[callbacks] allowed_channels =
> > connections,variables,logging`).
> >
> > ---
> >
> > My plan
> >
> > Unless someone proposes a better structure or has objections, I want to
> > proceed with Option 1.  The initial default callback set will be
> > `ReadRequests | LoggingRequests` so they will have access to secrets
> > masking, logging, and read-only for everything else.  We can expand it as
> > specific use cases (like the deadline restarting a stalled task I
> mentioned
> > above) come up.
> >
> >
> > [1] https://github.com/apache/airflow/pull/62645
> > [2]
> >
> https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L1043
> >
>

Reply via email to