amoghrajesh commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3217316017


##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +272,51 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
             )
         )
 
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows.
+
+        ``expires_at`` is set at write time on every ``set()`` call, so cleanup is a single
+        ``WHERE expires_at < now()`` pass. Rows with ``expires_at=NULL`` (default_retention_days=0)
+        are never deleted. Batching is configurable via ``[state_store] state_cleanup_batch_size``.
+        """
+        batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+        now = timezone.utcnow()
+
+        def _delete_batched(where_clause) -> int:
+            total = 0
+            with create_session() as session:
+                while True:
+                    id_query = select(TaskStateModel.id).where(where_clause)
+                    if batch_size > 0:
+                        id_query = id_query.limit(batch_size)
+                    ids = session.scalars(id_query).all()
+                    if not ids:
+                        break
+                    session.execute(delete(TaskStateModel).where(TaskStateModel.id.in_(ids)))
+                    session.commit()
+                    total += len(ids)
+                    if batch_size <= 0 or len(ids) < batch_size:
+                        break
+            return total
+
+        deleted = _delete_batched(TaskStateModel.expires_at < now)

Review Comment:
   `expires_at` is set once at write time in every `set()` call and is never
   updated independently of the row, so there is no dependency on `updated_at`
   staying in sync with it. If you call `set()` again on the same key, the
   upsert recalculates both `updated_at` and `expires_at` and overwrites them
   together, atomically.
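   This is a minimal in-memory sketch of that write-time behaviour. It assumes `expires_at` is derived as `now + default_retention_days` and stored as NULL when retention is 0. `StateRow`, `compute_expires_at` and `set_state` are hypothetical names, not the PR's code. The real `set()` does this inside a database upsert.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


@dataclass
class StateRow:
    # Hypothetical stand-in for a task-state row (not the PR's TaskStateModel).
    value: str
    updated_at: datetime
    expires_at: Optional[datetime]


def compute_expires_at(now: datetime, retention_days: int) -> Optional[datetime]:
    # Assumption: retention of 0 (treated here as <= 0) means "keep forever",
    # which is stored as NULL (None); otherwise expiry is now + retention.
    if retention_days <= 0:
        return None
    return now + timedelta(days=retention_days)


def set_state(
    store: Dict[str, StateRow], key: str, value: str, *, now: datetime, retention_days: int
) -> StateRow:
    # Upsert: the whole row is replaced in one assignment, so updated_at and
    # expires_at are always written together from the same `now`.
    row = StateRow(value=value, updated_at=now, expires_at=compute_expires_at(now, retention_days))
    store[key] = row
    return row
```

   Calling `set_state` again on the same key moves both timestamps forward together. A row first written on 2024-01-01 and rewritten ten days later with a 30-day retention ends up expiring on 2024-02-10.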
   
   One legitimate edge case you may be pointing at: if a user starts with
   `default_retention_days=0` and later raises it to 30 days, the existing rows
   with `expires_at` NULL won't be picked up by the current
   `WHERE expires_at < now()` pass. We can add a second pass,
   `WHERE expires_at IS NULL AND updated_at < now - default_retention_days`,
   to cover that case. How does that sound?
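   Below is a sketch of the proposed two-pass cleanup. It uses stdlib `sqlite3` instead of SQLAlchemy so that it runs standalone. The table name `task_state`, the epoch-seconds timestamp columns and the `cleanup(conn, ...)` signature are all assumptions for illustration. The batching loop copies the select-ids-then-delete pattern from the diff.

```python
import sqlite3
from datetime import datetime, timedelta


def _delete_batched(conn: sqlite3.Connection, where_sql: str, params: tuple, batch_size: int) -> int:
    # Same shape as the diff's loop: fetch up to batch_size ids, delete them,
    # commit, and stop once a batch comes back short (or when unbatched).
    total = 0
    while True:
        query = f"SELECT id FROM task_state WHERE {where_sql}"
        query_params = params
        if batch_size > 0:
            query += " LIMIT ?"
            query_params = params + (batch_size,)
        ids = [row[0] for row in conn.execute(query, query_params)]
        if not ids:
            break
        placeholders = ",".join("?" * len(ids))  # "?,?,?" for three ids
        conn.execute(f"DELETE FROM task_state WHERE id IN ({placeholders})", ids)
        conn.commit()
        total += len(ids)
        if batch_size <= 0 or len(ids) < batch_size:
            break
    return total


def cleanup(conn: sqlite3.Connection, *, now: datetime, retention_days: int, batch_size: int) -> int:
    # Hypothetical helper; timestamps are assumed to be stored as epoch seconds.
    now_ts = now.timestamp()
    # Pass 1: rows whose write-time expiry has passed. NULL expires_at never
    # satisfies "<", so "keep forever" rows are skipped here.
    deleted = _delete_batched(conn, "expires_at < ?", (now_ts,), batch_size)
    # Pass 2 (the proposal): legacy NULL rows written while retention was 0,
    # aged out against the *current* retention using updated_at.
    if retention_days > 0:
        cutoff = (now - timedelta(days=retention_days)).timestamp()
        deleted += _delete_batched(
            conn, "expires_at IS NULL AND updated_at < ?", (cutoff,), batch_size
        )
    return deleted
```

   Pass 2 is skipped entirely while retention is still 0, so NULL rows stay "keep forever" until the setting is actually raised.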



-- 
This is an automated message from the Apache Git Service.