vchiapaikeo commented on PR #33172:
URL: https://github.com/apache/airflow/pull/33172#issuecomment-1668573717

   Hmm yes, that makes sense. Would adding the below condition be sufficient to 
look at TI's queued by the current replica?
   
   ```
   TI.queued_by_job_id == self.job.id,
   ```
   
   So then the method would look something like this?
   
   ```py
       @provide_session
       def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
           """Mark any "deferred" task as failed if the trigger or execution 
timeout has passed."""
           self.log.debug("Calling SchedulerJob.check_trigger_timeouts method")
   
           try:
               num_timed_out_tasks = session.execute(
                   update(TI)
                   .where(
                       TI.state == TaskInstanceState.DEFERRED,
                       TI.trigger_timeout < timezone.utcnow(),
                       # Only perform update against the ones that were queued 
by this scheduler
                       TI.queued_by_job_id == self.job.id,
                   )
                   .values(
                       state=TaskInstanceState.SCHEDULED,
                       next_method="__fail__",
                       next_kwargs={"error": "Trigger/execution timeout"},
                       trigger_id=None,
                   )
               ).rowcount
   
               if num_timed_out_tasks:
                   self.log.info("Timed out %i deferred tasks without fired 
triggers", num_timed_out_tasks)
   
           except OperationalError as e:
               session.rollback()
               self.log.warning(
                   f"Failed to check trigger timeouts due to {e}. Will 
reattempt at next scheduled check"
               )
   ```
   
   The problem I see here is that there may be lingering triggers that do not 
get cleaned up if replicas get dropped. Maybe this overcomplicates the 
problem...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to