SameerMesiah97 commented on code in PR #68595:
URL: https://github.com/apache/airflow/pull/68595#discussion_r3431474614
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3690,16 +3694,24 @@ def _cleanup_orphaned_asset_state_store(*, session: Session) -> None:
"""
Delete asset_state_store rows for assets no longer active in any Dag.
- When _orphan_unreferenced_assets removes an asset from asset_active, its
- asset_state_store rows become unreachable — no task can write to them anymore.
- This runs in the same pass as asset orphanage to keep the table clean.
+ Bounded to ORPHANED_ASSET_STATE_STORE_CLEANUP_BATCH_SIZE orphaned assets per
+ tick so a large backlog cannot become one unbounded delete in the scheduler
+ loop; the remainder drains on later orphanage ticks.
"""
active_asset_ids = select(AssetModel.id).join(
AssetActive,
(AssetActive.name == AssetModel.name) & (AssetActive.uri == AssetModel.uri),
)
+ orphaned_asset_ids = session.scalars(
+ select(AssetStateStoreModel.asset_id)
+ .where(AssetStateStoreModel.asset_id.not_in(active_asset_ids))
+ .distinct()
Review Comment:
@steveahnahn
I see your point. But consider a situation where the 500 `AssetStateStoreModel` records you are fetching contain at least one duplicate asset ID (i.e. one asset linked to two records). If you de-duplicate the retrieved asset IDs with a set operation, won't that leave fewer than 500 assets? Again, if the limit applies only to each fetch, that is defensible. But in that case, I would make it clear that the batch size applies to the `AssetStateStoreModel` records fetched on each iteration, not to the number of assets cleaned per loop.
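To make the distinction concrete, here is a minimal sketch using Python's stdlib `sqlite3` rather than SQLAlchemy. The tables `asset_state_store` and `active_asset_id`, the helper functions, and `BATCH_SIZE = 3` are hypothetical simplifications (the real code joins `AssetModel` to `AssetActive` through the ORM), and since the quoted hunk is cut off after `.distinct()`, this does not claim how the PR actually applies its limit. It only shows that applying `LIMIT` to raw rows and de-duplicating afterwards yields fewer assets than the batch size when one asset owns several rows, whereas `SELECT DISTINCT ... LIMIT` bounds the number of distinct asset IDs.

```python
import sqlite3

# Hypothetical, simplified schema: an asset may own several state-store rows
# (one per key), so the same asset_id can appear more than once.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE asset_state_store (asset_id INTEGER, key TEXT)")
conn.execute("CREATE TABLE active_asset_id (id INTEGER)")
conn.executemany(
    "INSERT INTO asset_state_store VALUES (?, ?)",
    [(1, "a"), (1, "b"), (2, "a"), (3, "a"), (4, "a")],
)
# Only asset 99 is active, so assets 1-4 are all orphaned.
conn.execute("INSERT INTO active_asset_id VALUES (99)")

BATCH_SIZE = 3  # Hypothetical stand-in for the real batch size (e.g. 500).

ORPHAN_FILTER = "WHERE asset_id NOT IN (SELECT id FROM active_asset_id)"


def orphaned_ids_row_limited(limit):
    """LIMIT counts rows; de-duplicating afterwards can shrink the batch."""
    rows = conn.execute(
        f"SELECT asset_id FROM asset_state_store {ORPHAN_FILTER} "
        "ORDER BY asset_id LIMIT ?",
        (limit,),
    ).fetchall()
    return {asset_id for (asset_id,) in rows}


def orphaned_ids_distinct_limited(limit):
    """DISTINCT is applied before LIMIT, so the batch counts assets, not rows."""
    rows = conn.execute(
        f"SELECT DISTINCT asset_id FROM asset_state_store {ORPHAN_FILTER} "
        "ORDER BY asset_id LIMIT ?",
        (limit,),
    ).fetchall()
    return {asset_id for (asset_id,) in rows}


def drain(limit):
    """Delete one bounded batch per simulated scheduler tick until none remain."""
    ticks = 0
    while batch := orphaned_ids_distinct_limited(limit):
        conn.executemany(
            "DELETE FROM asset_state_store WHERE asset_id = ?",
            [(asset_id,) for asset_id in batch],
        )
        ticks += 1
    return ticks


row_batch = orphaned_ids_row_limited(BATCH_SIZE)
distinct_batch = orphaned_ids_distinct_limited(BATCH_SIZE)
print(sorted(row_batch))       # [1, 2]  (3 rows fetched, but only 2 assets)
print(sorted(distinct_batch))  # [1, 2, 3]
ticks = drain(BATCH_SIZE)
print(ticks)                   # 2
```

Under these assumptions, four orphaned assets with a batch size of 3 drain in two simulated ticks when the limit counts distinct asset IDs, which is the "remainder drains on later orphanage ticks" behaviour the docstring describes. If the limit instead counts rows, the docstring wording should say so.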
--
This is an automated message from the Apache Git Service.