mehmax opened a new issue #18840:
URL: https://github.com/apache/airflow/issues/18840


   ### Apache Airflow version
   
   2.1.0
   
   ### Operating System
   
   CentOS 7
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   In this example DAG, task 1 is assigned to task group 1 and pushes its return_value to XCom.
   Task 2 reads the return_value of task 1 and echoes it.
   
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from airflow.utils.task_group import TaskGroup
   
   with DAG(dag_id='test_xcom', start_date=datetime(2021, 6, 1), schedule_interval='@once') as dag:
   
       with TaskGroup(group_id='group1') as g1:
           t1 = BashOperator(
               task_id='task1',
               bash_command='echo abcde',
               do_xcom_push=True
           )
   
       t2 = BashOperator(
           task_id='task2',
           bash_command='''echo {{ ti.xcom_pull(task_ids='task1') }}''',
           dag=dag
       )
   
   
   g1 >> t2
   ```
   
   The problem is that `xcom_pull(task_ids='task1')` always returns None, even though the value was pushed correctly:
   
![image](https://user-images.githubusercontent.com/84371809/136577875-7fab66ea-b6a1-487b-8376-3ccbcf99466b.png)
   
   ### What you expected to happen
   
   Task 2 should be able to read the XCom value pushed by task 1.
   
   ### How to reproduce
   
   Try this DAG:
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from airflow.utils.task_group import TaskGroup
   
   with DAG(dag_id='test_xcom', start_date=datetime(2021, 6, 1), schedule_interval='@once') as dag:
   
       with TaskGroup(group_id='group1') as g1:
           t1 = BashOperator(
               task_id='task1',
               bash_command='echo abcde',
               do_xcom_push=True
           )
   
       t2 = BashOperator(
           task_id='task2',
           bash_command='''echo {{ ti.xcom_pull(task_ids='task1') }}''',
           dag=dag
       )
   
   
   g1 >> t2
   ```
   
   Let it run, then look at the log file of task2:
   
   ```
   [2021-10-08 16:39:44,898] {subprocess.py:52} INFO - Tmp dir root location: 
    /tmp
   [2021-10-08 16:39:44,899] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo None']
   [2021-10-08 16:39:44,906] {subprocess.py:75} INFO - Output:
   [2021-10-08 16:39:44,907] {subprocess.py:79} INFO - None
   [2021-10-08 16:39:44,907] {subprocess.py:83} INFO - Command exited with return code 0
   [2021-10-08 16:39:44,920] {taskinstance.py:1184} INFO - Marking task as SUCCESS. dag_id=test_xcom, task_id=task2, execution_date=20211008T143942, start_date=20211008T143944, end_date=20211008T143944
   [2021-10-08 16:39:44,931] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
   [2021-10-08 16:39:44,970] {local_task_job.py:151} INFO - Task exited with return code 0
   ```
   
   ### Anything else
   
   To check whether something else was wrong, I tried the exact same DAG, but without the TaskGroup.
   
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   
   
   with DAG(dag_id='test_xcom2', start_date=datetime(2021, 6, 1), schedule_interval='@once') as dag:
   
       t1 = BashOperator(
           task_id='task1',
           bash_command='echo abcde',
           do_xcom_push=True
       )
   
       t2 = BashOperator(
           task_id='task2',
           bash_command='''echo {{ ti.xcom_pull(task_ids='task1') }}''',
           dag=dag
       )
   
   
   t1 >> t2
   ```
   
   With this DAG, `xcom_pull` works:
   
   ```
   [2021-10-08 16:39:41,502] {subprocess.py:52} INFO - Tmp dir root location: 
    /tmp
   [2021-10-08 16:39:41,502] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo abcde']
   [2021-10-08 16:39:41,514] {subprocess.py:75} INFO - Output:
   [2021-10-08 16:39:41,514] {subprocess.py:79} INFO - abcde
   [2021-10-08 16:39:41,514] {subprocess.py:83} INFO - Command exited with return code 0
   [2021-10-08 16:39:41,549] {taskinstance.py:1184} INFO - Marking task as SUCCESS. dag_id=test_xcom2, task_id=task2, execution_date=20211008T143938, start_date=20211008T143941, end_date=20211008T143941
   [2021-10-08 16:39:41,561] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
   [2021-10-08 16:39:41,589] {local_task_job.py:151} INFO - Task exited with return code 0
   ```
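
A possible explanation, offered as an assumption rather than a confirmed diagnosis: by default a `TaskGroup` prefixes the IDs of its child tasks with its `group_id` (`prefix_group_id=True`), so the first DAG would register the task as `group1.task1`, and a pull with `task_ids='task1'` would find nothing. The toy model below is not Airflow code; `ToyXComStore` and `qualified_task_id` are hypothetical names used only to illustrate that lookup mismatch.

```python
from typing import Dict, Optional


def qualified_task_id(task_id: str, group_id: Optional[str] = None) -> str:
    """Return the ID under which a task is registered in this toy model.

    Assumption: a task created inside a group is stored as
    '<group_id>.<task_id>', mirroring TaskGroup's default prefix_group_id=True.
    """
    return f"{group_id}.{task_id}" if group_id else task_id


class ToyXComStore:
    """Hypothetical in-memory stand-in for XCom; not Airflow's implementation."""

    def __init__(self) -> None:
        self._values: Dict[str, str] = {}

    def push(self, task_id: str, value: str) -> None:
        # Store the value under the exact task ID that pushed it.
        self._values[task_id] = value

    def pull(self, task_ids: str) -> Optional[str]:
        # Exact-match lookup: an unknown ID yields None instead of an error.
        return self._values.get(task_ids)


store = ToyXComStore()

# task1 lives inside group1, so it pushes under its qualified ID.
store.push(qualified_task_id("task1", group_id="group1"), "abcde")

bare = store.pull("task1")              # ID as written in the failing DAG
qualified = store.pull("group1.task1")  # ID including the group prefix

print(bare)       # None
print(qualified)  # abcde
```

If that assumption holds, changing the template in the first DAG to `{{ ti.xcom_pull(task_ids='group1.task1') }}` should make task2 echo `abcde`.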
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

