amoghrajesh commented on code in PR #66463:
URL: https://github.com/apache/airflow/pull/66463#discussion_r3217771553
##########
airflow-core/tests/unit/state/test_metastore.py:
##########
@@ -234,6 +239,112 @@ def test_clear_with_all_map_indices_flag_wipes_wide(
assert backend.get(scope0, "job_id", session=session) is None
assert backend.get(scope1, "job_id", session=session) is None
+ def test_set_populates_expires_at(
+ self, session: Session, backend: MetastoreStateBackend, dag_run: DagRun
+ ):
+ """set() always populates expires_at so cleanup has a single pass."""
+ scope = TaskScope(dag_id=DAG_ID, run_id=RUN_ID, task_id=TASK_ID)
+ backend.set(scope, "job_id", "app_1234", session=session)
+ session.flush()
+
+ row = session.scalar(select(TaskStateModel).where(TaskStateModel.key
== "job_id"))
+ assert row is not None
Review Comment:
Handled in 7427d04ed2
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3200,6 +3215,21 @@ def _activate_assets_generate_warnings() ->
Iterator[tuple[str, str]]:
session.add(warning)
existing_warned_dag_ids.add(warning.dag_id)
+ @staticmethod
+ def _cleanup_orphaned_asset_state(*, session: Session) -> None:
+ """
+ Delete asset_state rows for assets no longer active in any DAG.
Review Comment:
Handled in 7427d04ed2
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3025,6 +3025,24 @@ state_store:
type: string
example: "mypackage.state.CustomStateBackend"
default: "airflow.state.metastore.MetastoreStateBackend"
+ default_retention_days:
+ description: |
+ Number of days to retain task_state rows after their last update.
Review Comment:
Handled in 7427d04ed2
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]