joshOberhaus opened a new issue, #67265:
URL: https://github.com/apache/airflow/issues/67265

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.2.1
   
   ### What happened and how to reproduce it?
   
   In the following minimal example, `run_optional_step` runs for both mapped siblings (item_type `full` and item_type `partial`), even though it is intended to be skipped for `partial`:
   ```
   
   """Minimal example: @task.branch inside a dynamically mapped task group.
   This example replicates the structure of a real-world DAG, where:
   - One outer task group (process_items) fetches items and expands
       an inner task group (handle_item) via expand_kwargs.
   - Inside handle_item, a @task.branch decides whether to run
       run_optional_step based on item_type.
   
   Expected behavior
   ------------------
   item_type == "full"    → branch returns absolute task ID → run_optional_step should run
   item_type == "partial" → branch returns None             → run_optional_step should be skipped
   
   Actual behavior
   -------------------------
   run_optional_step runs for ALL items.
   """
   
   from __future__ import annotations
   
   import pendulum
   from airflow.sdk import DAG, task, task_group
   from airflow.providers.standard.operators.empty import EmptyOperator
   
   with DAG(
       dag_id="branch_in_mapped_taskgroup_example",
       schedule=None,
       start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
       catchup=False,
   ) as dag:
   
       # ------------------------------------------------------------------ #
       # Inner task group: handles one mapped item.                           #
       # The branch should skip or run an optional step per mapped instance.  #
       # ------------------------------------------------------------------ #
       @task_group(group_id="handle_item")
       def tg_handle_item(item_id: str, item_type: str):
           """Process one item.  Only 'full' items need the extra step."""
   
           @task(task_id="process")
           def process(item_id: str, item_type: str) -> str:
               print(f"Processing {item_id=} {item_type=}")
               return item_type
   
           result = process(item_id, item_type)
   
           @task.branch(task_id="select_optional_step")
           def select_optional_step(item_type: str):
               if item_type == "full":
                   return ["process_items.handle_item.run_optional_step"]
               return None
   
           branch = select_optional_step(result)
           run_optional_step = EmptyOperator(task_id="run_optional_step")
           branch >> run_optional_step
   
       # ------------------------------------------------------------------ #
       # Outer task group: loads items and expands the inner group once per  #
       # item. There is only one outer group instance in the DAG.            #
       # ------------------------------------------------------------------ #
       @task_group(group_id="process_items")
       def tg_process_items():
           @task(task_id="get_items")
           def get_items() -> list[dict]:
               return [
                   {"item_id": "item_1", "item_type": "full"},     # optional step should run
                   {"item_id": "item_2", "item_type": "partial"},  # optional step should be skipped
               ]
   
           tg_handle_item.expand_kwargs(get_items())
   
       tg_process_items()
   
   ```
   
   ### What you think should happen instead?
   
   item_type == "full"    → branch returns absolute task ID → run_optional_step should run
   item_type == "partial" → branch returns None             → run_optional_step should be skipped
   
   This is what happens up through 3.1.7.
   
   ### Operating System
   
   _No response_
   
   ### Deployment
   
   None
   
   ### Apache Airflow Provider(s)
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   This was originally discussed in https://github.com/apache/airflow/issues/65745, but that issue appears to be either not reproducible or describing a different problem.
   
   [Further context from @martijn-exads](https://github.com/apache/airflow/issues/65745#issuecomment-4497401392):
   We think we've traced it to https://github.com/apache/airflow/pull/62287, specifically this line in `NotPreviouslySkippedDep`:
   `xcom_map_index = ti.map_index if parent.is_mapped else -1`
   
   For a branch operator inside a mapped `@task_group`, the operator itself has no `.expand()`, so `parent.is_mapped` is False. Its TIs nevertheless run with `map_index >= 0`, and SkipMixin writes `XCOM_SKIPMIXIN_KEY` at the parent TI's per-map `map_index`. The dep then queries `map_indexes=-1`, finds nothing, returns "not skipped", and the non-selected sibling proceeds.
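
The lookup mismatch described above can be sketched with a toy model. This is not Airflow code: `ToyTI`, `write_branch_decision`, `is_previously_skipped`, the dict-based store, and the `"followed"` payload are all hypothetical stand-ins, and only the `xcom_map_index` expression is taken from the quoted line. The `use_ti_map_index=True` path is there for contrast only and is not a proposed fix.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ToyTI:
    """Hypothetical stand-in for a task instance."""

    task_id: str
    map_index: int
    is_mapped: bool  # True only if the operator itself was expanded


def write_branch_decision(store: dict, branch_ti: ToyTI, followed: list[str]) -> None:
    # Toy model of the branch writing its decision at its own per-map map_index.
    store[(branch_ti.task_id, branch_ti.map_index)] = {"followed": list(followed)}


def is_previously_skipped(
    store: dict, parent: ToyTI, ti: ToyTI, *, use_ti_map_index: bool
) -> bool:
    if use_ti_map_index:
        # Contrast case (assumption, not the real fix): read at the TI's own index.
        xcom_map_index = ti.map_index
    else:
        # Expression quoted in the issue.
        xcom_map_index = ti.map_index if parent.is_mapped else -1
    decision = store.get((parent.task_id, xcom_map_index))
    if decision is None:
        return False  # nothing found, so the dep reports "not skipped"
    return ti.task_id not in decision["followed"]


store: dict = {}
group = "process_items.handle_item"

# The "partial" item is map_index 1; its branch follows nothing.
# The branch sits inside a mapped task group but has no .expand() of its own.
branch = ToyTI(f"{group}.select_optional_step", map_index=1, is_mapped=False)
optional = ToyTI(f"{group}.run_optional_step", map_index=1, is_mapped=False)
write_branch_decision(store, branch, followed=[])

skipped_with_quoted_lookup = is_previously_skipped(
    store, branch, optional, use_ti_map_index=False
)
skipped_with_per_map_lookup = is_previously_skipped(
    store, branch, optional, use_ti_map_index=True
)
print(skipped_with_quoted_lookup, skipped_with_per_map_lookup)
```

In this toy model the quoted expression reads at `-1`, misses the entry written at `map_index=1`, and reports the sibling as not skipped, which matches the reported symptom.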
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

