vincbeck commented on code in PR #28900:
URL: https://github.com/apache/airflow/pull/28900#discussion_r1311990437
##########
airflow/models/taskinstance.py:
##########
@@ -1978,257 +2668,88 @@ def handle_failure(
task: BaseOperator | None = None
try:
- if getattr(self, "task", None) and context:
- task = self.task.unmap((context, session))
+ if getattr(ti, "task", None) and context:
+ task = ti.task.unmap((context, session))
except Exception:
- self.log.error("Unable to unmap task to determine if we need to
send an alert email")
+ cls.logger().error("Unable to unmap task to determine if we need
to send an alert email")
- if force_fail or not self.is_eligible_to_retry():
- self.state = TaskInstanceState.FAILED
+ if force_fail or not ti.is_eligible_to_retry():
+ ti.state = TaskInstanceState.FAILED
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None
- callback_type = "on_failure"
if task and task.dag and task.dag.fail_stop:
- _stop_remaining_tasks(self=self, session=session)
+ _stop_remaining_tasks(task_instance=ti, session=session)
else:
- if self.state == TaskInstanceState.QUEUED:
- # We increase the try_number so as to fail the task if it
fails to start after sometime
- self._try_number += 1
- self.state = TaskInstanceState.UP_FOR_RETRY
+ if ti.state == TaskInstanceState.QUEUED:
+ # We increase the try_number to fail the task if it fails to
start after sometime
+ ti._try_number += 1
+ ti.state = State.UP_FOR_RETRY
email_for_state = operator.attrgetter("email_on_retry")
callbacks = task.on_retry_callback if task else None
- callback_type = "on_retry"
- self._log_state("Immediate failure requested. " if force_fail else "")
- if task and email_for_state(task) and task.email:
- try:
- self.email_alert(error, task)
- except Exception:
- self.log.exception("Failed to send email to: %s", task.email)
+ return {
+ "ti": ti,
+ "email_for_state": email_for_state,
+ "task": task,
+ "callbacks": callbacks,
+ "context": context,
+ }
- if callbacks and context:
- self._run_finished_callback(callbacks, context, callback_type)
+ @staticmethod
+ @internal_api_call
+ @provide_session
+ def save_to_db(ti: TaskInstance | TaskInstancePydantic, session: Session =
NEW_SESSION):
Review Comment:
Thanks! I just did it. The method `fetch_handle_failure_context` is an
internal API and has the task instance as parameter? Should I modify as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]