potiuk commented on code in PR #28900:
URL: https://github.com/apache/airflow/pull/28900#discussion_r1311295636


##########
airflow/models/taskinstance.py:
##########
@@ -1978,257 +2668,88 @@ def handle_failure(
 
         task: BaseOperator | None = None
         try:
-            if getattr(self, "task", None) and context:
-                task = self.task.unmap((context, session))
+            if getattr(ti, "task", None) and context:
+                task = ti.task.unmap((context, session))
         except Exception:
-            self.log.error("Unable to unmap task to determine if we need to 
send an alert email")
+            cls.logger().error("Unable to unmap task to determine if we need 
to send an alert email")
 
-        if force_fail or not self.is_eligible_to_retry():
-            self.state = TaskInstanceState.FAILED
+        if force_fail or not ti.is_eligible_to_retry():
+            ti.state = TaskInstanceState.FAILED
             email_for_state = operator.attrgetter("email_on_failure")
             callbacks = task.on_failure_callback if task else None
-            callback_type = "on_failure"
 
             if task and task.dag and task.dag.fail_stop:
-                _stop_remaining_tasks(self=self, session=session)
+                _stop_remaining_tasks(task_instance=ti, session=session)
         else:
-            if self.state == TaskInstanceState.QUEUED:
-                # We increase the try_number so as to fail the task if it 
fails to start after sometime
-                self._try_number += 1
-            self.state = TaskInstanceState.UP_FOR_RETRY
+            if ti.state == TaskInstanceState.QUEUED:
+                # We increase the try_number to fail the task if it fails to 
start after sometime
+                ti._try_number += 1
+            ti.state = State.UP_FOR_RETRY
             email_for_state = operator.attrgetter("email_on_retry")
             callbacks = task.on_retry_callback if task else None
-            callback_type = "on_retry"
 
-        self._log_state("Immediate failure requested. " if force_fail else "")
-        if task and email_for_state(task) and task.email:
-            try:
-                self.email_alert(error, task)
-            except Exception:
-                self.log.exception("Failed to send email to: %s", task.email)
+        return {
+            "ti": ti,
+            "email_for_state": email_for_state,
+            "task": task,
+            "callbacks": callbacks,
+            "context": context,
+        }
 
-        if callbacks and context:
-            self._run_finished_callback(callbacks, context, callback_type)
+    @staticmethod
+    @internal_api_call
+    @provide_session
+    def save_to_db(ti: TaskInstance | TaskInstancePydantic, session: Session = 
NEW_SESSION):

Review Comment:
   This is the last one, I think, where TaskInstance is passed as a parameter 
of an internal API call, and I think it should be done differently. I believe 
the original failure-handling code only updates three fields: end_date (and 
duration), state, and try_number. 
   
   So what I think should happen here: we should likely have an internal_api 
method along the lines of:
   
   ```
   def finish_task(dag_id, task_id, execution_date, state):
   ```
   and call this method, because that is effectively what that method does. We 
don't even have to pass "end_date" (it's always now), and maybe that will 
eventually lead to the removal of `set_end_date`? 
   
   I think this is the "logical" operation we are doing here, and the 
internal_api_call should really be about that: finishing a task. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to