We also need to account for individual task instances being cleared. If a
single task instance is cleared while the many other task instances in a
running Dag run are left alone, should we change the Dag run's queued_at?
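
Here is a minimal sketch of the question. It assumes a deadline measured from queued_at, in the spirit of DeadlineReference.DAGRUN_QUEUED_AT. Everything in it (`RunState`, `Run`, `clear`, `deadline_missed`, and the `reset_on_partial_clear` switch) is hypothetical and simplified; it is not Airflow's `DagRun` or `clear_task_instances`. The sketch clears one task instance out of 50 in a RUNNING run under each policy. It then checks a 30-minute deadline one minute after the clear.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


class RunState(str, Enum):
    # Hypothetical stand-in for DagRunState, trimmed to the states discussed here.
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


FINISHED_STATES = {RunState.SUCCESS, RunState.FAILED}


@dataclass
class Run:
    # Hypothetical, minimal Dag run: only the fields this question touches.
    state: RunState
    queued_at: datetime
    task_count: int


def clear(run: Run, cleared_tasks: int, now: datetime, reset_on_partial_clear: bool) -> None:
    """Hypothetical helper: apply a clear of `cleared_tasks` task instances to `run`.

    `reset_on_partial_clear` is the open policy question: should clearing only
    some task instances of a RUNNING run restart its queued_at timer?
    """
    whole_run = cleared_tasks >= run.task_count
    if run.state in FINISHED_STATES:
        # A finished run goes back to the queue, so the timer restarts.
        run.state = RunState.QUEUED
        run.queued_at = now
    elif run.state == RunState.RUNNING and (whole_run or reset_on_partial_clear):
        # In this sketch a RUNNING run keeps its state; only the timer may be reset.
        run.queued_at = now


def deadline_missed(run: Run, interval: timedelta, now: datetime) -> bool:
    # Hypothetical, simplified deadline measured from queued_at (not Airflow's Deadline API).
    return now > run.queued_at + interval


start = datetime(2025, 12, 15, 12, 0, tzinfo=timezone.utc)
cleared_at = start + timedelta(minutes=45)
check_at = cleared_at + timedelta(minutes=1)
half_hour = timedelta(minutes=30)

# One task out of 50 is cleared in a RUNNING run, under each policy.
keep = Run(RunState.RUNNING, queued_at=start, task_count=50)
clear(keep, cleared_tasks=1, now=cleared_at, reset_on_partial_clear=False)
reset = Run(RunState.RUNNING, queued_at=start, task_count=50)
clear(reset, cleared_tasks=1, now=cleared_at, reset_on_partial_clear=True)

print(deadline_missed(keep, half_hour, check_at))   # True: timer still counts from start
print(deadline_missed(reset, half_hour, check_at))  # False: timer restarted at cleared_at
```

As I read the snippet proposed further down, its `dr.state == DagRunState.RUNNING` condition would also fire on a partial clear of a running run. That corresponds to `reset_on_partial_clear=True` here. The two policies diverge as soon as a deadline is measured from queued_at.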

On Mon, 15 Dec 2025 at 18:17, Ferruzzi, Dennis <[email protected]> wrote:

> Would my "temporary" fix work for now, or is there some corner I'm not
> seeing around?
>
>
>  - ferruzzi
>
> "Nothing is more permanent than a temporary solution"
>
>
>
> ________________________________
> From: Daniel Standish via dev <[email protected]>
> Sent: Saturday, December 13, 2025 12:21 PM
> To: [email protected]
> Cc: Daniel Standish
> Subject: RE: [EXT] DagRun queued_at timestamp discussion
>
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
> The main cause here is that we reuse our Dag runs when we shouldn't.
>
> But resolving that is a big project.
>
> On Fri, Dec 12, 2025 at 4:40 PM Ferruzzi, Dennis <[email protected]>
> wrote:
>
> > I'll amend my proposal with a less intrusive alternative. We can
> > leave the RUNNING -> RUNNING transition alone entirely, revert my
> > TERMINAL -> QUEUED transition fix, and make the following change in the
> > clear_task_instances method [1] instead:
> >
> > ```
> >
> > for dr in drs:
> >     if dr.state in State.finished_dr_states:
> >         # ... existing code ...
> >
> >     # Always update queued_at when clearing, regardless of the old state
> >     if dag_run_state == DagRunState.QUEUED or dr.state == DagRunState.RUNNING:
> >         dr.queued_at = timezone.utcnow()
> >         dr.clear_number += 1
> > ```
> >
> >
> > That would be super clean, and the only effective change would be that
> > queued_at always reflects the most recent time the run was marked as ready
> > for scheduling, which I feel was the intent of that column (but I may be
> > wrong).
> >
> >
> >
> > [1] https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L192
> >
> >
> >  - ferruzzi
> >
> >
> > ________________________________
> > From: Stefan Wang <[email protected]>
> > Sent: Wednesday, December 10, 2025 9:57 PM
> > To: [email protected]
> > Subject: RE: [EXT] DagRun queued_at timestamp discussion
> >
> >
> > Hi Dennis,
> >
> > Thanks for the great investigation work and for opening this discussion!
> >
> > PR #59066 was a clean fix, and I really appreciated how you dug into the
> > SQLAlchemy behavior to understand what was happening. Here are my thoughts
> > (though I'm relatively new to the community and still catching up on
> > discussions pre-2023, so please take my input as just one perspective) :p
> >
> > The PR solves the immediate problem well. Updating queued_at when clearing
> > finished runs makes sense semantically and fixes the Deadline Alert issue.
> > The explicit assignment is clear and sidesteps the property-setter
> > persistence issue nicely.
> >
> > 1. RUNNING → RERUNNING vs. RUNNING → QUEUED
> > I can see why this feels unintuitive. My guess is that RERUNNING exists to
> > preserve the "this was already executing" information, which might be
> > useful for monitoring or debugging, but I don't have context on the
> > original design decisions here.
> > This feels like a bigger state-machine question, and it would be
> > interesting to see thoughts from the community. IIUC, Deadline Alerts
> > should be okay if this stays as-is for now, though.
> >
> > 2. Should queued_at update for RERUNNING too?
> > PR #59066 updated queued_at for finished → QUEUED but not for RUNNING →
> > RERUNNING. For Deadline Alert consistency, it probably should update in
> > both cases. The semantic tension is that RERUNNING doesn't actually go
> > through a "queued" stage, but pragmatically it's "starting the execution
> > attempt again," so resetting the timer makes sense to me.
> >
> > 3. queued_at vs. last_queued_at Column
> > In a "perfect world" I'd prefer renaming the existing queued_at column to
> > something like last_queued_at, submitted_at, or last_attempt_at for clearer
> > semantics. However, I can see how that kind of breaking change would be a
> > migration headache without strong enough justification. Keeping queued_at
> > as-is seems pragmatic.
> >
> > 4. Queue Time Arrays
> > Agreed, this could be a bit of over-engineering at this stage.
> >
> > Thanks,
> > Stefan
> >
> > > On Dec 9, 2025, at 2:04 PM, Ferruzzi, Dennis <[email protected]> wrote:
> > >
> > > There's a good bit of context here to get to where I'm going, but the
> > > TL;DR is "Should the DagRun's queued_at time get updated when a user
> > > clears the Dag run?"
> > >
> > > Initial Question: When a user has a Deadline using
> > > DeadlineReference.DAGRUN_QUEUED_AT and the Dag run gets cleared,
> > > shouldn't the timer restart?
> > > Initial Answer: Surely. If my deadline says "email me if this has been
> > > queued for more than half an hour", then I would expect re-queuing the
> > > run to reset that timer.
> > >
> > > I was surprised to learn that it doesn't, so I did some digging and
> > > found a few related bits that feel odd to me. I'm looking to start up a
> > > discussion about whether they are intentional and needed or if they can
> > > safely be fixed. Given the code structure, they appear to be intentional
> > > decisions, so I want to make sure I'm not missing context before changing
> > > things.
> > >
> > > What's currently happening:
> > >
> > > When a user clicks the "clear dag run" button in the UI, we land in
> > > models/taskinstance.py::clear_task_instances [1]. If the run was in a
> > > terminal state, its state is set to QUEUED; if the run was RUNNING, it
> > > skips the queue and goes straight to RERUNNING. Neither case changes the
> > > queued_at timestamp. [2]
> > >
> > > I did some digging, and it looks like this goes back to a 2023 commit
> > > [3] made in response to an Issue [4] requesting that start_date not be
> > > changed (which I agree with). I suspect queued_at was just collateral
> > > damage there and is safe to fix, but I want to see if anyone knows of a
> > > reason it should remain as it is.
> > >
> > > Discussion Points:
> > >
> > > 1. Why does a finished run get sent to the queue but a RUNNING Dag run
> > > skip the queue when it is cleared? That seems unintuitive; I would
> > > expect it to get tossed to the queue for the scheduler to pick back up
> > > regardless of the current state.
> > > 2. Bikeshed time! IF a change is needed (TBD), should we update
> > > queued_at, or add a new column, perhaps last_queued_at, to avoid breaking
> > > anyone's workflow? If we're adding a new column, should it be a single
> > > last_queued_at timestamp, or an array recording every time the run was
> > > re-queued? If the latter, do we add an API so users can access it?
> > >
> > > My thoughts and proposal:
> > >
> > > I feel like any time the run is cleared, it should get tossed back on the
> > > queue regardless of its current state, and doing so should update the
> > > existing queued_at column. It's simple, clean, and predictable. If we
> > > decide that is a breaking change, then my backup proposal is to always
> > > re-queue and add a single-timestamp, non-nullable last_queued_at column
> > > that defaults to queued_at at init and gets updated each time the run is
> > > re-queued.
> > >
> > >
> > > [1] https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L192
> > > [2] This is now a lie: I changed the terminal-state case to update
> > > queued_at in https://github.com/apache/airflow/pull/59066, but for now
> > > let's leave that out of the discussion. It may need to be reverted or
> > > modified depending on how this discussion goes.
> > > [3] https://github.com/apache/airflow/pull/30125
> > > [4] https://github.com/apache/airflow/issues/30124
> > >
> > >
> > >
> > >
> > > - ferruzzi
> >
>
