This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 439105f33b3 Add `task_store` table to `airflow db clean` mechanism (#68218)
439105f33b3 is described below

commit 439105f33b3644865c4a58b591ea9c16b4281a7c
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Jun 11 10:54:37 2026 +0530

    Add `task_store` table to `airflow db clean` mechanism (#68218)
---
 airflow-core/src/airflow/utils/db_cleanup.py     |   7 +-
 airflow-core/tests/unit/utils/test_db_cleanup.py | 108 +++++++++++++++++++++--
 2 files changed, 105 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py
index 8730a883a12..f0a6c2fe01a 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -140,7 +140,7 @@ config_list: list[_TableConfig] = [
         keep_last=True,
         keep_last_filters=[column("run_type") != DagRunType.MANUAL],
         keep_last_group_by=["dag_id"],
-        dependent_tables=["task_instance", "deadline"],
+        dependent_tables=["task_instance", "task_store", "deadline"],
     ),
     _TableConfig(table_name="asset_event", recency_column_name="timestamp", 
dag_id_column_name="dag_id"),
     _TableConfig(table_name="import_error", recency_column_name="timestamp"),
@@ -155,6 +155,11 @@ config_list: list[_TableConfig] = [
     _TableConfig(
         table_name="task_instance_history", recency_column_name="start_date", 
dag_id_column_name="dag_id"
     ),
+    _TableConfig(
+        table_name="task_store",
+        recency_column_name="expires_at",
+        dag_id_column_name="dag_id",
+    ),
     _TableConfig(table_name="task_reschedule", 
recency_column_name="start_date", dag_id_column_name="dag_id"),
     _TableConfig(table_name="xcom", recency_column_name="timestamp", 
dag_id_column_name="dag_id"),
     _TableConfig(table_name="_xcom_archive", recency_column_name="timestamp", 
dag_id_column_name="dag_id"),
diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py
index a2fd47c6853..d7b81db63b3 100644
--- a/airflow-core/tests/unit/utils/test_db_cleanup.py
+++ b/airflow-core/tests/unit/utils/test_db_cleanup.py
@@ -37,6 +37,7 @@ from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagbundle import DagBundleModel
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.task_store import TaskStoreModel
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
 from airflow.utils.db_cleanup import (
@@ -878,6 +879,94 @@ def create_tis(base_date, num_tis, 
run_type=DagRunType.SCHEDULED):
         session.commit()
 
 
+@pytest.mark.db_test
+class TestTaskStoreCleanup:
+    def test_expired_rows_deleted(self):
+        cfg = config_dict["task_store"]
+        now = pendulum.now(tz="UTC")
+        past = now.subtract(days=30)
+        future = now.add(days=30)
+
+        with create_session() as session:
+            bundle = DagBundleModel(name="ts_test_bundle")
+            session.add(bundle)
+            session.flush()
+
+            dag = DAG(dag_id="ts_test_dag")
+            dm = DagModel(dag_id="ts_test_dag", bundle_name="ts_test_bundle")
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name="ts_test_bundle")
+
+            dag_run = DagRun(
+                "ts_test_dag",
+                run_id="ts_test_run",
+                run_type=DagRunType.SCHEDULED,
+                start_date=past,
+            )
+            session.add(dag_run)
+            session.flush()
+
+            expired = TaskStoreModel(
+                dag_run_id=dag_run.id,
+                task_id="t1",
+                map_index=-1,
+                key="job_id",
+                dag_id="ts_test_dag",
+                run_id="ts_test_run",
+                value="job-expired",
+                updated_at=past,
+                expires_at=past.subtract(days=1),
+            )
+            never_expire = TaskStoreModel(
+                dag_run_id=dag_run.id,
+                task_id="t1",
+                map_index=-1,
+                key="result",
+                dag_id="ts_test_dag",
+                run_id="ts_test_run",
+                value="job-never-expire",
+                updated_at=past,
+                expires_at=None,
+            )
+            not_yet_expired = TaskStoreModel(
+                dag_run_id=dag_run.id,
+                task_id="t1",
+                map_index=-1,
+                key="future_key",
+                dag_id="ts_test_dag",
+                run_id="ts_test_run",
+                value="job-future",
+                updated_at=past,
+                expires_at=future,
+            )
+            session.add_all([expired, never_expire, not_yet_expired])
+            session.commit()
+
+        cutoff = now.subtract(hours=1)
+        with create_session() as session:
+            _cleanup_table(
+                **cfg.__dict__,
+                clean_before_timestamp=cutoff,
+                dry_run=False,
+                verbose=False,
+                confirm=False,
+                skip_archive=True,
+                session=session,
+            )
+
+        with create_session() as session:
+            not_deleted = {
+                row.key
+                for row in session.scalars(
+                    select(TaskStoreModel).where(TaskStoreModel.dag_id == 
"ts_test_dag")
+                ).all()
+            }
+
+        assert "job_id" not in not_deleted, "expired row should be deleted"
+        assert "result" in not_deleted, "NEVER_EXPIRE row (expires_at=NULL) 
must survive"
+        assert "future_key" in not_deleted, "not-yet-expired row must survive"
+
+
 @pytest.mark.db_test
 class TestConnectionTestRequestCleanup:
     """Verify db_cleanup never deletes in-flight connection tests (kaxil 
r3169602754)."""
@@ -917,15 +1006,16 @@ class TestConnectionTestRequestCleanup:
 
         # Run cleanup with a cutoff well past every seeded row.
         cutoff = pendulum.now(tz="UTC").subtract(days=1)
-        _cleanup_table(
-            **cfg.__dict__,
-            clean_before_timestamp=cutoff,
-            dry_run=False,
-            verbose=False,
-            confirm=False,
-            skip_archive=True,
-            session=create_session().__enter__(),
-        )
+        with create_session() as session:
+            _cleanup_table(
+                **cfg.__dict__,
+                clean_before_timestamp=cutoff,
+                dry_run=False,
+                verbose=False,
+                confirm=False,
+                skip_archive=True,
+                session=session,
+            )
 
         with create_session() as s:
             survivors = {

Reply via email to