TakawaAkirayo commented on code in PR #41429:
URL: https://github.com/apache/airflow/pull/41429#discussion_r1720747677
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1921,21 +1921,23 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
     @provide_session
     def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
         """Mark any "deferred" task as failed if the trigger or execution timeout has passed."""
-        num_timed_out_tasks = session.execute(
-            update(TI)
-            .where(
-                TI.state == TaskInstanceState.DEFERRED,
-                TI.trigger_timeout < timezone.utcnow(),
-            )
-            .values(
-                state=TaskInstanceState.SCHEDULED,
-                next_method="__fail__",
-                next_kwargs={"error": "Trigger/execution timeout"},
-                trigger_id=None,
-            )
-        ).rowcount
-        if num_timed_out_tasks:
-            self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
+        for attempt in run_with_db_retries(logger=self.log):
+            with attempt:
Review Comment:
Hi @shahar1, I have added unit tests covering both the positive case (retry
on error, eventually succeed) and the negative case (no retry, fail).
I don't know how to reproduce a real deadlock exception locally in unit tests,
so I'm mocking an exception at the session level to simulate the scenario.
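
For readers following along, here is a rough, standalone sketch of the mocking approach described above. It is not the test code from this PR: `FakeOperationalError`, `run_with_retries`, and the simplified `check_trigger_timeouts` are hypothetical stand-ins (for a database deadlock error, the retry loop, and the scheduler method respectively), chosen so the example runs with only the standard library.

```python
from unittest import mock


class FakeOperationalError(Exception):
    """Hypothetical stand-in for a DB-level error such as a deadlock."""


def run_with_retries(func, max_retries=3):
    """Hypothetical retry helper: call func, retrying on FakeOperationalError."""
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except FakeOperationalError:
            # Re-raise once the retry budget is exhausted.
            if attempt == max_retries:
                raise


def check_trigger_timeouts(session, max_retries=3):
    """Simplified stand-in for the method under test: one UPDATE, return rowcount."""
    return run_with_retries(lambda: session.execute("UPDATE ...").rowcount, max_retries)


# Positive case: the mocked session fails twice, then succeeds on the third call.
session = mock.MagicMock()
ok_result = mock.MagicMock(rowcount=2)
session.execute.side_effect = [
    FakeOperationalError("deadlock"),
    FakeOperationalError("deadlock"),
    ok_result,
]
positive_rowcount = check_trigger_timeouts(session)
positive_calls = session.execute.call_count

# Negative case: retries disabled (a single attempt), so the error propagates.
failing_session = mock.MagicMock()
failing_session.execute.side_effect = FakeOperationalError("deadlock")
try:
    check_trigger_timeouts(failing_session, max_retries=1)
    negative_raised = False
except FakeOperationalError:
    negative_raised = True
negative_calls = failing_session.execute.call_count
```

Setting `side_effect` to a list lets the mock raise on the first calls and return a result afterwards, which approximates a transient deadlock without needing a real database.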
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.