amoghrajesh commented on code in PR #57474:
URL: https://github.com/apache/airflow/pull/57474#discussion_r2485419751


##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -285,30 +289,55 @@ def ti_run(
 
 
 def _get_upstream_map_indexes(
-    task: MappedOperator | SerializedBaseOperator, ti_map_index: int, run_id: str, session: SessionDep
+    *,
+    serialized_dag: SerializedDAG,
+    ti: TI,
+    session: SessionDep,
 ) -> Iterator[tuple[str, int | list[int] | None]]:
-    task_mapped_group = task.get_closest_mapped_task_group()
+    task = serialized_dag.get_task(ti.task_id)
     for upstream_task in task.upstream_list:
-        upstream_mapped_group = upstream_task.get_closest_mapped_task_group()
         map_indexes: int | list[int] | None
-        if upstream_mapped_group is None:
+        if (upstream_mapped_group := upstream_task.get_closest_mapped_task_group()) is None:
             # regular tasks or non-mapped task groups
             map_indexes = None
-        elif task_mapped_group == upstream_mapped_group:
+        elif task.get_closest_mapped_task_group() == upstream_mapped_group:
             # tasks in the same mapped task group hierarchy
-            map_indexes = ti_map_index
+            map_indexes = ti.map_index
         else:
             # tasks not in the same mapped task group
             # the upstream mapped task group should combine the return xcom as a list and return it
-            mapped_ti_count: int
+            mapped_ti_count: int | None = None
+
             try:
-                # for cases that does not need to resolve xcom
+                # First try: without resolving XCom
                 mapped_ti_count = upstream_mapped_group.get_parse_time_mapped_ti_count()
             except NotFullyPopulated:
-                # for cases that needs to resolve xcom to get the correct count
-                mapped_ti_count = cast(
-                    "SchedulerExpandInput", upstream_mapped_group._expand_input
-                ).get_total_map_length(run_id, session=session)
+                # Second try: resolve XCom for correct count
+                try:
+                    expand_input = cast("SchedulerExpandInput", upstream_mapped_group._expand_input)
+                    mapped_ti_count = expand_input.get_total_map_length(ti.run_id, session=session)
+                except NotFullyPopulated:
+                    # For these trigger rules, unresolved map indexes are acceptable.
+                    # The success of the upstream task is not the main reason for triggering the current task.
+                    # Therefore, whether the upstream task is fully populated can be ignored.
+                    allowed_rules = {
+                        TriggerRule.ALL_FAILED,
+                        TriggerRule.ALL_DONE,
+                        TriggerRule.ALL_DONE_MIN_ONE_SUCCESS,
+                        TriggerRule.ALL_DONE_SETUP_SUCCESS,
+                        TriggerRule.ONE_SUCCESS,
+                        TriggerRule.ONE_FAILED,
+                        TriggerRule.ONE_DONE,
+                        TriggerRule.NONE_FAILED,
+                        TriggerRule.NONE_SKIPPED,
+                        TriggerRule.ALWAYS,
+                        TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+                        TriggerRule.ALL_SKIPPED,
+                    }
+                    if task.trigger_rule in allowed_rules:
+                        mapped_ti_count = None

Review Comment:
   Same as checking `trigger_rule != TriggerRule.ALL_SUCCESS`, i.e. every rule except success?
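   If the set above really does enumerate every `TriggerRule` member except `ALL_SUCCESS`, a minimal sketch of the simpler check this question seems to suggest (an assumption on my part, not code from the PR; deprecated aliases not considered):

   ```python
   # Sketch only: assumes allowed_rules covers every TriggerRule member other
   # than ALL_SUCCESS, so the membership test collapses to one inequality.
   # TriggerRule is already imported in the module touched by this PR; the
   # import below is the long-standing location and may differ here.
   from airflow.utils.trigger_rule import TriggerRule

   if task.trigger_rule != TriggerRule.ALL_SUCCESS:
       mapped_ti_count = None
   ```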



##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -436,6 +443,71 @@ def task_3():
         )
         assert response.json()["upstream_map_indexes"] == {"tg.task_2": [0, 1, 2, 3, 4, 5]}
 
+    def test_dynamic_task_mapping_with_trigger_rule(
+        self, client: Client, dag_maker: DagMaker, session: Session
+    ):
+        """
+        Test that the Task Instance upstream_map_indexes is not populated but
+        the downstream task should still be run due to trigger rule.
+        """
+
+        with dag_maker(session=session, serialized=True):
+
+            @task
+            def task_1():
+                raise AirflowSkipException()
+
+            @task_group
+            def tg(x):
+                @task
+                def task_2():
+                    raise AirflowSkipException()
+
+                task_2()
+
+            @task(trigger_rule=TriggerRule.ALL_DONE)
+            def task_3():
+                pass
+
+            @task
+            def task_4():
+                pass
+
+            tg.expand(x=task_1()) >> [task_3(), task_4()]
+
+        dr = dag_maker.create_dagrun()
+
+        decision = dr.task_instance_scheduling_decisions(session=session)
+
+        # Simulate task_1 skipped
+        (ti_1,) = decision.schedulable_tis
+        ti_1.state = TaskInstanceState.SKIPPED
+        session.flush()
+
+        # Now task_2 in mapped tagk group is not expanded and also skipped..

Review Comment:
   ```suggestion
           # Now task_2 in mapped task group is not expanded and also skipped..
   ```
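   Separately, a standalone sketch (not part of the PR) of the trigger-rule behaviour this test leans on: an upstream that only skips lets an `ALL_DONE` downstream run, while a downstream with the default `ALL_SUCCESS` rule is skipped too. Import paths assume Airflow 3's `airflow.sdk` decorators; Airflow 2 would use `airflow.decorators` instead.

   ```python
   from airflow.exceptions import AirflowSkipException
   from airflow.sdk import dag, task  # assumption: Airflow 3 decorator location
   from airflow.utils.trigger_rule import TriggerRule


   @dag(schedule=None)
   def trigger_rule_demo():
       @task
       def always_skips():
           # Upstream finishes in the "skipped" state.
           raise AirflowSkipException()

       @task(trigger_rule=TriggerRule.ALL_DONE)
       def runs_anyway():
           # ALL_DONE counts skipped upstreams as done, so this still runs.
           pass

       @task
       def skipped_too():
           # Default ALL_SUCCESS propagates the skip; this task is skipped too.
           pass

       upstream = always_skips()
       upstream >> [runs_anyway(), skipped_too()]


   trigger_rule_demo()
   ```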


