vchiapaikeo commented on PR #33172:
URL: https://github.com/apache/airflow/pull/33172#issuecomment-1668573717
Hmm yes, that makes sense. Would adding the condition below be sufficient to
limit the update to TIs queued by the current replica?
```py
TI.queued_by_job_id == self.job.id,
```
So then the method would look something like this?
```py
@provide_session
def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
    """Mark any "deferred" task as failed if the trigger or execution timeout has passed."""
    self.log.debug("Calling SchedulerJob.check_trigger_timeouts method")
    try:
        num_timed_out_tasks = session.execute(
            update(TI)
            .where(
                TI.state == TaskInstanceState.DEFERRED,
                TI.trigger_timeout < timezone.utcnow(),
                # Only perform the update against the TIs that were queued by this scheduler
                TI.queued_by_job_id == self.job.id,
            )
            .values(
                state=TaskInstanceState.SCHEDULED,
                next_method="__fail__",
                next_kwargs={"error": "Trigger/execution timeout"},
                trigger_id=None,
            )
        ).rowcount
        if num_timed_out_tasks:
            self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
    except OperationalError as e:
        session.rollback()
        self.log.warning(
            f"Failed to check trigger timeouts due to {e}. Will reattempt at next scheduled check"
        )
```
The problem I see here is that there may be lingering triggers that never get
cleaned up if a replica is dropped, since no surviving scheduler would match
their `queued_by_job_id`. Maybe this overcomplicates the problem...
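One way the dropped-replica gap could be papered over (a minimal, self-contained sketch in plain SQLAlchemy, not Airflow's actual models) is to widen the filter so a scheduler also times out deferred TIs queued by any job whose heartbeat is stale. The `Job`/`TI` tables, column names, and the `heartbeat_grace` parameter here are simplified stand-ins, not Airflow's real schema:

```python
from datetime import datetime, timedelta

from sqlalchemy import Column, DateTime, Integer, String, or_, select, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Job(Base):
    """Stand-in for a scheduler job row with a liveness heartbeat."""

    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    latest_heartbeat = Column(DateTime)


class TI(Base):
    """Stand-in for a task instance row."""

    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    state = Column(String)
    trigger_timeout = Column(DateTime)
    queued_by_job_id = Column(Integer)


def check_trigger_timeouts(
    session: Session, my_job_id: int, now: datetime, heartbeat_grace: timedelta
) -> int:
    """Time out deferred TIs queued by this scheduler OR by any scheduler
    whose heartbeat is older than the grace period (i.e. a dropped replica)."""
    # Subquery: ids of jobs that heartbeat recently enough to count as alive.
    alive_jobs = select(Job.id).where(Job.latest_heartbeat >= now - heartbeat_grace)
    result = session.execute(
        update(TI)
        .where(
            TI.state == "deferred",
            TI.trigger_timeout < now,
            or_(
                TI.queued_by_job_id == my_job_id,        # queued by this replica
                TI.queued_by_job_id.not_in(alive_jobs),  # or by a dead one
            ),
        )
        .values(state="scheduled"),
        execution_options={"synchronize_session": False},
    )
    session.commit()
    return result.rowcount
```

With that `or_` clause, each surviving replica still only races with the others on orphaned TIs, while TIs queued by a dropped replica no longer linger forever.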
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]