Copilot commented on code in PR #64023:
URL: https://github.com/apache/airflow/pull/64023#discussion_r3025336040


##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -448,6 +461,13 @@ def _handle_fail_fast_for_dag(ti: TI, dag_id: str, 
session: SessionDep, dag_bag:
         _stop_remaining_tasks(task_instance=ti, 
task_teardown_map=task_teardown_map, session=session)
 
 
+def _get_requested_state(ti_patch_payload: TIStateUpdate) -> TaskInstanceState 
| None:
+    """Extract the requested terminal state from a TI state update payload."""

Review Comment:
   `_get_requested_state()` claims to extract a "terminal" state, but it also 
accepts `TIRetryStatePayload` (up_for_retry). Suggest updating the docstring 
(or renaming the helper) to reflect that it extracts the requested *target* 
state for idempotency checks, not strictly terminal states.
   ```suggestion
       """Extract the requested target state from a TI state update payload for 
idempotency checks."""
   ```



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -363,6 +363,19 @@ def ti_update_state(
         )
 
     if previous_state != TaskInstanceState.RUNNING:
+        # Check for idempotent duplicate: if the TI is already in the exact 
same
+        # terminal state being requested, treat it as a successful no-op.
+        # This handles network-retry scenarios where the first request 
succeeded
+        # but the response was lost, causing a retry that sees the TI already
+        # in the target state.
+        requested_state = _get_requested_state(ti_patch_payload)
+        if requested_state is not None and previous_state == requested_state:
+            log.info(
+                "TI is already in the requested terminal state, treating as 
idempotent success",

Review Comment:
   The new idempotency path is described as handling a "terminal" state, but 
this branch is also used for non-terminal-but-direct states like up_for_retry. 
Consider rewording these comments (and any related docs) to refer to the 
"requested target state" rather than "terminal state" to avoid confusion for 
future maintainers.
   ```suggestion
           # requested target state, treat it as a successful no-op.
           # This handles network-retry scenarios where the first request 
succeeded
           # but the response was lost, causing a retry that sees the TI already
           # in that requested state.
           requested_state = _get_requested_state(ti_patch_payload)
           if requested_state is not None and previous_state == requested_state:
               log.info(
                   "TI is already in the requested target state, treating as 
idempotent success",
   ```



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1081,12 +1091,23 @@ def update_task_state_if_needed(self):
         # For states like `deferred`, `up_for_reschedule`, the process will 
exit with 0, but the state will be updated
         # by the subprocess in the `handle_requests` method.
         if self.final_state not in STATES_SENT_DIRECTLY:
-            self.client.task_instances.finish(
-                id=self.id,
-                state=self.final_state,
-                when=datetime.now(tz=timezone.utc),
-                rendered_map_index=self._rendered_map_index,
-            )
+            try:
+                self.client.task_instances.finish(
+                    id=self.id,
+                    state=self.final_state,
+                    when=datetime.now(tz=timezone.utc),
+                    rendered_map_index=self._rendered_map_index,
+                )
+            except ServerResponseError as e:
+                if e.response.status_code == HTTPStatus.CONFLICT:
+                    log.warning(
+                        "Failed to update TI state after process exit due to 
conflict",
+                        ti_id=self.id,
+                        final_state=self.final_state,
+                        detail=getattr(e, "detail", str(e)),

Review Comment:
   On conflict, this log field uses `getattr(e, "detail", str(e))`, but 
`ServerResponseError` details usually live on `e.response` (e.g., JSON body). 
Logging `e.response.json()`/`e.response.text` (guarded) would make this warning 
much more actionable when diagnosing why the finish update conflicted.
   ```suggestion
                       detail = getattr(e, "detail", str(e))
                       response_body = None
                       try:
                           if e.response is not None:
                               try:
                                   response_body = e.response.json()
                               except Exception:
                                   response_body = e.response.text
                       except Exception:
                           response_body = None
                       log.warning(
                           "Failed to update TI state after process exit due to 
conflict",
                           ti_id=self.id,
                           final_state=self.final_state,
                           detail=detail,
                           response_body=response_body,
   ```



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1262,22 +1283,44 @@ def _handle_request(self, msg: ToSupervisor, log: 
FilteringBoundLogger, req_id:
             self._terminal_state = msg.state
             self._task_end_time_monotonic = time.monotonic()
             self._rendered_map_index = msg.rendered_map_index
-            self.client.task_instances.succeed(
-                id=self.id,
-                when=msg.end_date,
-                task_outlets=msg.task_outlets,
-                outlet_events=msg.outlet_events,
-                rendered_map_index=self._rendered_map_index,
-            )
+            try:
+                self.client.task_instances.succeed(
+                    id=self.id,
+                    when=msg.end_date,
+                    task_outlets=msg.task_outlets,
+                    outlet_events=msg.outlet_events,
+                    rendered_map_index=self._rendered_map_index,
+                )
+            except ServerResponseError as e:
+                if e.response.status_code == HTTPStatus.CONFLICT and 
_is_already_in_target_state(
+                    e, msg.state
+                ):
+                    log.info(
+                        "TI already in success state, treating as idempotent 
success",
+                        ti_id=self.id,
+                    )

Review Comment:
   This info log says "treating as idempotent success". Since this is 
specifically swallowing a 409 conflict, wording like "idempotent no-op" / 
"already in target state" (and optionally including 
`previous_state`/`target_state`) would be clearer for operators reading logs 
during incident triage.



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1262,22 +1283,44 @@ def _handle_request(self, msg: ToSupervisor, log: 
FilteringBoundLogger, req_id:
             self._terminal_state = msg.state
             self._task_end_time_monotonic = time.monotonic()
             self._rendered_map_index = msg.rendered_map_index
-            self.client.task_instances.succeed(
-                id=self.id,
-                when=msg.end_date,
-                task_outlets=msg.task_outlets,
-                outlet_events=msg.outlet_events,
-                rendered_map_index=self._rendered_map_index,
-            )
+            try:
+                self.client.task_instances.succeed(
+                    id=self.id,
+                    when=msg.end_date,
+                    task_outlets=msg.task_outlets,
+                    outlet_events=msg.outlet_events,
+                    rendered_map_index=self._rendered_map_index,
+                )
+            except ServerResponseError as e:
+                if e.response.status_code == HTTPStatus.CONFLICT and 
_is_already_in_target_state(
+                    e, msg.state
+                ):
+                    log.info(
+                        "TI already in success state, treating as idempotent 
success",
+                        ti_id=self.id,
+                    )
+                else:
+                    raise
         elif isinstance(msg, RetryTask):
             self._terminal_state = msg.state
             self._task_end_time_monotonic = time.monotonic()
             self._rendered_map_index = msg.rendered_map_index
-            self.client.task_instances.retry(
-                id=self.id,
-                end_date=msg.end_date,
-                rendered_map_index=self._rendered_map_index,
-            )
+            try:
+                self.client.task_instances.retry(
+                    id=self.id,
+                    end_date=msg.end_date,
+                    rendered_map_index=self._rendered_map_index,
+                )
+            except ServerResponseError as e:
+                if e.response.status_code == HTTPStatus.CONFLICT and 
_is_already_in_target_state(
+                    e, msg.state
+                ):
+                    log.info(
+                        "TI already in retry state, treating as idempotent 
success",

Review Comment:
   This log message says "treating as idempotent success" even though the 
target state here is a retry transition. Consider using neutral phrasing such 
as "already in retry state, treating as idempotent no-op" to avoid implying the 
task succeeded when it actually moved to UP_FOR_RETRY.
   ```suggestion
                           "TI already in retry state, treating as idempotent 
no-op",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to