Lee-W commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3217400057
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3025,6 +3025,24 @@ state_store:
type: string
example: "mypackage.state.CustomStateBackend"
default: "airflow.state.metastore.MetastoreStateBackend"
+ default_retention_days:
+ description: |
+ Number of days to retain task_state rows after their last update.
+ Rows older than this are removed by the scheduler's periodic cleanup.
+ This config does not affect asset_state rows.
Review Comment:
If it does not affect asset state, should we have a different config for each?
##########
airflow-core/src/airflow/cli/commands/state_store_command.py:
##########
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+log = logging.getLogger(__name__)
+
+# Other state operations (list, get, delete per key) will be added here in the
future.
+
+
+def cleanup(args) -> None:
+ """Remove expired task state rows via the configured state backend."""
+ from airflow.state import get_state_backend
+ from airflow.state.metastore import MetastoreStateBackend
Review Comment:
Why do we do inline imports here?
##########
airflow-core/src/airflow/cli/commands/state_store_command.py:
##########
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+log = logging.getLogger(__name__)
+
+# Other state operations (list, get, delete per key) will be added here in the
future.
+
+
+def cleanup(args) -> None:
+ """Remove expired task state rows via the configured state backend."""
+ from airflow.state import get_state_backend
+ from airflow.state.metastore import MetastoreStateBackend
+
+ backend = get_state_backend()
+
+ if args.dry_run:
+ if isinstance(backend, MetastoreStateBackend):
+ summary = backend._dry_run_summary()
+ expired = summary["expired"]
+ if not expired:
+ print("Nothing to delete.")
+ return
+ print(f"Would delete {len(expired)} task state row(s):\n")
+ for dag_id, run_id, task_id, map_index, key in expired:
+ print(
+ f" DAG {dag_id!r}, run {run_id!r}, task {task_id!r},
map_index {map_index!r}, key {key!r}"
Review Comment:
```suggestion
f" Dag {dag_id!r}, run {run_id!r}, task {task_id!r},
map_index {map_index!r}, key {key!r}"
```
##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -252,6 +272,51 @@ def _clear_asset_state(self, scope: AssetScope, *,
session: Session) -> None:
)
)
+ def cleanup(self) -> None:
+ """
+ Remove expired task state rows.
+
+ ``expires_at`` is set at write time on every ``set()`` call, so
cleanup is a single
+ ``WHERE expires_at < now()`` pass. Rows with ``expires_at=NULL``
(default_retention_days=0)
+ are never deleted. Batching is configurable via ``[state_store]
state_cleanup_batch_size``.
+ """
+ batch_size = conf.getint("state_store", "state_cleanup_batch_size")
+ now = timezone.utcnow()
+
+ def _delete_batched(where_clause) -> int:
+ total = 0
+ with create_session() as session:
+ while True:
+ id_query = select(TaskStateModel.id).where(where_clause)
+ if batch_size > 0:
+ id_query = id_query.limit(batch_size)
+ ids = session.scalars(id_query).all()
+ if not ids:
+ break
+
session.execute(delete(TaskStateModel).where(TaskStateModel.id.in_(ids)))
+ session.commit()
+ total += len(ids)
+ if batch_size <= 0 or len(ids) < batch_size:
+ break
+ return total
+
+ deleted = _delete_batched(TaskStateModel.expires_at < now)
+ log.info("Deleted expired task_state rows", rows_deleted=deleted)
+
+ def _dry_run_summary(self) -> dict[str, list]:
Review Comment:
```suggestion
def _summary_dry_run_(self) -> dict[str, list]:
```
since it's not a property and probably shouldn't be one
##########
airflow-core/tests/unit/state/test_metastore.py:
##########
@@ -234,6 +239,112 @@ def test_clear_with_all_map_indices_flag_wipes_wide(
assert backend.get(scope0, "job_id", session=session) is None
assert backend.get(scope1, "job_id", session=session) is None
+ def test_set_populates_expires_at(
+ self, session: Session, backend: MetastoreStateBackend, dag_run: DagRun
+ ):
+ """set() always populates expires_at so cleanup has a single pass."""
+ scope = TaskScope(dag_id=DAG_ID, run_id=RUN_ID, task_id=TASK_ID)
+ backend.set(scope, "job_id", "app_1234", session=session)
+ session.flush()
+
+ row = session.scalar(select(TaskStateModel).where(TaskStateModel.key
== "job_id"))
+ assert row is not None
Review Comment:
It is possible for us to check the attributes instead of just
existence.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3200,6 +3215,21 @@ def _activate_assets_generate_warnings() ->
Iterator[tuple[str, str]]:
session.add(warning)
existing_warned_dag_ids.add(warning.dag_id)
+ @staticmethod
+ def _cleanup_orphaned_asset_state(*, session: Session) -> None:
+ """
+ Delete asset_state rows for assets no longer active in any DAG.
Review Comment:
```suggestion
Delete asset_state rows for assets no longer active in any Dag.
```
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3025,6 +3025,24 @@ state_store:
type: string
example: "mypackage.state.CustomStateBackend"
default: "airflow.state.metastore.MetastoreStateBackend"
+ default_retention_days:
+ description: |
+ Number of days to retain task_state rows after their last update.
Review Comment:
```suggestion
Number of days to retain task state after their last update.
```
Since it's user-facing, let's not emphasize the DB aspect.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]