amoghrajesh commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1895919981


##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -714,6 +718,14 @@ def _handle_request(self, msg: ToSupervisor, log: 
FilteringBoundLogger):
         if isinstance(msg, TaskState):
             self._terminal_state = msg.state
             self._task_end_time_monotonic = time.monotonic()
+            if msg.task_retries:
+                self.client.task_instances.finish(
+                    id=self.id,
+                    state=self.final_state,
+                    when=datetime.now(tz=timezone.utc),
+                    task_retries=msg.task_retries,
+                )
+                self._should_retry = True

Review Comment:
   We have this in the task runner:
   ```
           msg = TaskState(state=TerminalTIState.FAILED, 
end_date=datetime.now(tz=timezone.utc))
           if not getattr(ti, "task", None):
               # We do not know about retries, let's mark it -1, so that the 
execution api can make a guess
               msg.task_retries = -1
           else:
               # `None` indicates no retries provided, the default is anyway 0 
which evaluates to false
               msg.task_retries = ti.task.retries or None
   ```
   
   The task runner sets the value to -1 if it didn't have a "task", asking the 
server to guess it, if it sets a non `None`, it wants to retry.
   
   Now when the request comes to supervisor, this part comes in:
   ```
               if msg.task_retries:
                   self.client.task_instances.finish(
                       id=self.id,
                       state=self.final_state,
                       when=datetime.now(tz=timezone.utc),
                       task_retries=msg.task_retries,
                   )
                   self._should_retry = True
   ```
   
   This simply means, if I HAVE to retry, let me send a `finish` call, but with 
retries defined. Then we set a variable `self._should_retry` to indicate that 
retry request has been sent, so that we dont send it in `wait()`, like this:
   ```
           if self.final_state in TerminalTIState and not self._should_retry:
               self.client.task_instances.finish(
                   id=self.id, state=self.final_state, 
when=datetime.now(tz=timezone.utc), task_retries=None
               )
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to