shahar1 commented on code in PR #41429:
URL: https://github.com/apache/airflow/pull/41429#discussion_r1717293940


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1921,21 +1921,23 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
     @provide_session
     def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
         """Mark any "deferred" task as failed if the trigger or execution timeout has passed."""
-        num_timed_out_tasks = session.execute(
-            update(TI)
-            .where(
-                TI.state == TaskInstanceState.DEFERRED,
-                TI.trigger_timeout < timezone.utcnow(),
-            )
-            .values(
-                state=TaskInstanceState.SCHEDULED,
-                next_method="__fail__",
-                next_kwargs={"error": "Trigger/execution timeout"},
-                trigger_id=None,
-            )
-        ).rowcount
-        if num_timed_out_tasks:
-            self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
+        for attempt in run_with_db_retries(logger=self.log):
+            with attempt:

Review Comment:
   Could you please add a test for this scenario?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to