jason810496 opened a new issue, #67797:
URL: https://github.com/apache/airflow/issues/67797

   ### Background
   
   In coordinator mode, the Go SDK reports every task failure as a terminal 
`TaskState(failed)` frame and then exits 0, so a TaskInstance with retries 
configured is recorded as `FAILED` and **never retried**. The retry decision is 
made client-side in the SDK; the Go runtime does not implement it yet.
   
   `task-sdk/src/airflow/sdk/execution_time/task_runner.py` is the source of 
truth for SDK behavior. `_handle_current_task_failed` (~L1640):
   
   ```python
   if ti._ti_context_from_server and ti._ti_context_from_server.should_retry:
       return RetryTask(...), UP_FOR_RETRY
   return TaskState(state=FAILED, ...), FAILED
   ```
   
   The supervisor depends on this distinction. In `supervisor.py`, `final_state` 
(~L1658) takes the runtime's frame verbatim on **exit 0**, so a 
`TaskState(failed)` frame becomes terminal `FAILED`. `UP_FOR_RETRY` is 
reachable **only** via a distinct `RetryTask` message, or via a non-zero exit 
combined with `should_retry` (`supervisor.py:1359`, `:1668`).
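
The rule described above can be modelled as a small decision function. This is
a simplified sketch, not the real `supervisor.py` logic: the function name, the
string states, and the behaviour for a non-zero exit without `should_retry` are
assumptions made for illustration.

```go
package main

import "fmt"

// finalState is a hypothetical model of the rule described in this issue.
// reported is the state implied by the runtime's last frame: "failed" for
// TaskState(failed), "up_for_retry" for a RetryTask message.
func finalState(exitCode int, reported string, shouldRetry bool) string {
	if exitCode == 0 {
		// Clean exit: the runtime's frame is taken verbatim.
		return reported
	}
	if shouldRetry {
		// Non-zero exit combined with should_retry.
		return "up_for_retry"
	}
	// Assumption: a non-zero exit with no retries left is recorded as failed.
	return "failed"
}

func main() {
	// Current Go SDK behaviour: TaskState(failed) + exit 0, retries ignored.
	fmt.Println(finalState(0, "failed", true)) // failed
	// Proposed behaviour: a distinct RetryTask message + exit 0.
	fmt.Println(finalState(0, "up_for_retry", true)) // up_for_retry
	// Crash path: non-zero exit with retries remaining.
	fmt.Println(finalState(1, "failed", true)) // up_for_retry
}
```

The first call shows why the current Go behaviour is terminal: with exit 0,
`should_retry` is never consulted on the supervisor side.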
   
   Current Go state:
   - `go-sdk/pkg/execution/task_runner.go`: the task error, panic, and bad-UUID 
paths all return `TaskStateMsg{State: TaskStateFailed}`; there is no retry 
branch.
   - `go-sdk/pkg/execution/messages.go`: `decodeTIRunContext` does not parse 
`should_retry` / `max_tries`, and there is no `RetryTask` outbound message 
type. The retry path therefore has neither its input (`should_retry`) nor its 
output message (`RetryTask`).
   
   `should_retry` already travels on the wire as a field of `TIRunContext` 
(`StartupDetails.ti_context`); see the supervisor `schema.json` and 
`_generated.py` (`TIRunContext.should_retry`).
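
A minimal sketch of reading the two retry fields follows. It assumes the
`ti_context` payload has already been decoded into a generic
`map[string]any`; the struct, function names, and error wording are
hypothetical and do not reflect the existing `decodeTIRunContext` signature.

```go
package main

import "fmt"

// tiRetryFields holds only the retry-related parts of TIRunContext.
// Hypothetical type; the real Go struct may be shaped differently.
type tiRetryFields struct {
	ShouldRetry bool
	MaxTries    int
}

// decodeRetryFields extracts should_retry and max_tries from an already
// decoded ti_context map. Missing or null fields keep their zero values,
// so an absent should_retry falls back to a terminal failure.
func decodeRetryFields(raw map[string]any) (tiRetryFields, error) {
	var out tiRetryFields
	if v, ok := raw["should_retry"]; ok && v != nil {
		b, isBool := v.(bool)
		if !isBool {
			return out, fmt.Errorf("should_retry: expected bool, got %T", v)
		}
		out.ShouldRetry = b
	}
	if v, ok := raw["max_tries"]; ok && v != nil {
		n, err := toInt(v)
		if err != nil {
			return out, fmt.Errorf("max_tries: %w", err)
		}
		out.MaxTries = n
	}
	return out, nil
}

// toInt accepts the integer shapes that generic decoders commonly produce.
func toInt(v any) (int, error) {
	switch n := v.(type) {
	case int:
		return n, nil
	case int64:
		return int(n), nil
	case uint64:
		return int(n), nil
	case float64:
		return int(n), nil
	default:
		return 0, fmt.Errorf("expected integer, got %T", v)
	}
}

func main() {
	ctx, err := decodeRetryFields(map[string]any{
		"should_retry": true,
		"max_tries":    int64(3),
	})
	fmt.Println(ctx.ShouldRetry, ctx.MaxTries, err)
}
```

Defaulting a missing `should_retry` to `false` keeps today's behaviour for
payloads that omit the field, which seems the safer failure mode.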
   
   ### What needs to happen
   
   1. Parse `should_retry` (and `max_tries`) from `StartupDetails.ti_context` 
in `decodeTIRunContext`.
   2. Add a `RetryTask` outbound message type mirroring the Python wire shape.
   3. In `task_runner.go`, on task failure emit `RetryTask` (→ `UP_FOR_RETRY`) 
when `should_retry` is true, otherwise keep `TaskState(failed)`.
   4. Add tests covering success / fail / retry.
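
Step 3 reduces to one branch at the failure site. The sketch below uses
hypothetical stand-in types (`outboundMsg`, `taskStateMsg`, `retryTaskMsg`)
rather than the SDK's real ones, and leaves out any fields the real `RetryTask`
message carries on the wire.

```go
package main

import "fmt"

// outboundMsg is a hypothetical stand-in for the SDK's outbound messages.
type outboundMsg interface {
	msgType() string
}

// taskStateMsg mirrors the existing terminal TaskState(failed) frame.
type taskStateMsg struct{ State string }

// retryTaskMsg stands in for the new RetryTask message; the real message
// would need to mirror the Python wire shape.
type retryTaskMsg struct{}

func (taskStateMsg) msgType() string { return "TaskState" }
func (retryTaskMsg) msgType() string { return "RetryTask" }

// failureMessage picks the frame to send after a task failure:
// RetryTask when the server says a retry is allowed, otherwise the
// terminal TaskState(failed) frame.
func failureMessage(shouldRetry bool) outboundMsg {
	if shouldRetry {
		return retryTaskMsg{}
	}
	return taskStateMsg{State: "failed"}
}

func main() {
	for _, shouldRetry := range []bool{true, false} {
		msg := failureMessage(shouldRetry)
		fmt.Printf("should_retry=%t -> %s\n", shouldRetry, msg.msgType())
	}
}
```

Keeping the decision in one pure function would let the error, panic, and
bad-UUID paths share it, and makes the fail / retry cases from step 4 easy to
cover in a table-driven test.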
   
   ### Acceptance criteria
   
   - A failing coordinator-mode task whose TI has retries remaining is recorded 
`UP_FOR_RETRY`, not `FAILED`.
   - `should_retry` from `ti_context` is parsed and honored.
   - The TODO at the failure site in `go-sdk/pkg/execution/task_runner.go` is 
removed.
   
   ### Context
   
   - Found during local review of #67318 (coordinator-mode runtime entry point 
and task runner).
   - Source of truth: `task-sdk/src/airflow/sdk/execution_time/task_runner.py` 
and `supervisor.py`.
   - Out of scope (separate follow-ups): custom `RetryPolicy` / `RetryAction` 
(`_apply_retry_policy_or_default`) and the other non-success terminals 
(`DeferTask`, `RescheduleTask`, `SkipDownstreamTasks`).
   - The sibling Java SDK has the same gap; a cross-link will follow.
   
   ---
   Drafted-by: Claude Code (Opus 4.7); reviewed by @jason810496 before posting

