amoghrajesh commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3231520604


##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +272,51 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
             )
         )
 
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows.
+
+        ``expires_at`` is set at write time on every ``set()`` call, so cleanup is a single
+        ``WHERE expires_at < now()`` pass. Rows with ``expires_at=NULL`` (default_retention_days=0)
+        are never deleted. Batching is configurable via ``[state_store] state_cleanup_batch_size``.
+        """
+        batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+        now = timezone.utcnow()
+
+        def _delete_batched(where_clause) -> int:
+            total = 0
+            with create_session() as session:
+                while True:
+                    id_query = select(TaskStateModel.id).where(where_clause)
+                    if batch_size > 0:
+                        id_query = id_query.limit(batch_size)
+                    ids = session.scalars(id_query).all()
+                    if not ids:
+                        break
+                    session.execute(delete(TaskStateModel).where(TaskStateModel.id.in_(ids)))
+                    session.commit()
+                    total += len(ids)
+                    if batch_size <= 0 or len(ids) < batch_size:
+                        break
+            return total
+
+        deleted = _delete_batched(TaskStateModel.expires_at < now)

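The docstring's contract (expiry computed once at write time, `NULL` meaning "never expires", cleanup as a batched `expires_at < now` pass) can be exercised with a stand-alone sketch. It mirrors the select-ids-then-delete loop from the diff but uses the standard-library `sqlite3` module instead of SQLAlchemy. The `task_state` table, its two columns, and the helpers `expires_at_for` and `delete_expired` are hypothetical; the real `TaskStateModel` schema, the `set()` signature, and the config lookup are not reproduced here.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def expires_at_for(written_at, retention_days):
    """Hypothetical helper: compute expires_at once, at write time.

    Assumption: retention <= 0 means "never expire" and is stored as NULL.
    """
    if retention_days <= 0:
        return None
    return written_at + timedelta(days=retention_days)


def delete_expired(conn, now, batch_size):
    """Delete rows with expires_at < now, at most batch_size ids per pass.

    batch_size <= 0 means no limit (a single pass). NULL expires_at never
    satisfies `expires_at < ?`, so those rows are always kept.
    """
    total = 0
    while True:
        sql = "SELECT id FROM task_state WHERE expires_at < ?"
        params = [now.isoformat()]
        if batch_size > 0:
            sql += " LIMIT ?"
            params.append(batch_size)
        ids = [row[0] for row in conn.execute(sql, params)]
        if not ids:
            break
        placeholders = ",".join("?" * len(ids))
        conn.execute(f"DELETE FROM task_state WHERE id IN ({placeholders})", ids)
        conn.commit()  # commit per batch, as in the diff
        total += len(ids)
        if batch_size <= 0 or len(ids) < batch_size:
            break  # a short batch means nothing expired is left
    return total


# Demo: timestamps stored as ISO-8601 UTC strings so text comparison works.
now = datetime(2024, 1, 10, tzinfo=timezone.utc)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_state (id INTEGER PRIMARY KEY, expires_at TEXT)")
written_at = now - timedelta(days=30)
for retention in (7, 7, 7, 0, 90):
    exp = expires_at_for(written_at, retention)
    conn.execute(
        "INSERT INTO task_state (expires_at) VALUES (?)",
        (exp.isoformat() if exp else None,),
    )
conn.commit()
deleted = delete_expired(conn, now, batch_size=2)
```

With `batch_size=2`, the three rows whose 7-day retention has lapsed are removed in two passes (2 then 1), while the `NULL` row and the 90-day row survive.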
Review Comment:
   TBH, I would hold off for now. People can run direct SQL statements if
needed, and any new keys that get written will automatically pick up the newly
configured `default_retention_days`. If there is clear demand for it, we can
add something like `airflow state-store set-expiry` later, but that feels
premature before we know how common the use case is.
