Nataneljpwd commented on code in PR #64198:
URL: https://github.com/apache/airflow/pull/64198#discussion_r2986116487
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1355,7 +1355,7 @@ def _on_term(signum, frame):
# Same metric with tagging
Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
- if msg:
+ if msg and state != TaskInstanceState.UP_FOR_RETRY:
Review Comment:
Maybe it is a good idea to handle the case here, rather than relying on a
different method called elsewhere. As it stands, this reads like an
undocumented assumption that the message will be handled later. At the very
least I would add a comment clarifying that; preferably I would explicitly
check whether the state is UP_FOR_RETRY and there is a message, and handle it
right here (the handling can be extracted to a method, but it should be
obvious where it occurs).
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1801,15 +1806,38 @@ def finalize(
except Exception:
log.exception("error calling listener")
elif state == TaskInstanceState.UP_FOR_RETRY:
-    _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
+    from airflow.sdk.exceptions import AirflowFailException
+
+    try:
+        _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
+    except AirflowFailException as e:
+        # If on_retry_callback raises AirflowFailException, the task should not retry.
+        log.info("on_retry_callback raised AirflowFailException; marking task as failed without retry")
+        state = TaskInstanceState.FAILED
+        ti.state = state
+        error = e
+        SUPERVISOR_COMMS.send(
+            msg=TaskState(
+                state=TaskInstanceState.FAILED,
+                end_date=ti.end_date,
+                rendered_map_index=ti.rendered_map_index,
+            )
+        )
+        _run_task_state_change_callbacks(task, "on_failure_callback", context, log)
+        if error and task.email_on_failure and task.email:
+            _send_error_email_notification(task, ti, context, error, log)
+    else:
+        # No AirflowFailException: proceed with retry as originally planned.
+        if msg:
+            SUPERVISOR_COMMS.send(msg=msg)
+        if error and task.email_on_retry and task.email:
+            _send_error_email_notification(task, ti, context, error, log)
Review Comment:
Isn't this handled by the code deleted below either way?
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4360,6 +4360,53 @@ class CustomOperator(BaseOperator):
expected_exception_logs.insert(index, calls)
assert log.exception.mock_calls == expected_exception_logs
+    def test_airflow_fail_exception_in_on_retry_callback_marks_task_as_failed(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """AirflowFailException raised in on_retry_callback must prevent retries and mark task as FAILED."""
+        from airflow.sdk.exceptions import AirflowFailException
+
+        failure_callback_called = []
+        retry_callback_called = []
+
+        def retry_callback(context):
+            retry_callback_called.append(True)
+            raise AirflowFailException("No more retries!")
+
+        def failure_callback(context):
+            failure_callback_called.append(True)
Review Comment:
Same as above
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4360,6 +4360,53 @@ class CustomOperator(BaseOperator):
expected_exception_logs.insert(index, calls)
assert log.exception.mock_calls == expected_exception_logs
+    def test_airflow_fail_exception_in_on_retry_callback_marks_task_as_failed(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """AirflowFailException raised in on_retry_callback must prevent retries and mark task as FAILED."""
+        from airflow.sdk.exceptions import AirflowFailException
+
+        failure_callback_called = []
+        retry_callback_called = []
+
+        def retry_callback(context):
+            retry_callback_called.append(True)
+            raise AirflowFailException("No more retries!")
Review Comment:
Please use mocks with side effects instead of manually appending to lists and counting calls.
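For example, the callbacks could be `unittest.mock.Mock` objects with `side_effect`, and the assertions become `assert_called_once_with()` instead of list bookkeeping. A hedged sketch (the local `AirflowFailException` class is a stand-in for `airflow.sdk.exceptions`):

```python
from unittest import mock


class AirflowFailException(Exception):
    """Stand-in for airflow.sdk.exceptions.AirflowFailException."""


# Mocks record their calls for us; side_effect makes the retry callback raise.
retry_callback = mock.Mock(side_effect=AirflowFailException("No more retries!"))
failure_callback = mock.Mock()

# Simulate the behavior under test: the retry callback raises,
# so the failure callback runs instead.
context = {"ti": "fake-ti"}
try:
    retry_callback(context)
except AirflowFailException:
    failure_callback(context)

retry_callback.assert_called_once_with(context)
failure_callback.assert_called_once_with(context)
```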
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1871,9 +1899,9 @@ def main():
bundle_name=ti.bundle_instance.name,
bundle_version=ti.bundle_instance.version,
):
- state, _, error = run(ti, context, log)
+ state, run_msg, error = run(ti, context, log)
Review Comment:
The msg was originally returned for testing. I would suggest either removing
the comment, or working out whether we should use it in finalize or other
non-test code (I still think it is better to handle as much as possible
inside run).
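One way to resolve the question above is to thread the value returned by run() into finalize() so the contract is explicit. A hypothetical sketch; the signatures below are illustrative stand-ins, not the actual task_runner.py API:

```python
def run(ti):
    # ... task execution elided; returns (state, msg, error) as in the diff.
    return "up_for_retry", {"state": "up_for_retry"}, None


def finalize(ti, state, msg=None, error=None):
    # With msg as an explicit parameter, "who sends the retry message"
    # is part of finalize's contract rather than an implicit assumption.
    sent = []
    if msg is not None and state == "up_for_retry":
        sent.append(msg)  # stand-in for SUPERVISOR_COMMS.send(msg=msg)
    return sent


state, run_msg, error = run(ti=None)
sent = finalize(ti=None, state=state, msg=run_msg, error=error)
```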
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]