This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4d5046d78ce Adding example dag for task state store with mapped tasks
(#68670)
4d5046d78ce is described below
commit 4d5046d78ceb272a1fa1dcfb79f85e9dc960e33f
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Jun 18 13:01:00 2026 +0530
Adding example dag for task state store with mapped tasks (#68670)
---
.../example_task_state_store_mapped.py | 59 ++++++++++++++++++++++
1 file changed, 59 insertions(+)
diff --git
a/airflow-core/src/airflow/example_dags/example_task_state_store_mapped.py
b/airflow-core/src/airflow/example_dags/example_task_state_store_mapped.py
new file mode 100644
index 00000000000..3fd8778af87
--- /dev/null
+++ b/airflow-core/src/airflow/example_dags/example_task_state_store_mapped.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG with mapped tasks to demonstrate task state store isolation per
map_index."""
+
+from __future__ import annotations
+
+import random
+from datetime import datetime, timezone
+
+from airflow.sdk import DAG, task
+
+TABLES = ["orders", "customers", "products"]
+
+with DAG(
+ dag_id="example_task_state_store_mapped",
+ schedule=None,
+ start_date=datetime(2026, 1, 1),
+ catchup=False,
+ tags=["example", "task-state-store"],
+ doc_md=__doc__,
+) as dag:
+
+ @task
+ def get_tables() -> list[str]:
+ """Return the list of tables to process."""
+ return TABLES
+
+ @task
+ def process_table(table: str, task_state_store=None, ti=None) -> dict:
+ """Process one table — each mapped instance gets its own task state."""
+ row_count = random.randint(100, 10000)
+ result = {
+ "table": table,
+ "map_index": ti.map_index,
+ "row_count": row_count,
+ "processed_at":
datetime.now(tz=timezone.utc).isoformat(timespec="seconds"),
+ }
+ task_state_store.set("status", "complete")
+ task_state_store.set("result", result)
+
+ print(f"[map_index={ti.map_index}] Processed {table}: {row_count}
rows")
+ return result
+
+ tables = get_tables()
+ process_table.expand(table=tables)