shahar1 opened a new pull request, #68749:
URL: https://github.com/apache/airflow/pull/68749
# Human Summary
closes: #39456
related: #39603
This PR fixes a bug where new asset-scheduled Dags consume historical asset
events that occurred before the Dag started referencing those assets.
# AI Summary
<details><summary>Click here</summary>
**Bug.** In `SchedulerJobRunner._create_dag_runs_asset_triggered`, the set
of asset events attached to a run is bounded below by the previous
asset-triggered run's `run_after`, falling back to `date.min` when there is no
previous run (`func.coalesce(cte.c.previous_dag_run_run_after, date.min)`). For
a Dag's very first run this means every asset event ever recorded for its
assets is consumed, so a Dag that starts scheduling on assets with existing
history reprocesses the entire backlog on its first run.
**Fix.** Add the Dag's earliest schedule-reference `created_at` to the
`coalesce` fallback chain, before `date.min`:
```python
AssetEvent.timestamp > func.coalesce(
    cte.c.previous_dag_run_run_after,
    select(func.min(DagScheduleAssetReference.created_at))
    .where(DagScheduleAssetReference.dag_id == dag.dag_id)
    .scalar_subquery(),
    date.min,
)
```
On the first run the window is bounded at the moment the Dag started
scheduling on its assets; subsequent runs are unchanged (still bounded by the
previous run's `run_after`). `created_at` is reliable for this because schedule
references are updated in place during parsing
(`dag_processing/collection.py`), not deleted and recreated, so it survives
re-serialization.
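
To illustrate the fallback chain outside the scheduler, here is a minimal sketch using only the standard-library `sqlite3` module. The two-table schema, the `schedule_ref` table name, the ISO-string timestamps, and the helper names are simplifying assumptions for illustration; they are not the real Airflow models or the query as it appears in the scheduler.

```python
import sqlite3

# Hypothetical, simplified schema; the real Airflow tables have more columns.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE asset_event (id INTEGER PRIMARY KEY, ts TEXT NOT NULL);
    CREATE TABLE schedule_ref (dag_id TEXT NOT NULL, created_at TEXT NOT NULL);
    INSERT INTO asset_event (ts) VALUES ('2024-01-01T00:00:00');
    INSERT INTO asset_event (ts) VALUES ('2024-03-01T00:00:00');
    INSERT INTO schedule_ref VALUES ('new_dag', '2024-02-01T00:00:00');
    """
)

# Stand-in for date.min; ISO-8601 strings compare correctly as text.
DATE_MIN = "0001-01-01T00:00:00"

# Old behavior: previous run's run_after, else date.min.
OLD_SQL = "SELECT COUNT(*) FROM asset_event WHERE ts > COALESCE(?, ?)"

# New behavior: the earliest schedule-reference created_at sits between the two.
NEW_SQL = """
    SELECT COUNT(*) FROM asset_event
    WHERE ts > COALESCE(
        ?,
        (SELECT MIN(created_at) FROM schedule_ref WHERE dag_id = ?),
        ?
    )
"""


def count_old(previous_run_after):
    """Count events consumed with the two-step fallback."""
    return conn.execute(OLD_SQL, (previous_run_after, DATE_MIN)).fetchone()[0]


def count_new(previous_run_after, dag_id="new_dag"):
    """Count events consumed with the three-step fallback."""
    row = conn.execute(NEW_SQL, (previous_run_after, dag_id, DATE_MIN)).fetchone()
    return row[0]


# First run: there is no previous run, so the first COALESCE argument is NULL.
old_first_run = count_old(None)
new_first_run = count_new(None)

# Later run: a previous run_after exists, so both variants use it.
old_later_run = count_old("2024-01-15T00:00:00")
new_later_run = count_new("2024-01-15T00:00:00")

print(old_first_run, new_first_run, old_later_run, new_later_run)
```

In this toy setup the first run consumes both events under the old fallback and only the post-reference event under the new one, while a run that has a previous `run_after` sees the same window either way.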
**Scope.** Direct asset references only. The asset-**alias** path is
deliberately left on `date.min`: an alias consumer is expected to pick up
events attached to its alias regardless of timing (covered by
`test_create_dag_runs_asset_alias_with_asset_event_attached`).
This is the same fix idea as the stale 2.x PR #39603, re-expressed for the
current SQL-native query (a single `coalesce` over a CTE rather than a Python
`if/else` over a joined reference table; a scalar subquery is used instead of a
bare correlated column to avoid an implicit cartesian join).
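
The cartesian-join concern can be sketched in plain SQL through `sqlite3`. This is only an analogy under assumed, simplified tables (`asset_event`, `schedule_ref`): it shows how pulling the reference table into the outer query multiplies event rows when a Dag has more than one schedule reference, whereas a scalar subquery collapses the references to a single value first. It does not reproduce the SQLAlchemy statement from this PR.

```python
import sqlite3

# Hypothetical, simplified schema for illustration only.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE asset_event (id INTEGER PRIMARY KEY, ts TEXT NOT NULL);
    CREATE TABLE schedule_ref (
        dag_id TEXT NOT NULL,
        asset_id INTEGER NOT NULL,
        created_at TEXT NOT NULL
    );
    INSERT INTO asset_event (ts) VALUES ('2024-03-01T00:00:00');
    INSERT INTO schedule_ref VALUES ('new_dag', 1, '2024-02-01T00:00:00');
    INSERT INTO schedule_ref VALUES ('new_dag', 2, '2024-02-02T00:00:00');
    """
)

# Bare column from a second table: the table joins the outer FROM list, so
# each event is paired with every matching reference row.
joined_rows = conn.execute(
    """
    SELECT e.id FROM asset_event AS e, schedule_ref AS r
    WHERE r.dag_id = 'new_dag' AND e.ts > r.created_at
    """
).fetchall()

# Scalar subquery: the references are reduced to one value before comparison,
# so each event appears at most once.
subquery_rows = conn.execute(
    """
    SELECT e.id FROM asset_event AS e
    WHERE e.ts > (
        SELECT MIN(created_at) FROM schedule_ref WHERE dag_id = 'new_dag'
    )
    """
).fetchall()

print(len(joined_rows), len(subquery_rows))
```

With two references for the same Dag, the joined form returns the single event twice, while the scalar-subquery form returns it once.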
**Validation.**
- New regression test
`test_new_asset_triggered_dag_ignores_events_before_creation` — fails on `main`
(consumes 2 events), passes here (consumes 1).
- Updated `test_create_dag_runs_assets`, which previously encoded the buggy
behavior.
- All asset-related scheduler tests pass.
</details>
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes — Claude Code (Opus 4.8, 1M context)
Generated-by: Claude Code (Opus 4.8, 1M context) following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)