This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41ebdf3e3b0 Revert #59691 due to broken trigger rules in mapped task 
groups (#68418)
41ebdf3e3b0 is described below

commit 41ebdf3e3b0376f55c0302e279c5f17a7dedc545
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Jun 12 08:20:25 2026 +0200

    Revert #59691 due to broken trigger rules in mapped task groups (#68418)
---
 airflow-core/src/airflow/models/taskinstance.py    |  56 +----------
 .../tests/unit/models/test_taskinstance.py         | 109 ---------------------
 2 files changed, 1 insertion(+), 164 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index aeff2695f1b..740596f9d69 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2421,19 +2421,7 @@ def _get_relevant_map_indexes(
     # and "ti_count == ancestor_ti_count" does not work, since the further
     # expansion may be of length 1.
     if not _is_further_mapped_inside(relative, common_ancestor):
-        # During mapped task group expansion, upstream placeholder task 
instances
-        # (map_index = -1) may already have been replaced by their first 
expanded
-        # successor (map_index = 0) while downstream task instances are still
-        # unexpanded and continue resolving dependencies against the 
placeholder index.
-        resolved_map_index = (
-            0
-            if _should_use_post_expansion_placeholder(
-                task=task, relative=relative, map_index=ancestor_map_index, 
run_id=run_id, session=session
-            )
-            else ancestor_map_index
-        )
-
-        return resolved_map_index
+        return ancestor_map_index
 
     # Otherwise we need a partial aggregation for values from selected task
     # instances in the ancestor's expansion context.
@@ -2507,48 +2495,6 @@ def find_relevant_relatives(
     return visited
 
 
-def _should_use_post_expansion_placeholder(
-    *,
-    task: Operator,
-    relative: Operator,
-    map_index: int,
-    run_id: str,
-    session: Session,
-) -> bool:
-    """
-    Determine whether upstream dependency resolution should use map_index = 0.
-
-    Returns True when the upstream placeholder task instance
-    (map_index = -1) has already been replaced by its post-expansion
-    successor (map_index = 0).
-    """
-    if map_index != -1:
-        return False
-
-    rows = session.execute(
-        select(TaskInstance.task_id, TaskInstance.map_index).where(
-            TaskInstance.dag_id == relative.dag_id,
-            TaskInstance.run_id == run_id,
-            TaskInstance.task_id.in_([task.task_id, relative.task_id]),
-            TaskInstance.map_index.in_([-1, 0]),
-        )
-    ).all()
-
-    task_to_map_indexes: dict[str, set[int]] = defaultdict(set)
-    for task_id, mi in rows:
-        task_to_map_indexes[task_id].add(mi)
-
-    # We only rewrite when:
-    # 1) the current task is still using the placeholder (-1)
-    # 2) the upstream placeholder (-1) no longer exists
-    # 3) the post-expansion placeholder (0) does exist
-    return (
-        -1 in task_to_map_indexes[task.task_id]
-        and -1 not in task_to_map_indexes[relative.task_id]
-        and 0 in task_to_map_indexes[relative.task_id]
-    )
-
-
 class TaskInstanceNote(Base):
     """For storage of arbitrary notes concerning the task instance."""
 
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index 987768da124..b9d8c85f613 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3523,115 +3523,6 @@ def test_find_relevant_relatives(dag_maker, session, 
normal_tasks, mapped_tasks,
     assert result == expected
 
 
-def test_downstream_placeholder_handles_upstream_post_expansion(dag_maker, 
session):
-    """
-    Test dynamic task mapping behavior when an upstream placeholder task
-    (map_index = -1) has been replaced by the first expanded task
-    (map_index = 0).
-
-    This verifies that downstream mapped dependency resolution:
-    - preserves placeholder behavior before upstream expansion
-    - correctly resolves the post-expansion transition
-    - preserves normal expanded task behavior afterwards
-    """
-
-    with dag_maker(session=session) as dag:
-
-        @task
-        def get_mapping_source():
-            return ["one", "two", "three"]
-
-        @task
-        def mapped_task(x):
-            output = f"{x}"
-            return output
-
-        @task_group(prefix_group_id=False)
-        def the_task_group(x):
-            start = MockOperator(task_id="start")
-            upstream = mapped_task(x)
-
-            # Downstream task inside the task group that does not directly
-            # consume the expand input, but is still mapped via the mapped
-            # task group context.
-            downstream = MockOperator(task_id="downstream")
-
-            start >> upstream >> downstream
-
-        mapping_source = get_mapping_source()
-        mapped_tg = the_task_group.expand(x=mapping_source)
-
-        mapping_source >> mapped_tg
-
-    # Create DAG run and execute prerequisites.
-    dr = dag_maker.create_dagrun()
-
-    dag_maker.run_ti("get_mapping_source", map_index=-1, dag_run=dr, 
session=session)
-
-    upstream_task = dag.get_task("mapped_task")
-    downstream_task = dag.get_task("downstream")
-
-    # Before upstream expansion occurs, mapped dependency resolution
-    # should retain the existing placeholder semantics since no concrete
-    # upstream/downstream map index pairing exists yet.
-    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, 
session=session)
-    downstream_ti.refresh_from_task(downstream_task)
-
-    result = downstream_ti.get_relevant_upstream_map_indexes(
-        upstream=upstream_task,
-        ti_count=1,
-        session=session,
-    )
-
-    assert result == -3
-
-    # Force expansion of the upstream mapped task.
-    upstream_task = dag.get_task("mapped_task")
-    _, max_index = TaskMap.expand_mapped_task(
-        upstream_task,
-        dr.run_id,
-        session=session,
-    )
-    upstream_expanded_ti_count = max_index + 1
-
-    downstream_task = dag.get_task("downstream")
-
-    # Grab the downstream placeholder TI.
-    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, 
session=session)
-    downstream_ti.refresh_from_task(downstream_task)
-
-    result = downstream_ti.get_relevant_upstream_map_indexes(
-        upstream=upstream_task,
-        ti_count=upstream_expanded_ti_count,
-        session=session,
-    )
-
-    assert result == 0
-
-    # Now do the same for downstream expanded (map_index = 0) to ensure 
existing behavior is not broken.
-    # Force expansion of the downstream mapped task.
-    _, max_index = TaskMap.expand_mapped_task(
-        downstream_task,
-        dr.run_id,
-        session=session,
-    )
-    downstream_expanded_ti_count = max_index + 1
-
-    # Grab the first expanded downstream task instance (map_index = 0).
-    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=0, 
session=session)
-    downstream_ti.refresh_from_task(downstream_task)
-
-    result = downstream_ti.get_relevant_upstream_map_indexes(
-        upstream=upstream_task,
-        ti_count=downstream_expanded_ti_count,
-        session=session,
-    )
-
-    # Verify behavior remains unchanged once the downstream task
-    # itself has expanded.
-    assert result == 0
-
-
 def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, 
session):
     """Test that specifying a non-mapped task as a tuple doesn't raise 
NotMapped exception."""
     # t1 -> t2 (non-mapped) -> t3

Reply via email to