amoghrajesh commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1898608576
##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -128,7 +128,11 @@ def finish(self, id: uuid.UUID, state: TerminalTIState,
when: datetime):
"""Tell the API server that this TI has reached a terminal state."""
# TODO: handle the naming better. finish sounds wrong as "even"
deferred is essentially finishing.
body = TITerminalStatePayload(end_date=when,
state=TerminalTIState(state))
+ self.client.patch(f"task-instances/{id}/state",
content=body.model_dump_json())
+ def fail(self, id: uuid.UUID, when: datetime, should_retry: bool):
+ """Tell the API server that this TI has to fail, with or without
retries."""
+ body = TITerminalStatePayload(end_date=when,
state=TerminalTIState.FAILED, should_retry=should_retry)
Review Comment:
This comment is no longer valid, as the code has been reworked.
##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -710,7 +714,14 @@ def handle_requests(self, log: FilteringBoundLogger) ->
Generator[None, bytes, N
def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger):
log.debug("Received message from task runner", msg=msg)
resp = None
- if isinstance(msg, TaskState):
+ if isinstance(msg, FailState):
+ self._terminal_state = TerminalTIState.FAILED
+ self._task_end_time_monotonic = time.monotonic()
+ self._fail_request_sent = True
Review Comment:
Same here; I removed that variable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]