potiuk commented on PR #33172:
URL: https://github.com/apache/airflow/pull/33172#issuecomment-1668623843
> Hmm yes, that makes sense. Would adding the below condition be sufficient to look at TIs queued by the current replica?
>
> ```
> TI.queued_by_job_id == self.job.id,
> ```
>
> So then the method would look something like this?
>
> ```python
> @provide_session
> def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
>     """Mark any "deferred" task as failed if the trigger or execution timeout has passed."""
>     self.log.debug("Calling SchedulerJob.check_trigger_timeouts method")
>
>     try:
>         num_timed_out_tasks = session.execute(
>             update(TI)
>             .where(
>                 TI.state == TaskInstanceState.DEFERRED,
>                 TI.trigger_timeout < timezone.utcnow(),
>                 # Only perform the update against the ones that were queued by this scheduler
>                 TI.queued_by_job_id == self.job.id,
>             )
>             .values(
>                 state=TaskInstanceState.SCHEDULED,
>                 next_method="__fail__",
>                 next_kwargs={"error": "Trigger/execution timeout"},
>                 trigger_id=None,
>             )
>         ).rowcount
>
>         if num_timed_out_tasks:
>             self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
>
>     except OperationalError as e:
>         session.rollback()
>         self.log.warning(
>             f"Failed to check trigger timeouts due to {e}. Will reattempt at next scheduled check"
>         )
> ```
>
> The problem I see here is that there may be lingering triggers that do not get cleaned up if replicas get dropped. Maybe this overcomplicates the problem...
Nope. It's different. IMHO you should not limit it by job_id; instead, you should add a dag_run "FOR UPDATE" clause with a SKIP LOCKED condition - basically, join the task_instance rows with the dag_run they belong to and make sure that dag_run is locked "for update". I am not enough of a SQLAlchemy expert to tell you exactly how it should be done; my knowledge is based more on general relational-DB knowledge, and on learning from @ashb's https://www.youtube.com/watch?v=DYC4-xElccE talk on how the scheduler works internally - but I believe there are already quite a few SQLAlchemy queries in the Airflow code that do this.
Maybe others who know better could chime in as well?
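For reference, here is a hedged sketch of what the suggested FOR UPDATE ... SKIP LOCKED join could look like in SQLAlchemy. The model names mirror Airflow's (`TI`, `DagRun`), but the minimal declarative models and the exact query shape are illustrative assumptions for this comment, not a tested patch against the real schema:

```python
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class DagRun(Base):
    # Minimal stand-in for Airflow's dag_run table (illustrative only).
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)


class TI(Base):
    # Minimal stand-in for Airflow's task_instance table (illustrative only).
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    state = Column(String)
    trigger_timeout = Column(DateTime)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"))


# Select timed-out deferred TIs, joining to dag_run and locking the dag_run
# rows with FOR UPDATE SKIP LOCKED, so a concurrent scheduler replica simply
# skips dag_runs that another replica has already locked instead of blocking.
stmt = (
    select(TI)
    .join(DagRun, TI.dag_run_id == DagRun.id)
    .where(
        TI.state == "deferred",
        TI.trigger_timeout < datetime.now(timezone.utc),
    )
    .with_for_update(of=DagRun, skip_locked=True)
)

# Render the SQL the PostgreSQL dialect would emit (no database needed).
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The point of locking via the dag_run row rather than filtering on `queued_by_job_id` is that the protection keeps working even when a replica disappears: its locks are released when its transaction ends, so any surviving scheduler can pick up the orphaned rows.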
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]