atul-astronomer opened a new issue, #46665:
URL: https://github.com/apache/airflow/issues/46665

   ### Apache Airflow version
   
   3.0.0a1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   While pulling XCom, if the user passes multiple map_indexes as an argument, the task fails.
   
   ```javascript
   {"timestamp":"2025-02-12T06:55:44.726594","level":"error","event":"Task failed with exception","logger":"task","error_detail":[{"exc_type":"NotImplementedError","exc_value":"Multiple map_indexes are not supported yet","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":567,"name":"run"},
   {"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":671,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/airflow/decorators/base.py","lineno":252,"name":"execute"},
   {"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":196,"name":"execute"},
   {"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":222,"name":"execute_callable"},{"filename":"/opt/airflow/airflow/utils/operator_helpers.py","lineno":261,"name":"run"},{"filename":"/files/dags/taskmap/xcom_pull.py","lineno":49,"name":"xcom_pull"},
   {"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":277,"name":"xcom_pull"}]}]}
   {"timestamp":"2025-02-12T06:55:44.726849","level":"debug","event":"Sending request","json":"{\"state\":\"failed\",\"end_date\":\"2025-02-12T06:55:44.726813Z\",\"type\":\"TaskState\"}\n","logger":"task"}
   
   ``` 
   
   ### What you think should happen instead?
   
   The DAG should pass, as it does in AF2.
   
   ### How to reproduce
   
   Run the DAG code below on AF3 tag 3.0.0a2 and observe the error in the `xcom_pull` task.
   
   ```python
   from airflow import DAG
   from airflow.models.taskinstance import TaskInstance
   from datetime import datetime
   
   with DAG(
       dag_id="taskmap_xcom_pull",
       start_date=datetime(1970, 1, 1),
       schedule=None,
   ) as dag:
   
       @dag.task
       def foo():
           return "foo"
   
       @dag.task
       def identity(thing):
           return thing
   
       @dag.task
       def uppercase(thing):
           return thing.upper()
   
       @dag.task(multiple_outputs=True)
       def two_parts(thing):
           return {"first": thing[0:3], "rest": thing[3:]}
   
       @dag.task
       def xcom_pull(**context):
           ti: TaskInstance = context["ti"]
   
           for x in [
               ti.xcom_pull(task_ids="identity"),
               # <airflow.models.taskinstance._LazyXComAccess object at 0x7f5d39f92430>
               ti.xcom_pull(task_ids="foo"),
               # foo
               ti.xcom_pull(task_ids="bar"),
               # []
               ti.xcom_pull(task_ids=["identity", "foo", "bar", "uppercase", "two_parts"]),
               # [2, 3, 1, 'foo', [], 'A', 'B', 'C', {'first': 'ips', 'rest': 'um'},
               # {'first': 'lor', 'rest': 'em'}, {'first': 'dol', 'rest': 'or'}]
               ti.xcom_pull(task_ids="identity", map_indexes=2),
               # 3
               ti.xcom_pull(task_ids="foo", map_indexes=0),
               # None
               ti.xcom_pull(task_ids="foo", map_indexes=2),
               # None
               ti.xcom_pull(task_ids=["identity", "plusfive"], map_indexes=2),
               # [3]
               ti.xcom_pull(task_ids=["identity", "foo", "plusfive"], map_indexes=[0, 2]),
               # [1, 3]
               ti.xcom_pull(task_ids=["identity", "plusfive", "two_parts"]),
           ]:
               print(x)
   
       # works
       (
           foo()
           >> identity.expand(thing=[1, 2, 3])
           >> uppercase.expand(thing=["a", "b", "c"])
           >> two_parts.expand(thing=["lorem", "ipsum", "dolor"])
           >> xcom_pull()
       )
   
   ``` 
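
Until a list of map_indexes is accepted, one possible way to get the same values is to pull one XCom per map index and collect the results. The sketch below is only an illustration under stated assumptions: `FakeTaskInstance` and `pull_each_map_index` are hypothetical names that are not part of Airflow, and the fake only mimics the `NotImplementedError` from the log above so the snippet runs without an Airflow installation. Whether a real AF3 task instance returns the same values for single-index pulls as AF2 has not been verified here.

```python
from typing import Any, Iterable


class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not part of Airflow."""

    def __init__(self, values: dict) -> None:
        # Maps (task_id, map_index) to the stored XCom value.
        self._values = values

    def xcom_pull(self, task_ids: str, map_indexes: Any = None) -> Any:
        # Mimic the reported behaviour: a list of map indexes is rejected.
        if isinstance(map_indexes, (list, tuple)):
            raise NotImplementedError("Multiple map_indexes are not supported yet")
        return self._values.get((task_ids, map_indexes))


def pull_each_map_index(ti: Any, task_id: str, map_indexes: Iterable[int]) -> list:
    """Pull one XCom per map index instead of passing a list of indexes."""
    return [ti.xcom_pull(task_ids=task_id, map_indexes=i) for i in map_indexes]


# Values mirror identity.expand(thing=[1, 2, 3]) from the DAG above.
fake_ti = FakeTaskInstance({("identity", 0): 1, ("identity", 1): 2, ("identity", 2): 3})
result = pull_each_map_index(fake_ti, "identity", [0, 2])
print(result)
```

Inside the `xcom_pull` task of the DAG above, the helper would be called with `context["ti"]` in place of `fake_ti`.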
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

