uranusjr commented on code in PR #47996:
URL: https://github.com/apache/airflow/pull/47996#discussion_r2006227637


##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -884,12 +884,21 @@ def finalize(
         _xcom_push(ti, key=xcom_key, value=link)
 
     log.debug("Running finalizers", ti=ti)
-    if state in [TerminalTIState.SUCCESS]:
+    if state == TerminalTIState.SUCCESS:
         get_listener_manager().hook.on_task_instance_success(
             previous_state=TaskInstanceState.RUNNING, task_instance=ti
         )
         # TODO: Run task success callbacks here
-    if state in [TerminalTIState.FAILED, TerminalTIState.FAIL_WITHOUT_RETRY]:
+    elif state == TerminalTIState.FAILED:
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti, 
error=error
+        )
+        if ti._ti_context_from_server and 
ti._ti_context_from_server.should_retry_on_failure:
+            pass  # TODO: Run task retry callbacks here
+        else:
+            # TODO: Run task failure callbacks here
+            state == TerminalTIState.FAIL_WITHOUT_RETRY

Review Comment:
   No; without this, the API will infinitely retry the task. (Or we need to 
somehow check retry eligibility twice in the API, once before to tell the task 
runner, and once after to decide whether to set the state to UP_FOR_RETRY.) By 
setting the terminal state to FAIL_WITHOUT_RETRY instead, the task runner 
(having the knowledge on retry eligibility)  tells the API not to retry but 
fail the task directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to