jason810496 opened a new issue, #67798:
URL: https://github.com/apache/airflow/issues/67798
### Background
In coordinator mode the Java SDK reports every task failure as a terminal
`TaskState(FAILED)` frame and then exits 0, so a TaskInstance with retries
configured is recorded `FAILED` and **never retried**. The retry decision is
made client-side in the SDK; the Java runtime does not implement it yet.
`task-sdk/src/airflow/sdk/execution_time/task_runner.py` is the source of
truth for SDK behavior. `_handle_current_task_failed` (~L1640):
```python
if ti._ti_context_from_server and ti._ti_context_from_server.should_retry:
    return RetryTask(...), UP_FOR_RETRY
return TaskState(state=FAILED, ...), FAILED
```
The supervisor depends on this distinction. In `supervisor.py`, `final_state`
(~L1658) takes the runtime's frame verbatim on **exit 0**, so a
`TaskState(FAILED)` frame becomes terminal `FAILED`. `UP_FOR_RETRY` is
reachable **only** via a distinct `RetryTask` message, or a non-zero exit
combined with `should_retry` (`supervisor.py:1359`, `:1668`).
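
The rule described above can be modeled as a small function. This is a simplified sketch, not the code in `supervisor.py`: the function name `resolve_final_state`, its parameters, the string state values, and the treatment of a clean exit with no frame are all assumptions made for illustration.

```python
from typing import Optional

# Hypothetical string stand-ins for the task instance states.
SUCCESS = "success"
FAILED = "failed"
UP_FOR_RETRY = "up_for_retry"


def resolve_final_state(
    exit_code: int, frame_state: Optional[str], should_retry: bool
) -> str:
    """Model how the terminal state is chosen once the runtime exits.

    frame_state is the state carried by the last terminal message the
    runtime sent, or None if it sent nothing before exiting.
    """
    if exit_code == 0:
        # Clean exit: the runtime's own frame is taken verbatim.
        # Assumption: a clean exit with no frame counts as success.
        return frame_state if frame_state is not None else SUCCESS
    # Non-zero exit: should_retry decides between retry and failure.
    return UP_FOR_RETRY if should_retry else FAILED
```

Under this model, `resolve_final_state(0, FAILED, True)` stays `FAILED` even though retries remain, which is the behavior reported here.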
Current Java state:
- `java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt`
(~L64-71) maps task-not-found to `REMOVED` and every caught exception to
`TaskResult.of(TaskState.State.FAILED)`; there is no retry branch, no
`should_retry` read, and no `RetryTask` message.
`should_retry` already travels on the wire as a field of `TIRunContext`
(`StartupDetails.ti_context`); see the supervisor `schema.json` and
`_generated.py` (`TIRunContext.should_retry`).
### What needs to happen
1. Read `should_retry` (and `max_tries`) from `StartupDetails.ti_context`.
2. Add a `RetryTask` outbound message mirroring the Python wire shape.
3. In `Task.kt`, on task failure emit `RetryTask` (→ `UP_FOR_RETRY`) when
`should_retry` is true, otherwise keep `TaskState(FAILED)`.
4. Add tests covering the success, failure, and retry paths.
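
Steps 1 to 3 could look roughly like the following, written in Python for parity with `task_runner.py` rather than in Kotlin. It is a sketch under assumptions: `ti_context` is taken to be an already-decoded mapping, the `TaskState` and `RetryTask` dataclasses are stand-ins that carry only a state and do not reproduce the real wire shape, and `failure_message` is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional, Union


@dataclass(frozen=True)
class TaskState:
    # Stand-in for the terminal-state frame; real frames carry more fields.
    state: str


@dataclass(frozen=True)
class RetryTask:
    # Stand-in for the distinct retry message; the field set is an assumption.
    state: str = "up_for_retry"


def failure_message(
    ti_context: Optional[Mapping[str, Any]],
) -> Union[RetryTask, TaskState]:
    """Pick the outbound message for a task that raised an exception."""
    # A missing context or missing field is treated as "do not retry",
    # mirroring the guard on _ti_context_from_server in the Python runner.
    should_retry = bool(ti_context and ti_context.get("should_retry"))
    if should_retry:
        return RetryTask()
    return TaskState(state="failed")
```

The retry and fail tests in step 4 would then reduce to checking that a context with `should_retry` set yields the retry message and that a false, absent, or empty context yields `TaskState(FAILED)`.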
### Acceptance criteria
- A failing coordinator-mode task whose TI has retries remaining is recorded
`UP_FOR_RETRY`, not `FAILED`.
- `should_retry` from `ti_context` is parsed and honored.
### Context
- Found during local review of #67318 (Go coordinator-mode runtime); the
same gap exists on the Java side.
- Source of truth: `task-sdk/src/airflow/sdk/execution_time/task_runner.py`
and `supervisor.py`.
- Out of scope (separate follow-ups): custom `RetryPolicy` / `RetryAction`
(`_apply_retry_policy_or_default`) and the other non-success terminals
(`DeferTask`, `RescheduleTask`, `SkipDownstreamTasks`).
- Sibling Go SDK gap: #67797.
---
Drafted-by: Claude Code (Opus 4.7); reviewed by @jason810496 before posting