Nataneljpwd commented on code in PR #64198:
URL: https://github.com/apache/airflow/pull/64198#discussion_r2986116487


##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1355,7 +1355,7 @@ def _on_term(signum, frame):
         # Same metric with tagging
         Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
 
-        if msg:
+        if msg and state != TaskInstanceState.UP_FOR_RETRY:

Review Comment:
   Maybe it would be better to handle this case here, rather than rely on a
   different method being called elsewhere? To me this looks like an
   undocumented assumption that the message will be handled later. At the very
   least I would add a comment clarifying that. Preferably, I would explicitly
   check whether the state is up for retry AND there is a message, and handle
   it right there (this can be extracted into a method, but it should be
   obvious where the handling occurs).
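
   One possible reading of this suggestion, as a minimal sketch rather than the actual change: pull the condition into a small named helper so the deferral is visible at the call site. The `TaskInstanceState` enum below is a stand-in, and `send_final_msg_unless_retry` and its `send` parameter are hypothetical names, not part of the task SDK.

```python
from enum import Enum
from typing import Any, Callable, List


class TaskInstanceState(str, Enum):
    """Stand-in for the real enum; only the members used in this sketch."""

    SUCCESS = "success"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"


def send_final_msg_unless_retry(
    state: TaskInstanceState, msg: Any, send: Callable[[Any], None]
) -> bool:
    """Send msg now and return True, or return False if nothing was sent."""
    if not msg:
        return False
    if state == TaskInstanceState.UP_FOR_RETRY:
        # Assumption from the diff: the retry message is deferred because
        # on_retry_callback may still turn the retry into a failure, so the
        # later finalize step decides which message to send.
        return False
    send(msg)
    return True


# Hypothetical usage: a list stands in for the supervisor channel.
sent: List[Any] = []
send_final_msg_unless_retry(TaskInstanceState.SUCCESS, "success-msg", sent.append)
send_final_msg_unless_retry(TaskInstanceState.UP_FOR_RETRY, "retry-msg", sent.append)
```

   With a helper like this the comment explaining the deferral lives next to the check itself, instead of being implied by code elsewhere.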



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1801,15 +1806,38 @@ def finalize(
         except Exception:
             log.exception("error calling listener")
     elif state == TaskInstanceState.UP_FOR_RETRY:
-        _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
+        from airflow.sdk.exceptions import AirflowFailException
+
+        try:
+            _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
+        except AirflowFailException as e:
+            # If on_retry_callback raises AirflowFailException, the task should not retry.
+            log.info("on_retry_callback raised AirflowFailException; marking task as failed without retry")
+            state = TaskInstanceState.FAILED
+            ti.state = state
+            error = e
+            SUPERVISOR_COMMS.send(
+                msg=TaskState(
+                    state=TaskInstanceState.FAILED,
+                    end_date=ti.end_date,
+                    rendered_map_index=ti.rendered_map_index,
+                )
+            )
+            _run_task_state_change_callbacks(task, "on_failure_callback", context, log)
+            if error and task.email_on_failure and task.email:
+                _send_error_email_notification(task, ti, context, error, log)
+        else:
+            # No AirflowFailException: proceed with retry as originally planned.
+            if msg:
+                SUPERVISOR_COMMS.send(msg=msg)
+            if error and task.email_on_retry and task.email:
+                _send_error_email_notification(task, ti, context, error, log)

Review Comment:
   Isn't this handled by the code deleted below either way?



##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4360,6 +4360,53 @@ class CustomOperator(BaseOperator):
             expected_exception_logs.insert(index, calls)
         assert log.exception.mock_calls == expected_exception_logs
 
+    def test_airflow_fail_exception_in_on_retry_callback_marks_task_as_failed(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """AirflowFailException raised in on_retry_callback must prevent retries and mark task as FAILED."""
+        from airflow.sdk.exceptions import AirflowFailException
+
+        failure_callback_called = []
+        retry_callback_called = []
+
+        def retry_callback(context):
+            retry_callback_called.append(True)
+            raise AirflowFailException("No more retries!")
+
+        def failure_callback(context):
+            failure_callback_called.append(True)

Review Comment:
   Same as above



##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4360,6 +4360,53 @@ class CustomOperator(BaseOperator):
             expected_exception_logs.insert(index, calls)
         assert log.exception.mock_calls == expected_exception_logs
 
+    def test_airflow_fail_exception_in_on_retry_callback_marks_task_as_failed(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """AirflowFailException raised in on_retry_callback must prevent retries and mark task as FAILED."""
+        from airflow.sdk.exceptions import AirflowFailException
+
+        failure_callback_called = []
+        retry_callback_called = []
+
+        def retry_callback(context):
+            retry_callback_called.append(True)
+            raise AirflowFailException("No more retries!")

Review Comment:
   Please use mocks with side effects instead of manually tracking calls in lists.
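
   A minimal sketch of what that could look like, using only `unittest.mock`. The `AirflowFailException` class and `run_retry_then_failure` function below are hypothetical stand-ins so the example runs on its own; in the real test the callbacks would be attached to the operator and driven through the task runner.

```python
from unittest import mock


class AirflowFailException(Exception):
    """Stand-in for airflow.sdk.exceptions.AirflowFailException."""


def run_retry_then_failure(on_retry, on_failure, context):
    """Hypothetical stand-in for the code path under test."""
    try:
        on_retry(context)
    except AirflowFailException:
        # The retry callback vetoed the retry: run the failure callback.
        on_failure(context)
        return "failed"
    return "up_for_retry"


# side_effect makes the mock raise when called; no manual bookkeeping needed.
retry_callback = mock.Mock(side_effect=AirflowFailException("No more retries!"))
failure_callback = mock.Mock()
context = {"ti": "example"}

final_state = run_retry_then_failure(retry_callback, failure_callback, context)

# The mocks record their own calls, so the assertions replace the lists.
retry_callback.assert_called_once_with(context)
failure_callback.assert_called_once_with(context)
```

   The mock assertions also check the arguments each callback received, which the list-append approach does not.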



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1871,9 +1899,9 @@ def main():
                 bundle_name=ti.bundle_instance.name,
                 bundle_version=ti.bundle_instance.version,
             ):
-                state, _, error = run(ti, context, log)
+                state, run_msg, error = run(ti, context, log)

Review Comment:
   The msg was originally returned only for testing. I would suggest either
   removing the comment or working out whether we should use it in finalize or
   other non-test code (I still think it is better to handle as much as
   possible in run).


