henry3260 commented on code in PR #66574:
URL: https://github.com/apache/airflow/pull/66574#discussion_r3219894813
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1254,6 +1277,50 @@ def update_task_state_if_needed(self):
rendered_map_index=self._rendered_map_index,
)
+ def _replay_pending_terminal_state_msg(self) -> None:
+ """
+ Re-issue the dedicated API call for an unsynced terminal-state msg.
+
+ Best-effort — if the second attempt also fails the exception is
+ logged and we move on; the supervisor's overall failure handling
+ (heartbeat, exit-code reporting) will eventually surface the issue.
+ """
+ msg = self._pending_terminal_state_msg
+ if msg is None:
+ return
+ try:
+ if isinstance(msg, SucceedTask):
+ self.client.task_instances.succeed(
+ id=self.id,
+ when=msg.end_date,
+ task_outlets=msg.task_outlets,
+ outlet_events=msg.outlet_events,
+ rendered_map_index=self._rendered_map_index,
+ )
Review Comment:
Nice fix! Could we extract a helper method for the API calls? I noticed that
`_handle_request` and `_replay_pending_terminal_state_msg` share a lot of the
same logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]