amoghrajesh commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3206773888


##########
airflow-core/src/airflow/cli/cli_config.py:
##########
@@ -1519,6 +1519,19 @@ class GroupCommand(NamedTuple):
         args=(ARG_OUTPUT, ARG_VERBOSE),
     ),
 )
+STATE_STORE_COMMANDS = (
+    ActionCommand(
+        name="cleanup",
+        help="Remove expired task state rows via the configured state backend",

Review Comment:
   Handled in df379c50e6



##########
airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_state_and_asset_state_tables.py:
##########
@@ -65,6 +65,11 @@ def upgrade():
         sa.Column("run_id", StringID(), nullable=False),
         sa.Column("value", sa.Text().with_variant(mysql.MEDIUMTEXT(), "mysql"), nullable=False),
         sa.Column("updated_at", UtcDateTime(), nullable=False),
+        # Optional early-expiry override. When set, GC deletes this row when expires_at < now()
+        # even if updated_at is recent. NULL means no early expiry — the row is still cleaned
+        # up by the global updated_at + default_retention_days check. Populated via

Review Comment:
   Handled in df379c50e6



##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +259,84 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
             )
         )
 
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows.
+
+        Reads ``[state_store] default_retention_days`` and ``[state_store] state_cleanup_batch_size``
+        from config. Each pass runs in its own transaction so partial progress is committed even if a
+        later pass fails. Each pass is batched to avoid long-running locks on the table.
+
+        Two passes:
+        a. Rows where updated_at < now() - default_retention_days (global retention)
+        b. Rows where expires_at < now() (per-key early expiry set by the operator)
+        """
+        retention_days = conf.getint("state_store", "default_retention_days")
+        batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+        now = timezone.utcnow()
+        older_than = now - timedelta(days=retention_days) if retention_days > 0 else None
+
+        pk_cols = (
+            TaskStateModel.dag_run_id,
+            TaskStateModel.task_id,
+            TaskStateModel.map_index,
+            TaskStateModel.key,
+        )

Review Comment:
   Handled in df379c50e6



##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +259,84 @@ def _clear_asset_state(self, scope: AssetScope, *, session: Session) -> None:
             )
         )
 
+    def cleanup(self) -> None:
+        """
+        Remove expired task state rows.
+
+        Reads ``[state_store] default_retention_days`` and ``[state_store] state_cleanup_batch_size``
+        from config. Each pass runs in its own transaction so partial progress is committed even if a
+        later pass fails. Each pass is batched to avoid long-running locks on the table.
+
+        Two passes:
+        a. Rows where updated_at < now() - default_retention_days (global retention)
+        b. Rows where expires_at < now() (per-key early expiry set by the operator)
+        """
+        retention_days = conf.getint("state_store", "default_retention_days")
+        batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+        now = timezone.utcnow()
+        older_than = now - timedelta(days=retention_days) if retention_days > 0 else None
+
+        pk_cols = (
+            TaskStateModel.dag_run_id,
+            TaskStateModel.task_id,
+            TaskStateModel.map_index,
+            TaskStateModel.key,
+        )
+
+        def _delete_batched(where_clause) -> int:
+            total = 0
+            while True:
+                with create_session() as session:

Review Comment:
   Handled in df379c50e6



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to