One thing worth noting (since it isn't mentioned already): the `ToTriggerSupervisor` in `triggerer_job_runner.py` already does something like this. It is a scoped-down channel for the trigger runner subprocess (15 message types versus the ~40 in `comms.py`). So the pattern isn't really new; it has just been done ad hoc before.
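The scoped-down channel pattern can be sketched with plain `typing` unions. This is a hedged illustration only: the message classes and the `allowed` helper below are hypothetical stand-ins, not the actual `comms.py` types (which are pydantic models discriminated on a `type` field).

```python
from dataclasses import dataclass
from typing import Union, get_args

# Hypothetical stand-ins for a few comms message types; the real ones in
# comms.py are pydantic models carrying a "type" discriminator field.
@dataclass
class GetConnection:
    conn_id: str

@dataclass
class GetVariable:
    key: str

@dataclass
class SucceedTask:
    end_date: str

# A broad channel for tasks and a scoped-down one for another process
# type, mirroring how ToTriggerSupervisor narrows the full task set.
ToSupervisor = Union[GetConnection, GetVariable, SucceedTask]
ScopedToSupervisor = Union[GetConnection, GetVariable]

def allowed(msg: object, channel) -> bool:
    """Return True if the message instance falls inside the channel union."""
    members = get_args(channel) or (channel,)  # bare class if not a Union
    return isinstance(msg, members)

print(allowed(GetConnection("db"), ScopedToSupervisor))   # True
print(allowed(SucceedTask("2026-03-25"), ScopedToSupervisor))  # False
```

The point is only that a narrower union is a strict subset of the broad one, so a supervisor can mechanically reject anything outside its process type's channel.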
That said, consider the `ToTriggerSupervisor` while refactoring to use the composable groups. One thing I also noticed regarding the scope of `ReadRequests`: it might be broader than it sounds. With your scoping, it would include `GetDagRun` and `GetTICount`, for instance, which are meaningful for tasks but probably not for callbacks. You may want `ReadRequests` to be the full set for tasks, but define `CallbackToSupervisor` a bit more tightly by picking only the *gets* it needs. Option 1 seems most reasonable to me.

Thanks & Regards,
Amogh Desai

On Wed, Mar 25, 2026 at 5:49 AM Ferruzzi, Dennis <[email protected]> wrote:

> TL;DR: The new callback supervised process added in PR #62645 [1] needs
> some - but not all - of the same comms channels that tasks use today.
> Before I wire them up, I'd like to restructure `ToSupervisor` in `comms.py`
> [2] so we can compose the right channel set per process type instead of
> maintaining parallel and redundant flat lists. I've sketched out a few
> approaches below, but there are likely a hundred ways to slice this, so
> maybe someone will see a cleaner design. I'm all ears. If nobody objects,
> I'll proceed with the "security level" style grouping (Option 1).
>
> To be clear, this is purely reorganizing the existing type definitions
> into logical building blocks. There will be no protocol changes, no new
> message types, etc. Some subset of those building blocks will then be used
> to assemble the default callback access.
>
> ---
>
> Background
>
> `ToSupervisor` in `task-sdk/…/comms.py` [2] is currently a single flat
> union of ~40 message types: every request a task process might send to its
> supervisor. The new `CallbackSubprocess` for executor callbacks [1] has an
> empty message set, meaning callback code can't do anything that requires
> the supervisor to proxy to the API server. No `Connection.get()`, no
> `Variable.get()`, etc. To complicate matters, callbacks are arbitrary user
> code.
> People will put Slack notifications, PagerDuty alerts, custom cleanup
> logic, and more in them, so we don't want to just blindly give them
> unrestricted API access. For example, I figure they'll need at least
> Connection and Variable lookups, secret masking, logging, and possibly
> XCom access. But they clearly should *not* get task-lifecycle channels
> (`SucceedTask`, `DeferTask`, etc.) since a callback doesn't share the task
> lifecycle.
>
> As an example of future work that might happen, and why this decision
> matters now, consider a Deadline Alert with a callback configured to handle
> a stalled task. Something like "if this DAG is still running 30 minutes
> after it was queued, fail it and restart." Today there's no way to do this
> because the callback subprocess has zero comms channels. But even once we
> add channels, the current task lifecycle messages (`RetryTask`,
> `SucceedTask`, etc.) won't work here because those are self-referential:
> the task sends them about itself, and the supervisor infers the identity
> from the process it's managing. A deadline callback is a different process
> from the task it's alerting about. It would need some new
> orchestration-style messages that carry explicit identifiers, like
> `FailTaskInstance(ti_id=...)` or `ClearTaskInstance(ti_id=...)`. This is a
> natural extension of `TriggerDagRun`, which already exists and operates on
> other DAG runs.
>
> This use case is perhaps a stretch, but it shows why getting the grouping
> right now matters.
>
> ---
>
> Proposed approaches
>
> Like I said, there are any number of ways to carve this up. I picked a
> couple that I think are the most obvious, along with my proposed solution.
> If you see a better option, please speak up.
> I don't want to over-engineer this, but I do want to get the granularity
> right, since adding more supervised process types is likely in the future.
> Changing this once it's out in the wild will be a big breaking mess if we
> try to claw back permissions, and future additions will be complicated if
> we structure these incorrectly.
>
> Option 0: Minimal; just unblock callbacks, punt the rest of this
> discussion to later
>
> If we just want callbacks working, the fastest way to get there would be
> to break out only the immediately shared subset and punt this entire
> discussion:
>
> ```python
> BaseRequests = GetConnection | GetVariable | MaskSecret | ResendLoggingFD
> TaskRequests = BaseRequests | …everything else…
> CallbackRequests = BaseRequests
> ```
>
> - Pro: Smallest diff, unblocks callback work immediately.
> - Con: "Base" is an arbitrary grab bag that will likely drift; it doesn't
> really help future consumers see what they're opting into, nor does it set
> us up for the orchestration use case in the example.
>
> Option 1: Group by access level (my recommendation)
>
> ```python
> ReadRequests = GetConnection | GetVariable | GetXCom | etc.
> # everything starting with "Get"
> WriteRequests = SetXCom | DeleteXCom | PutVariable | DeleteVariable
> LoggingRequests = MaskSecret | ResendLoggingFD
> TaskLifecycleRequests = TaskState | SucceedTask | DeferTask | RetryTask |
>     RescheduleTask | SkipDownstreamTasks
> TaskMetadataRequests = SetRenderedFields | SetRenderedMapIndex |
>     ValidateInletsAndOutlets
> OrchestrationRequests = TriggerDagRun  # plus FailTaskInstance,
>     # ClearTaskInstance as potential future work, as an example
> HITLRequests = CreateHITLDetailPayload | UpdateHITLDetail |
>     # HITL = Human in the loop, since I had to look up what it meant here
>     GetHITLDetailResponse
> ```
>
> Then we assemble access per process:
>
> ```python
> # Task gets everything
> ToSupervisor = Annotated[
>     ReadRequests | WriteRequests | LoggingRequests
>     | TaskLifecycleRequests | TaskMetadataRequests
>     | OrchestrationRequests | HITLRequests,
>     Field(discriminator="type"),
> ]
>
> # Callback gets read-only + logging; we can add orchestration later if
> # we decide to go that route
> CallbackToSupervisor = Annotated[
>     ReadRequests | LoggingRequests,
>     Field(discriminator="type"),
> ]
> ```
>
> - Pro: Most of these are pretty self-documenting, and this may feel the
> most comfortable for folks used to configuring tiered permissions policies.
> - Pro: Opens the door to making access level configurable in the future.
> Some day we might be able to do a team-based config option where Team A's
> callbacks can only read and log, but Team B gets admin rights, etc.
> - Con: Some groupings are judgment calls. Is `TriggerDagRun` a "write", or
> do we put it in "orchestration" all by itself since it is the only current
> one that affects a different entity? What if I want to allow my users to
> write Variables but not write XComs? Is `DeleteVariable` a "write", or
> should Deletes be their own tier entirely? I don't know if there is a solid
> "right answer" to a lot of these, so it'll be documentation-dependent.
>
> ---
>
> Option 2: Group by resource type
>
> ```python
> ConnectionRequests = GetConnection
> VariableRequests = GetVariable | PutVariable | DeleteVariable
> XComRequests = GetXCom | … | SetXCom | DeleteXCom
> DagRunRequests = GetDagRun | … | TriggerDagRun
> TaskInstanceRequests = GetPreviousTI | …
> # …etc
> ```
>
> - Pro: This feels like a natural "REST resource" grouping, as opposed to
> the "permissions level" grouping.
> - Con: Mixes reads and writes within each group, so when a callback needs
> `GetVariable` but not `PutVariable`, you either grant the whole
> `VariableRequests` block or split it further, at which point you're back to
> Option 1. It also makes the self-vs-other (task-vs-orchestration)
> distinction invisible.
>
> ---
>
> Why I prefer Option 1
>
> The security-level grouping gives us a vocabulary: "read," "write,"
> "logging," "lifecycle," "orchestration." Most of the composition
> statements, like `ReadRequests | LoggingRequests`, are immediately
> understandable without reading the docs. If we ever want to make callback
> permissions configurable, it would look something like
> `[callbacks] allowed_channels = read,write,logging,orchestration`. But
> maybe the same can be said for Option 2, which would look something like
> `[callbacks] allowed_channels = connections,variables,logging`.
>
> ---
>
> My plan
>
> Unless someone proposes a better structure or has objections, I want to
> proceed with Option 1. The initial default callback set will be
> `ReadRequests | LoggingRequests`, so they will have access to secrets
> masking, logging, and read-only access to everything else. We can expand
> it as specific use cases (like the deadline restarting a stalled task I
> mentioned above) come up.
>
> [1] https://github.com/apache/airflow/pull/62645
> [2] https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L1043
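The config-driven idea floated under Option 1's pros (a hypothetical `[callbacks] allowed_channels = read,logging` setting) could be assembled at runtime roughly as below. This is a hedged sketch: the group names, message classes, and `assemble_channel` helper are all illustrative assumptions, and a real implementation would compose pydantic discriminated unions rather than bare `typing.Union`s.

```python
from typing import Union, get_args

# Hypothetical stand-ins for a few comms.py message types.
class GetConnection: ...
class GetVariable: ...
class SetXCom: ...
class MaskSecret: ...

# The Option 1 vocabulary mapped to composable groups (subset shown).
GROUPS = {
    "read": Union[GetConnection, GetVariable],
    "write": SetXCom,
    "logging": MaskSecret,
}

def assemble_channel(allowed_channels: str):
    """Fold a comma-separated config value into one channel union."""
    union = None
    for name in allowed_channels.split(","):
        group = GROUPS[name.strip()]
        union = group if union is None else Union[union, group]
    return union

# e.g. a callback process granted only "read" and "logging"
CallbackToSupervisor = assemble_channel("read,logging")
```

Nested `Union`s flatten automatically, so `get_args(CallbackToSupervisor)` yields the individual message classes, which is what a supervisor would check incoming messages against.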
