kaxil opened a new pull request, #68028:
URL: https://github.com/apache/airflow/pull/68028

   Human-in-the-loop (HITL) tasks parked on the triggerer: the operator called 
`self.defer(...)` with an `HITLTrigger`, so the task sat in `deferred` and a 
trigger held a triggerer slot for the entire wait. HITL waits are open-ended (a 
human may answer in minutes or in days), so a single parked task pinned the 
triggerer the whole time, and the triggerer could never scale to zero while any 
task was awaiting a response.
   
   This adds a first-class `awaiting_input` task state, managed by the 
scheduler, that holds neither a worker slot nor a triggerer slot, following the 
same model `up_for_reschedule` already uses. The task parks in `awaiting_input` 
and is resumed directly by either a human response (through the Core API) or the 
scheduler's timeout sweep. The triggerer is no longer involved in HITL at all, 
so it can scale to zero independently of pending human input.
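To make the resource model concrete, here is a hedged sketch that tabulates what each waiting state keeps occupied. The `Holds` enum, `RESOURCE_HELD`, and `triggerer_can_scale_to_zero` are hypothetical illustrations, not Airflow code. The only point is that once HITL waits move from `deferred` to `awaiting_input`, they stop counting toward the triggerer.

```python
"""Hypothetical summary of what each state pins while a task waits.

Illustration only: this is not Airflow's TaskInstanceState enum.
"""
from enum import Enum


class Holds(Enum):
    WORKER_SLOT = "worker slot"
    TRIGGERER_SLOT = "triggerer slot"
    NOTHING = "nothing"


RESOURCE_HELD = {
    "running": Holds.WORKER_SLOT,
    "deferred": Holds.TRIGGERER_SLOT,    # a trigger runs until its event fires
    "up_for_reschedule": Holds.NOTHING,  # the scheduler re-queues the task later
    "awaiting_input": Holds.NOTHING,     # a response or the timeout sweep resumes it
}


def triggerer_can_scale_to_zero(ti_states):
    """True when no task instance is in a state that pins a triggerer slot."""
    return all(RESOURCE_HELD.get(s) is not Holds.TRIGGERER_SLOT for s in ti_states)
```

A long HITL wait in `awaiting_input` no longer blocks scale-to-zero, while any ordinary deferrable task in `deferred` still does.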
   
   ## Why a new state instead of reusing `deferred`
   
   `deferred` means "a trigger is running on the triggerer for this task". HITL 
has no trigger, so reusing `deferred` keeps a false coupling to the triggerer 
and defeats the scale-to-zero goal. A distinct `awaiting_input` state makes the 
wait visible in its own right and lets the scheduler own liveness (response 
timeouts) with no triggerer in the loop.
   
   ## How it works
   
   - The Task SDK raises `TaskAwaitingInput` (a sibling of `TaskDeferred`). The 
task runner sends an `AwaitInputTask` message and the Execution API parks the 
TI in `awaiting_input`, storing the optional response deadline on the existing 
`trigger_timeout` column. No trigger row is created.
   - A human response through the Core API resumes the task directly via 
`handle_event_submit`, the same primitive the trigger path used, so 
`HITLOperator.execute_complete` runs unchanged.
   - The scheduler runs a bounded `awaiting_input` timeout sweep: a task past 
its deadline resumes with its default response, or fails if no default was set. 
This replaces the triggerer's timeout handling.
   - The respond path and the park path take the same lock order 
(`TaskInstance` then `HITLDetail`), so a response that races the park 
transition cannot deadlock or strand an already-answered task.
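The park, respond, and timeout-sweep steps above can be modelled in a few lines. This is a minimal in-memory sketch, not Airflow's implementation: the dataclasses, the `park`/`respond`/`sweep` functions, the `limit` batch size, and the payload keys are all hypothetical, and `threading.Lock` stands in for database row locks. It illustrates two properties the list describes: every path locks `TaskInstance` before `HITLDetail`, and an answer that lands before the park is picked up rather than stranded (one plausible reading of "an already-answered task").

```python
"""Hypothetical in-memory model of the awaiting_input lifecycle.

TaskInstance/HITLDetail here are toy dataclasses, and threading.Lock stands
in for database row locks. None of these names are Airflow's real API.
"""
from __future__ import annotations

import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class HITLDetail:
    defaults: list[str] | None = None  # default response applied on timeout
    response: list[str] | None = None  # set once someone answers
    lock: threading.Lock = field(default_factory=threading.Lock)


@dataclass
class TaskInstance:
    ti_id: str
    state: str = "running"
    trigger_timeout: datetime | None = None  # reused as the response deadline
    resume_payload: dict | None = None
    hitl: HITLDetail = field(default_factory=HITLDetail)
    lock: threading.Lock = field(default_factory=threading.Lock)


def _resume(ti: TaskInstance, response: list[str], timed_out: bool) -> None:
    # Stand-in for handle_event_submit: queue the task so execute_complete runs.
    ti.state = "scheduled"
    ti.trigger_timeout = None
    ti.resume_payload = {"response": response, "timed_out": timed_out}


def park(ti: TaskInstance, deadline: datetime | None) -> None:
    # Lock order: TaskInstance first, then HITLDetail (same as respond/sweep).
    with ti.lock, ti.hitl.lock:
        if ti.hitl.response is not None:
            # The answer arrived before the park: resume now instead of stranding it.
            _resume(ti, ti.hitl.response, timed_out=False)
            return
        ti.state = "awaiting_input"
        ti.trigger_timeout = deadline


def respond(ti: TaskInstance, chosen: list[str]) -> None:
    with ti.lock, ti.hitl.lock:
        ti.hitl.response = chosen
        if ti.state == "awaiting_input":
            _resume(ti, chosen, timed_out=False)
        # Otherwise park() will see the stored response and resume the task.


def sweep(tis: list[TaskInstance], now: datetime, limit: int = 100) -> int:
    """Resume or fail at most `limit` expired tasks; return how many were handled."""
    candidates = [
        ti for ti in tis
        if ti.state == "awaiting_input"
        and ti.trigger_timeout is not None
        and ti.trigger_timeout <= now
    ][:limit]
    handled = 0
    for ti in candidates:
        with ti.lock, ti.hitl.lock:
            if ti.state != "awaiting_input":
                continue  # answered between selection and locking
            if ti.hitl.defaults:
                ti.hitl.response = ti.hitl.defaults
                _resume(ti, ti.hitl.defaults, timed_out=True)
            else:
                ti.state = "failed"
                ti.trigger_timeout = None
            handled += 1
    return handled


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    ti = TaskInstance("approve_deploy", hitl=HITLDetail(defaults=["approve"]))
    park(ti, now - timedelta(seconds=1))  # deadline already passed
    sweep([ti], now)
    print(ti.state, ti.resume_payload)
```

The `state != "awaiting_input"` re-check inside the sweep matters because a human can answer between the sweep's selection and its lock acquisition; without it the sweep would overwrite a real response with the default.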
   
   ## Backwards compatibility
   
   - On Airflow < 3.3 the operator falls back to the original 
`self.defer(...HITLTrigger...)` path, so HITL keeps working against older cores.
   - `awaiting_input` is additive: the TI `state` column is a free-form string, 
so no migration is needed.
   - The UI and the pending-actions filters treat both `deferred` (pre-3.3 / 
fallback) and `awaiting_input` as "pending required action", so old and new 
runs both surface correctly.
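A minimal sketch of the two runtime compatibility rules above, assuming the operator can read the core's version as a string. `choose_wait_path`, `core_version_tuple`, and `PENDING_ACTION_STATES` are hypothetical names; how the real operator detects the core version is not described in this PR text.

```python
"""Hypothetical sketch of the backwards-compatibility rules.

choose_wait_path, PENDING_ACTION_STATES and core_version_tuple are
illustrative names, not Airflow APIs.
"""
from __future__ import annotations

import re

# First core release with the awaiting_input state, per the description above.
AWAITING_INPUT_MIN_VERSION = (3, 3)

# The UI and pending-actions filters treat both states as "pending required action".
PENDING_ACTION_STATES = frozenset({"deferred", "awaiting_input"})


def core_version_tuple(version: str) -> tuple[int, int]:
    """Parse '3.2.1' or '3.3.0rc1' into (major, minor) so 3.10 sorts after 3.3."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return int(match.group(1)), int(match.group(2))


def choose_wait_path(core_version: str) -> str:
    if core_version_tuple(core_version) >= AWAITING_INPUT_MIN_VERSION:
        return "awaiting_input"  # raise TaskAwaitingInput; no trigger row
    return "deferred"  # fall back to self.defer(...HITLTrigger...)


def is_pending_required_action(state: str) -> bool:
    return state in PENDING_ACTION_STATES
```

Comparing integer tuples rather than raw strings avoids the classic trap where `"3.10" < "3.3"` lexically.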
   
   ## Demo
   
   The triggerer was killed for everything shown below; only the scheduler and 
API server are involved.
   
   <img width="2426" height="1704" alt="image" src="https://github.com/user-attachments/assets/c4571e89-df6b-4442-84cd-5ac44f758df3" />
   
   <img width="1680" height="1050" alt="1-grid-awaiting-input" src="https://github.com/user-attachments/assets/1b5628a1-ab43-4133-848a-09a8f7df347f" />
   
   <img width="1680" height="1050" alt="2-required-actions-list" src="https://github.com/user-attachments/assets/885ebf2b-18ab-4bef-ab9b-e21c1727cbfc" />
   
   <img width="1680" height="1050" alt="3-response-form-freetext" src="https://github.com/user-attachments/assets/906d34d1-7ab3-4537-9105-499cf64a1184" />
   
   <img width="1680" height="1050" alt="4-response-form-options" src="https://github.com/user-attachments/assets/d6fae636-79e3-4d68-8036-c0fe5c57edf7" />
   
   <img width="1680" height="1050" alt="5-resumed-no-triggerer" src="https://github.com/user-attachments/assets/43eceaed-88e3-4587-b273-7ab0b128d1da" />
   
   ## Known follow-up
   
   `check-supervisor-schemas-versions` flags `AwaitInputTask` as a new 
supervisor body. cadwyn cannot attach a `VersionChange` to the bundle's only 
(oldest) version, and the schema package's `AGENTS.md` says a brand-new body 
needs none, so this is a bootstrap gap in that check rather than a missing 
entry. The check needs a small tweak to allow purely-additive new bodies; 
handled separately.
   

