Nataneljpwd commented on code in PR #66781:
URL: https://github.com/apache/airflow/pull/66781#discussion_r3336604259
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1967,31 +2005,72 @@ def finalize(
except Exception:
log.exception("error calling listener")
elif state == TaskInstanceState.UP_FOR_RETRY:
- _run_task_state_change_callbacks(task, "on_retry_callback", context,
log)
+ from airflow.sdk.exceptions import AirflowFailException
+
try:
- get_listener_manager().hook.on_task_instance_failed(
- previous_state=TaskInstanceState.RUNNING, task_instance=ti,
error=error
+ _run_task_state_change_callbacks(task, "on_retry_callback",
context, log)
+ except AirflowFailException as fail_exc:
+ # User explicitly asked to fail without retrying from inside
on_retry_callback.
+ # Promote the state to FAILED, replace any pending RetryTask with
TaskState(FAILED),
+ # and run the failure-path finalizers. See #60172.
+ log.info("AirflowFailException raised in on_retry_callback;
failing task without retry")
+ state = TaskInstanceState.FAILED
+ ti.state = state
+ error = fail_exc
+ context["exception"] = fail_exc
+ ti.end_date = datetime.now(tz=timezone.utc)
+ msg = TaskState(
+ state=TaskInstanceState.FAILED,
+ end_date=ti.end_date,
+ rendered_map_index=ti.rendered_map_index,
+ )
+ _run_task_state_change_callbacks(task, "on_failure_callback",
context, log)
+ _handle_failure_notifications(
+ task=task,
+ ti=ti,
+ context=context,
+ error=error,
+ log=log,
+ send_email=task.email_on_failure,
+ )
Review Comment:
Minor formatting nit, not blocking, I would add some spacing around here to
separate different parts of code to make it more readable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]