hinnefe2 opened a new issue, #24487:
URL: https://github.com/apache/airflow/issues/24487

   ### Apache Airflow version
   
   2.3.2 (latest released)
   
   ### What happened
   
   Attempting to use [dynamic task 
mapping](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#mapping-over-result-of-classic-operators)
 on the results of a `KubernetesPodOperator` (or `GKEStartPodOperator`) 
produces 3x as many downstream task instances as it should. Two-thirds of the 
downstream tasks fail more or less instantly.
   
   
   ### What you think should happen instead
   
   The problem is that the number of downstream tasks is calculated by counting 
XCOMs associated with the upstream task, assuming that each `task_id` has a 
single XCOM:
   
https://github.com/apache/airflow/blob/fe5e689adfe3b2f9bcc37d3975ae1aea9b55e28a/airflow/models/mappedoperator.py#L606-L615
   
   However, the `KubernetesPodOperator` pushes two additional XCOMs in its 
`.execute()` method, on top of the `return_value` XCOM pushed when 
`do_xcom_push=True`:
   
   
https://github.com/apache/airflow/blob/fe5e689adfe3b2f9bcc37d3975ae1aea9b55e28a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L425-L426
   
   With three XCOMs per upstream task instance instead of one, the number of 
downstream tasks ends up being 3x what it should be.
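
To make the arithmetic concrete, here is a minimal sketch that models XCOM rows as plain tuples and does not use Airflow at all. The key names `pod_name` and `pod_namespace` are an assumption about what the linked lines push, and `make_xcom_rows` and `count_mapped_lengths` are hypothetical helpers, not Airflow functions.

```python
from collections import Counter

# Each tuple models one XCom row: (task_id, map_index, key).
# Assumption: with do_xcom_push=True, every mapped pod task instance stores
# its return value plus two extra keys (taken here to be pod_name and
# pod_namespace).
KEYS_PER_INSTANCE = ("return_value", "pod_name", "pod_namespace")


def make_xcom_rows(task_id, n_instances):
    """Build the rows that n_instances mapped task instances would push."""
    return [
        (task_id, map_index, key)
        for map_index in range(n_instances)
        for key in KEYS_PER_INSTANCE
    ]


def count_mapped_lengths(rows, key=None):
    """Count rows per task_id, optionally restricted to a single XCom key."""
    counts = Counter()
    for task_id, map_index, row_key in rows:
        if map_index < 0:
            continue  # rows from unmapped task instances are not counted
        if key is not None and row_key != key:
            continue
        counts[task_id] += 1
    return dict(counts)


rows = make_xcom_rows("second_pod", n_instances=4)
unfiltered = count_mapped_lengths(rows)
filtered = count_mapped_lengths(rows, key="return_value")
print(unfiltered)  # {'second_pod': 12}
print(filtered)    # {'second_pod': 4}
```

Counting every row gives 3N (12 for N = 4), while counting only the `return_value` rows gives the expected N.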
   
   ### How to reproduce
   
   Reproducing the behavior requires access to a Kubernetes cluster, but in 
pseudo-code, a DAG like this should demonstrate it:
   ```
   with DAG(...) as dag:
   
       # produces an output list with N elements
       first_pod = GKEStartPodOperator(..., do_xcom_push=True)
   
       # produces 1 output per input, so N task instances are created,
       # each with a single output
       second_pod = GKEStartPodOperator.partial(
           ..., do_xcom_push=True
       ).expand(id=XComArg(first_pod))
   
       # should have N task instances created, but actually gets 3N
       # task instances created
       third_pod = GKEStartPodOperator.partial(
           ..., do_xcom_push=True
       ).expand(id=XComArg(second_pod))
   ```
   
   ### Operating System
   
   macOS 12.4
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes==4.1.0
   apache-airflow-providers-google==8.0.0
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   When I edit `mappedoperator.py` in my local deployment to filter on the XCOM 
key, things behave as expected:
   ```
           # Collect lengths from mapped upstreams.
           xcom_query = (
               session.query(XCom.task_id, func.count(XCom.map_index))
               .group_by(XCom.task_id)
               .filter(
                   XCom.dag_id == self.dag_id,
                   XCom.run_id == run_id,
                   XCom.key == 'return_value',  # <-- added this line
                   XCom.task_id.in_(mapped_dep_keys),
                   XCom.map_index >= 0,
               )
           )
   ```
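
The effect of the extra filter can be checked outside Airflow with a small in-memory SQLite table. This is only a sketch under stated assumptions: the table layout, the `xcom_key` column name, the `demo_dag` / `run_1` values, and the `mapped_lengths` helper are all made up for illustration and are not Airflow's schema or API.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified stand-in for the XCom table.
conn.execute(
    "CREATE TABLE xcom ("
    "dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, xcom_key TEXT)"
)

# Assumption: three mapped instances of second_pod, each pushing three keys.
rows = [
    ("demo_dag", "run_1", "second_pod", map_index, key)
    for map_index in range(3)
    for key in ("return_value", "pod_name", "pod_namespace")
]
conn.executemany("INSERT INTO xcom VALUES (?, ?, ?, ?, ?)", rows)

BASE_QUERY = """
SELECT task_id, COUNT(map_index)
FROM xcom
WHERE dag_id = ? AND run_id = ? AND map_index >= 0 {extra}
GROUP BY task_id
"""


def mapped_lengths(extra_filter=""):
    """Run the grouped count, optionally with one more WHERE condition."""
    sql = BASE_QUERY.format(extra=extra_filter)
    return dict(conn.execute(sql, ("demo_dag", "run_1")).fetchall())


before = mapped_lengths()
after = mapped_lengths("AND xcom_key = 'return_value'")
print(before)  # {'second_pod': 9}
print(after)   # {'second_pod': 3}
```

Without the key filter the grouped count is three times the number of mapped upstream instances; with it, the count matches.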
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
