pankajastro commented on code in PR #32000:
URL: https://github.com/apache/airflow/pull/32000#discussion_r1244825723


##########
airflow/models/dag.py:
##########
@@ -2690,22 +2690,50 @@ def add_logger_if_needed(ti: TaskInstance):
         )
 
         tasks = self.task_dict
+        # Special case when retry is set and DAG is paused
+        task_try_number = {}
+
         self.log.debug("starting dagrun")
         # Instead of starting a scheduler, we run the minimal loop possible to check
         # for task readiness and dependency management. This is notably faster
         # than creating a BackfillJob and allows us to surface logs to the user
         while dr.state == State.RUNNING:
             schedulable_tis, _ = dr.update_state(session=session)
-            try:
-                for ti in schedulable_tis:
+
+            for ti in schedulable_tis:
+                try:
                     add_logger_if_needed(ti)
                     ti.task = tasks[ti.task_id]
-                    _run_task(ti, session=session)
-            except Exception:
-                self.log.info(
-                    "Task failed. DAG will continue to run until finished and be marked as failed.",
-                    exc_info=True,
-                )
+                    if ti.task_id not in task_try_number:
+                        task_try_number[ti.task_id] = 0
+                    ti = _run_task(ti, session=session)
+                except Exception:
+                    if ti.state == State.UP_FOR_RETRY:
+                        try_number = task_try_number.get(ti.task_id, 0)
+                        if try_number > ti.max_tries:
+                            ti.set_state(State.FAILED)
+                        else:
+                            task_try_number[ti.task_id] = try_number + 1
+                    self.log.info(
+                        "Task failed. DAG will continue to run until finished and be marked as failed.",
+                        exc_info=True,
+                    )
+            for ti in dr.get_task_instances():
+                # Special case TI resume from deferred state
+                if ti.state == State.SCHEDULED:

Review Comment:
   First, we call `_run_task` (L2709) for a task, which sets the task to the running state. Here, since the task has already run once and is now in the scheduled state, I'm assuming it is a deferrable task. WDYT?
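   A minimal sketch of the heuristic described above, outside of Airflow: remember which task IDs have already been passed to the run step, and treat any of them that later shows up in `SCHEDULED` again as resuming from a deferral. `State`, `FakeTI`, and `find_resumed_deferred` are hypothetical stand-ins for illustration, not Airflow's classes or API. Note that the check cannot tell a deferral resume apart from any other path that might put a previously run TI back into `SCHEDULED`, so it is only a heuristic.

```python
from __future__ import annotations

from enum import Enum


class State(str, Enum):
    # Hypothetical subset of TI states; names mirror those used in the diff.
    SCHEDULED = "scheduled"
    RUNNING = "running"
    DEFERRED = "deferred"
    SUCCESS = "success"


class FakeTI:
    """Hypothetical stand-in for a TaskInstance (not Airflow's class)."""

    def __init__(self, task_id: str, state: State = State.SCHEDULED):
        self.task_id = task_id
        self.state = state


def find_resumed_deferred(tis: list[FakeTI], already_run: set[str]) -> list[str]:
    """Return task_ids that were run before and are SCHEDULED again.

    Heuristic from the review: a TI we already executed that is back in
    SCHEDULED most likely returned from a deferral after its trigger fired.
    """
    return [
        ti.task_id
        for ti in tis
        if ti.state == State.SCHEDULED and ti.task_id in already_run
    ]


if __name__ == "__main__":
    already_run: set[str] = set()
    sensor = FakeTI("sensor")
    downstream = FakeTI("downstream")

    # First pass: "sensor" is run and defers.
    already_run.add(sensor.task_id)
    sensor.state = State.DEFERRED

    # Trigger fires: the TI goes back to SCHEDULED.
    sensor.state = State.SCHEDULED

    # "downstream" is also SCHEDULED but has never run, so it is not reported.
    print(find_resumed_deferred([sensor, downstream], already_run))  # ['sensor']
```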



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.