SameerMesiah97 commented on code in PR #68595:
URL: https://github.com/apache/airflow/pull/68595#discussion_r3423089264


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3690,16 +3694,24 @@ def _cleanup_orphaned_asset_state_store(*, session: Session) -> None:
         """
         Delete asset_state_store rows for assets no longer active in any Dag.
 
-        When _orphan_unreferenced_assets removes an asset from asset_active, its
-        asset_state_store rows become unreachable — no task can write to them anymore.
-        This runs in the same pass as asset orphanage to keep the table clean.
+        Bounded to ORPHANED_ASSET_STATE_STORE_CLEANUP_BATCH_SIZE orphaned assets per
+        tick so a large backlog cannot become one unbounded delete in the scheduler
+        loop; the remainder drains on later orphanage ticks.
         """
         active_asset_ids = select(AssetModel.id).join(
             AssetActive,
             (AssetActive.name == AssetModel.name) & (AssetActive.uri == AssetModel.uri),
         )
+        orphaned_asset_ids = session.scalars(
+            select(AssetStateStoreModel.asset_id)
+            .where(AssetStateStoreModel.asset_id.not_in(active_asset_ids))
+            .distinct()

Review Comment:
   I am just wondering whether `distinct` is needed here. I imagine your
implementation is intended to address a massive accumulation of orphaned assets,
and `distinct` can get very expensive in those scenarios.
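
To make the trade-off concrete, here is a minimal sketch using stdlib `sqlite3` and a hypothetical, cut-down schema. The table names echo the diff, but the columns, the SQL, and the helpers `cleanup_tick`, `seeded_db`, and `drain` are illustrative, not the PR's code. It also assumes the batch is picked with `LIMIT` and then deleted with `IN (...)`; the hunk above stops at `.distinct()`, so that part is an assumption. It compares three ways of picking a batch: with `DISTINCT`, without it, and selecting from the asset table with `EXISTS`, which yields unique ids through the primary key and so needs no `DISTINCT`.

```python
import sqlite3

# Hypothetical, cut-down schema; the real Airflow tables have more columns and constraints.
SCHEMA = """
CREATE TABLE asset (id INTEGER PRIMARY KEY, name TEXT NOT NULL, uri TEXT NOT NULL);
CREATE TABLE asset_active (name TEXT NOT NULL, uri TEXT NOT NULL);
CREATE TABLE asset_state_store (asset_id INTEGER NOT NULL, state_key TEXT NOT NULL, state_value TEXT);
"""

# Ids of assets still active in some Dag (same join shape as active_asset_ids in the diff).
ACTIVE_IDS = (
    "SELECT a.id FROM asset AS a "
    "JOIN asset_active AS aa ON aa.name = a.name AND aa.uri = a.uri"
)

# Three hypothetical ways to pick one bounded batch of orphaned asset ids.
PICK_SQL = {
    # DISTINCT: LIMIT counts assets, but the database has to de-duplicate first.
    "distinct": "SELECT DISTINCT asset_id FROM asset_state_store "
    f"WHERE asset_id NOT IN ({ACTIVE_IDS}) LIMIT ?",
    # No DISTINCT: LIMIT counts state-store rows, so one asset with many keys
    # can fill the whole batch.
    "plain": "SELECT asset_id FROM asset_state_store "
    f"WHERE asset_id NOT IN ({ACTIVE_IDS}) LIMIT ?",
    # Pick from the asset table: ids are unique by primary key, so no DISTINCT.
    # Note: only finds state-store rows whose asset row still exists.
    "exists": f"SELECT o.id FROM asset AS o WHERE o.id NOT IN ({ACTIVE_IDS}) "
    "AND EXISTS (SELECT 1 FROM asset_state_store AS s WHERE s.asset_id = o.id) LIMIT ?",
}


def cleanup_tick(conn: sqlite3.Connection, batch_size: int, strategy: str) -> list[int]:
    """Delete state-store rows for one bounded batch of orphaned assets; return the ids picked."""
    ids = [row[0] for row in conn.execute(PICK_SQL[strategy], (batch_size,))]
    if ids:
        marks = ",".join("?" * len(ids))  # e.g. "?,?" for two ids
        conn.execute(f"DELETE FROM asset_state_store WHERE asset_id IN ({marks})", ids)
        conn.commit()
    return ids


def seeded_db() -> sqlite3.Connection:
    """Four assets with four state keys each; only asset 4 is still active."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO asset VALUES (?, ?, ?)",
        [(i, f"a{i}", f"s3://bucket/a{i}") for i in range(1, 5)],
    )
    conn.execute("INSERT INTO asset_active VALUES ('a4', 's3://bucket/a4')")
    conn.executemany(
        "INSERT INTO asset_state_store VALUES (?, ?, ?)",
        [(i, f"k{k}", "v") for i in range(1, 5) for k in range(4)],
    )
    return conn


def drain(batch_size: int, strategy: str) -> tuple[int, list[int]]:
    """Run ticks until nothing orphaned is left; return (non-empty ticks, surviving asset ids)."""
    conn = seeded_db()
    ticks = 0
    while cleanup_tick(conn, batch_size, strategy):
        ticks += 1
    survivors = sorted(r[0] for r in conn.execute("SELECT DISTINCT asset_id FROM asset_state_store"))
    return ticks, survivors
```

In this sketch all three variants end with only the active asset's rows left. Without `DISTINCT` the delete stays correct, because it removes whole assets by id, but the cap then bounds state-store rows rather than assets, so a batch of 500 may cover far fewer than 500 assets and the backlog can take more ticks to drain. Which variant is cheapest depends on the database's plan and indexes, so this does not settle the question by itself.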



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -160,6 +160,10 @@
 # safety bound, not a behavioural knob operators need to tune.
 MAX_PARTITION_DAG_RUNS_PER_LOOP = 500
 
+# Cap on orphaned assets whose asset_state_store rows are deleted per scheduler
+# orphanage tick; the remainder drain on later ticks. Mirrors MAX_PARTITION_DAG_RUNS_PER_LOOP.
+ORPHANED_ASSET_STATE_STORE_CLEANUP_BATCH_SIZE = 500

Review Comment:
   Why 500? Is there an empirical reason for this, or was this number chosen
based on some heuristic?
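
One possible way to frame an answer is to state what the number trades off. A smaller cap tends to keep each DELETE short, while a larger one drains a backlog in fewer ticks. Because the cap counts assets rather than rows, the rows touched per tick also scale with how many state-store keys each asset has. The sketch below only does that arithmetic. The backlog size, the 60-second tick interval, the keys-per-asset figure, and the function names are all hypothetical, not measurements from Airflow.

```python
def ticks_to_drain(orphaned_assets: int, batch_size: int) -> int:
    """Orphanage ticks needed to clear a backlog when each tick handles at most batch_size assets."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    # Ceiling division without floats, so large backlogs stay exact.
    return (orphaned_assets + batch_size - 1) // batch_size


def drain_seconds(orphaned_assets: int, batch_size: int, tick_interval_s: float) -> float:
    """Approximate wall-clock time to drain, assuming one cleanup pass per tick_interval_s."""
    return ticks_to_drain(orphaned_assets, batch_size) * tick_interval_s


def rows_per_tick_upper_bound(batch_size: int, max_keys_per_asset: int) -> int:
    """The cap counts assets, not rows, so one DELETE can touch up to this many rows."""
    return batch_size * max_keys_per_asset


if __name__ == "__main__":
    # Hypothetical inputs: 100,000 orphaned assets, one orphanage pass every 60 s.
    for batch in (100, 500, 2000):
        print(batch, ticks_to_drain(100_000, batch), drain_seconds(100_000, batch, 60.0))
    # 100 -> 1000 ticks (60000.0 s), 500 -> 200 ticks (12000.0 s), 2000 -> 50 ticks (3000.0 s)
```

Numbers like these would only be a starting point; measured DELETE latency on a realistic backlog would be the empirical part of the answer.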



-- 
This is an automated message from the Apache Git Service.