kaxil commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1898412149
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -199,17 +199,22 @@ def ti_update_state(
)
# We exclude_unset to avoid updating fields that are not set in the payload
- data = ti_patch_payload.model_dump(exclude_unset=True)
+ # We do not need to deserialize "should_retry" -- it is used for dynamic decision-making within failed state
+ data = ti_patch_payload.model_dump(exclude_unset=True, exclude={"should_retry"})
query = update(TI).where(TI.id == ti_id_str).values(data)
if isinstance(ti_patch_payload, TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
query = query.values(state=ti_patch_payload.state)
+ updated_state = ti_patch_payload.state
if ti_patch_payload.state == State.FAILED:
# clear the next_method and next_kwargs
query = query.values(next_method=None, next_kwargs=None)
- updated_state = State.FAILED
+ task_instance = session.get(TI, ti_id_str)
+ if _is_eligible_to_retry(task_instance, ti_patch_payload.should_retry):
Review Comment:
Do we need to fetch the entire TI here?
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -199,17 +199,22 @@ def ti_update_state(
)
# We exclude_unset to avoid updating fields that are not set in the payload
- data = ti_patch_payload.model_dump(exclude_unset=True)
+ # We do not need to deserialize "should_retry" -- it is used for dynamic decision-making within failed state
+ data = ti_patch_payload.model_dump(exclude_unset=True, exclude={"should_retry"})
query = update(TI).where(TI.id == ti_id_str).values(data)
if isinstance(ti_patch_payload, TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
query = query.values(state=ti_patch_payload.state)
+ updated_state = ti_patch_payload.state
if ti_patch_payload.state == State.FAILED:
# clear the next_method and next_kwargs
query = query.values(next_method=None, next_kwargs=None)
- updated_state = State.FAILED
+ task_instance = session.get(TI, ti_id_str)
+ if _is_eligible_to_retry(task_instance, ti_patch_payload.should_retry):
+ query = query.values(state=State.UP_FOR_RETRY)
+ updated_state = State.UP_FOR_RETRY
Review Comment:
```suggestion
updated_state = State.UP_FOR_RETRY
query = query.values(state=updated_state)
```
##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -128,7 +128,11 @@ def finish(self, id: uuid.UUID, state: TerminalTIState, when: datetime):
"""Tell the API server that this TI has reached a terminal state."""
# TODO: handle the naming better. finish sounds wrong as "even" deferred is essentially finishing.
body = TITerminalStatePayload(end_date=when, state=TerminalTIState(state))
+ self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())
+ def fail(self, id: uuid.UUID, when: datetime, should_retry: bool):
+ """Tell the API server that this TI has to fail, with or without retries."""
+ body = TITerminalStatePayload(end_date=when, state=TerminalTIState.FAILED, should_retry=should_retry)
Review Comment:
Hmm, do we need a separate function here? Having both `finish` and `fail`
feels confusing. `should_retry` already has a default, so can we simply omit
it when it is False?
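One way to read this suggestion, as a minimal sketch: a single payload builder covers both cases and includes `should_retry` only when it is set. The name `build_terminal_payload`, the plain-dict payload, and the assumed default of `False` are illustrative assumptions; the PR itself uses `TITerminalStatePayload`, and the real client may differ.

```python
from datetime import datetime, timezone
import json


def build_terminal_payload(state: str, when: datetime, should_retry: bool = False) -> dict:
    """Hypothetical helper: build the PATCH body for a terminal state."""
    body = {"state": state, "end_date": when.isoformat()}
    # Only send should_retry when it differs from the assumed default (False).
    if should_retry:
        body["should_retry"] = True
    return body


when = datetime(2024, 12, 27, tzinfo=timezone.utc)
print(json.dumps(build_terminal_payload("failed", when, should_retry=True)))
print(json.dumps(build_terminal_payload("success", when)))
```

With this shape, a failing task and a succeeding task go through the same call, and the server falls back to its default when the key is absent.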
##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -710,7 +714,14 @@ def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, bytes, N
def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger):
log.debug("Received message from task runner", msg=msg)
resp = None
- if isinstance(msg, TaskState):
+ if isinstance(msg, FailState):
+ self._terminal_state = TerminalTIState.FAILED
+ self._task_end_time_monotonic = time.monotonic()
+ self._fail_request_sent = True
Review Comment:
Why do we need `self._fail_request_sent`?
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -199,17 +199,22 @@ def ti_update_state(
)
# We exclude_unset to avoid updating fields that are not set in the payload
- data = ti_patch_payload.model_dump(exclude_unset=True)
+ # We do not need to deserialize "should_retry" -- it is used for dynamic decision-making within failed state
+ data = ti_patch_payload.model_dump(exclude_unset=True, exclude={"should_retry"})
query = update(TI).where(TI.id == ti_id_str).values(data)
if isinstance(ti_patch_payload, TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
query = query.values(state=ti_patch_payload.state)
+ updated_state = ti_patch_payload.state
if ti_patch_payload.state == State.FAILED:
# clear the next_method and next_kwargs
query = query.values(next_method=None, next_kwargs=None)
- updated_state = State.FAILED
+ task_instance = session.get(TI, ti_id_str)
+ if _is_eligible_to_retry(task_instance, ti_patch_payload.should_retry):
Review Comment:
You can also fetch `max_tries` and `try_number` in the query on L188, and
then simply compare them here:
https://github.com/apache/airflow/blob/873e765376eb9654e5c8b8a71d95f92dd96f8958/airflow/api_fastapi/execution_api/routes/task_instances.py#L188
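A minimal sketch of that comparison, assuming the query on L188 is extended to return `max_tries` and `try_number` as scalars. The function names and the exact rule (a retry is allowed while `try_number <= max_tries`) are assumptions for illustration, not the PR's actual `_is_eligible_to_retry`.

```python
def is_eligible_to_retry(should_retry: bool, try_number: int, max_tries: int) -> bool:
    """Hypothetical check using two scalar columns instead of a full TI row."""
    if not should_retry:
        return False
    # Assumed rule: retries remain while the current try has not exceeded max_tries.
    return max_tries > 0 and try_number <= max_tries


def resolve_failed_state(should_retry: bool, try_number: int, max_tries: int) -> str:
    # Mirrors the diff: a FAILED report becomes UP_FOR_RETRY when a retry is allowed.
    if is_eligible_to_retry(should_retry, try_number, max_tries):
        return "up_for_retry"
    return "failed"
```

This keeps the decision to two integers and a flag, so no extra `session.get` is needed once the values are already selected.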