This is an automated email from the ASF dual-hosted git repository.
dabla pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a5ffa6c7949 Fix upstream map index resolution after placeholder expansion (#59691)
a5ffa6c7949 is described below
commit a5ffa6c7949e07b13fd410256b99e77621d53b7b
Author: SameerMesiah97 <[email protected]>
AuthorDate: Thu Jun 11 18:22:20 2026 +0100
Fix upstream map index resolution after placeholder expansion (#59691)
* Fix upstream map index resolution after placeholder expansion, with a unit test.
* Rename placeholder map index helper to reflect boolean semantics,
return bool instead of 0/None, and simplify the associated
resolution logic and documentation.
---------
Co-authored-by: Sameer Mesiah <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 56 ++++++++++-
.../tests/unit/models/test_taskinstance.py | 109 +++++++++++++++++++++
2 files changed, 164 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 740596f9d69..aeff2695f1b 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2421,7 +2421,19 @@ def _get_relevant_map_indexes(
# and "ti_count == ancestor_ti_count" does not work, since the further
# expansion may be of length 1.
if not _is_further_mapped_inside(relative, common_ancestor):
- return ancestor_map_index
+ # During mapped task group expansion, upstream placeholder task
instances
+ # (map_index = -1) may already have been replaced by their first
expanded
+ # successor (map_index = 0) while downstream task instances are still
+ # unexpanded and continue resolving dependencies against the
placeholder index.
+ resolved_map_index = (
+ 0
+ if _should_use_post_expansion_placeholder(
+ task=task, relative=relative, map_index=ancestor_map_index,
run_id=run_id, session=session
+ )
+ else ancestor_map_index
+ )
+
+ return resolved_map_index
# Otherwise we need a partial aggregation for values from selected task
# instances in the ancestor's expansion context.
@@ -2495,6 +2507,48 @@ def find_relevant_relatives(
return visited
+def _should_use_post_expansion_placeholder(
+    *,
+    task: Operator,
+    relative: Operator,
+    map_index: int,
+    run_id: str,
+    session: Session,
+) -> bool:
+    """
+    Determine whether upstream dependency resolution should use map_index = 0.
+
+    Returns False unless ``map_index`` is -1. Otherwise returns True only when
+    ``task`` still has a map_index = -1 row in this run, while ``relative`` has
+    no such row but does have a map_index = 0 row (its post-expansion successor).
+    """
+    if map_index != -1:  # only the -1 placeholder index is a candidate for rewriting
+        return False
+
+    rows = session.execute(  # single query for the -1/0 rows of both tasks
+        select(TaskInstance.task_id, TaskInstance.map_index).where(
+            TaskInstance.dag_id == relative.dag_id,
+            TaskInstance.run_id == run_id,
+            TaskInstance.task_id.in_([task.task_id, relative.task_id]),
+            TaskInstance.map_index.in_([-1, 0]),
+        )
+    ).all()
+
+    task_to_map_indexes: dict[str, set[int]] = defaultdict(set)  # absent task_id -> empty set
+    for task_id, mi in rows:
+        task_to_map_indexes[task_id].add(mi)
+
+    # We only rewrite when:
+    # 1) the current task is still using the placeholder (-1)
+    # 2) the upstream placeholder (-1) no longer exists
+    # 3) the post-expansion placeholder (0) does exist
+    return (
+        -1 in task_to_map_indexes[task.task_id]
+        and -1 not in task_to_map_indexes[relative.task_id]
+        and 0 in task_to_map_indexes[relative.task_id]
+    )
+
+
class TaskInstanceNote(Base):
"""For storage of arbitrary notes concerning the task instance."""
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index b9d8c85f613..987768da124 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3523,6 +3523,115 @@ def test_find_relevant_relatives(dag_maker, session, normal_tasks, mapped_tasks,
assert result == expected
+def test_downstream_placeholder_handles_upstream_post_expansion(dag_maker, session):
+    """
+    Test dynamic task mapping behavior when an upstream placeholder task
+    (map_index = -1) has been replaced by the first expanded task
+    (map_index = 0).
+
+    This verifies that downstream mapped dependency resolution:
+    - preserves placeholder behavior before upstream expansion
+    - correctly resolves the post-expansion transition
+    - preserves normal expanded task behavior afterwards
+    """
+
+    with dag_maker(session=session) as dag:
+
+        @task
+        def get_mapping_source():
+            return ["one", "two", "three"]
+
+        @task
+        def mapped_task(x):
+            output = f"{x}"
+            return output
+
+        @task_group(prefix_group_id=False)
+        def the_task_group(x):
+            start = MockOperator(task_id="start")
+            upstream = mapped_task(x)
+
+            # Downstream task inside the task group that does not directly
+            # consume the expand input, but is still mapped via the mapped
+            # task group context.
+            downstream = MockOperator(task_id="downstream")
+
+            start >> upstream >> downstream
+
+        mapping_source = get_mapping_source()
+        mapped_tg = the_task_group.expand(x=mapping_source)
+
+        mapping_source >> mapped_tg
+
+    # Create DAG run and execute prerequisites.
+    dr = dag_maker.create_dagrun()
+
+    dag_maker.run_ti("get_mapping_source", map_index=-1, dag_run=dr, session=session)
+
+    upstream_task = dag.get_task("mapped_task")
+    downstream_task = dag.get_task("downstream")
+
+    # Before upstream expansion occurs, mapped dependency resolution
+    # should retain the existing placeholder semantics since no concrete
+    # upstream/downstream map index pairing exists yet.
+    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, session=session)
+    downstream_ti.refresh_from_task(downstream_task)
+
+    result = downstream_ti.get_relevant_upstream_map_indexes(
+        upstream=upstream_task,
+        ti_count=1,
+        session=session,
+    )
+
+    assert result == -3  # NOTE(review): -3 is neither -1 nor a valid index; confirm intended
+
+    # Force expansion of the upstream mapped task.
+    upstream_task = dag.get_task("mapped_task")
+    _, max_index = TaskMap.expand_mapped_task(
+        upstream_task,
+        dr.run_id,
+        session=session,
+    )
+    upstream_expanded_ti_count = max_index + 1
+
+    downstream_task = dag.get_task("downstream")
+
+    # Grab the downstream placeholder TI.
+    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, session=session)
+    downstream_ti.refresh_from_task(downstream_task)
+
+    result = downstream_ti.get_relevant_upstream_map_indexes(
+        upstream=upstream_task,
+        ti_count=upstream_expanded_ti_count,
+        session=session,
+    )
+
+    assert result == 0
+
+    # Now do the same for downstream expanded (map_index = 0) to ensure existing behavior is not broken.
+    # Force expansion of the downstream mapped task.
+    _, max_index = TaskMap.expand_mapped_task(
+        downstream_task,
+        dr.run_id,
+        session=session,
+    )
+    downstream_expanded_ti_count = max_index + 1
+
+    # Grab the first expanded downstream task instance (map_index = 0).
+    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=0, session=session)
+    downstream_ti.refresh_from_task(downstream_task)
+
+    result = downstream_ti.get_relevant_upstream_map_indexes(
+        upstream=upstream_task,
+        ti_count=downstream_expanded_ti_count,
+        session=session,
+    )
+
+    # Verify behavior remains unchanged once the downstream task
+    # itself has expanded.
+    assert result == 0
+
+
def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker,
session):
"""Test that specifying a non-mapped task as a tuple doesn't raise
NotMapped exception."""
# t1 -> t2 (non-mapped) -> t3