falcon97 opened a new issue, #41077:
URL: https://github.com/apache/airflow/issues/41077

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.7.3
   
   ### What happened?
   
   I have a primary DAG that generates some timestamps during each DAG run. After some of its tasks complete, it triggers a few other DAGs using the TriggerDagRunOperator. These secondary DAGs read the timestamps generated by the primary DAG with the xcom_pull function. On one occasion I had to trigger the primary DAG manually, outside of its schedule. All tasks completed as expected during this manual run and fetched the values from XCom correctly.
   
   But during the next scheduled run, I observed that in the secondary DAGs the value returned by xcom_pull came from the previous manual run of the primary DAG. In the primary DAG itself, however, the value was fetched correctly.
   
   ### What you think should happen instead?
   
   xcom_pull should fetch the latest value regardless of whether the previous run was manual or scheduled. This looks like the issue described in #28085.
   Relevant comment: https://github.com/apache/airflow/discussions/28085#discussioncomment-4300135
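Assuming the selection behaves as discussed in #28085, `include_prior_dates=True` does not return the most recently *pushed* value. It keeps the values whose DAG run has a logical date no later than the pulling run's logical date, then returns the one with the greatest logical date. The sketch models only that assumed rule in plain Python; `XComRow` and `pick_latest` are hypothetical names, not Airflow APIs, and the clock times are illustrative.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class XComRow:
    """Hypothetical stand-in for one stored XCom value (not an Airflow class)."""
    run_id: str
    logical_date: datetime  # logical date of the main_dag run that pushed it
    pushed_at: datetime     # wall-clock time the value was pushed
    value: str


def pick_latest(rows, pull_logical_date):
    """Assumed rule: keep rows whose logical date is not after the puller's,
    then prefer the greatest logical date; push time only breaks ties."""
    eligible = [r for r in rows if r.logical_date <= pull_logical_date]
    if not eligible:
        return None
    return max(eligible, key=lambda r: (r.logical_date, r.pushed_at)).value


rows = [
    # Manual run triggered at 10:03: its logical date is the trigger time.
    XComRow("manual__10:03", datetime(2023, 11, 8, 10, 3),
            datetime(2023, 11, 8, 10, 3, 5), "manual value"),
    # Scheduled run firing at 10:05 covers 10:00-10:05, so its logical
    # date is 10:00 even though it pushed its value later.
    XComRow("scheduled__10:00", datetime(2023, 11, 8, 10, 0),
            datetime(2023, 11, 8, 10, 5, 5), "scheduled value"),
]

# child_dag is assumed to be triggered at about 10:05 with that as its logical date.
print(pick_latest(rows, datetime(2023, 11, 8, 10, 5, 10)))
```

Under this model the manual run's value wins because 10:03 is a later logical date than 10:00, which matches what child_dag prints in step 4 of the reproduction.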
   
   ### How to reproduce
   
   1. Create the two DAGs below.
   2. Unpause both DAGs. The first scheduled run of main_dag pushes a value to XCom, and the child_dag run it triggers should fetch that same value.
   3. Before the next scheduled run of main_dag begins (it runs every 5 minutes), trigger main_dag manually. Again, the behavior should match step 2.
   4. Let the next scheduled run of main_dag start. It pushes a new value to XCom, but child_dag does not fetch it; instead, it prints the value from the previous manual run.
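The reproduction hinges on the logical dates Airflow 2 assigns to each main_dag run. A minimal sketch, assuming the default cron timetable (a scheduled run is stamped with the start of its data interval and fires at its end) and a manual trigger with no explicit logical date (stamped with the trigger time). The helper names and clock times are hypothetical, and time zones are ignored.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(minutes=5)  # matches schedule='*/5 * * * *'


def scheduled_logical_date(run_start: datetime) -> datetime:
    """Assumed: a scheduled run firing at the end of its data interval is
    stamped with the interval *start*."""
    # Snap to the 5-minute cron boundary the run fired on, then step back one interval.
    boundary = run_start.replace(second=0, microsecond=0)
    boundary -= timedelta(minutes=boundary.minute % 5)
    return boundary - INTERVAL


def manual_logical_date(trigger_time: datetime) -> datetime:
    """Assumed: a manual trigger without an explicit logical date uses the trigger time."""
    return trigger_time


# Hypothetical timeline for steps 2-4 of the reproduction.
steps = [
    ("step 2: scheduled", scheduled_logical_date(datetime(2023, 11, 8, 10, 0, 2))),
    ("step 3: manual", manual_logical_date(datetime(2023, 11, 8, 10, 3))),
    ("step 4: scheduled", scheduled_logical_date(datetime(2023, 11, 8, 10, 5, 2))),
]
for label, logical_date in steps:
    print(f"{label:<18} logical date {logical_date:%H:%M}")
```

Under these assumptions the step 4 run is stamped 10:00, earlier than the manual run's 10:03, even though it executes later; the scheduled run after that (stamped 10:05) would again be the newest.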
   
   
   main_dag.py:
   ```python
   from datetime import datetime, timedelta
   import os
   import pendulum
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator 
   
   airflow_include_dir = os.path.join(os.environ['AIRFLOW_HOME'], 'include')
   
   default_args = {
       "owner": "airflow",
       'retries': 3,
       'retry_delay': timedelta(seconds=60),
   }
    
   def push_config(ti):
       current_datetime = datetime.now()
       ti.xcom_push(key='snapshot_id', value=current_datetime)
       print(f'Pushed {current_datetime} to XCom with key snapshot_id.')
   
   with DAG(
       'main_dag',
       start_date=datetime(2023, 11, 8, tzinfo=pendulum.timezone("America/New_York")),
       schedule='*/5 * * * *',
       max_active_runs=1,
       catchup=False,
       template_searchpath=airflow_include_dir,
       default_args=default_args,
       tags=["basic"],
   ) as dag:
   
       push_config = PythonOperator(
           task_id='push_config',
           python_callable=push_config
       )
   
       child_dag_trigger = TriggerDagRunOperator(
           task_id="child_dag_trigger",
           trigger_dag_id="child_dag",
           retries=0,
           wait_for_completion=True,
           deferrable=True
       )
   
       push_config >> child_dag_trigger
   ```
   child_dag.py:
   ```python
   from airflow import DAG
   from datetime import datetime, timedelta
   from airflow.operators.python import PythonOperator
    
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': datetime(2023, 11, 8),
       'retries': 3,
       'retry_delay': timedelta(seconds=60)
   }
   
   def pull_config(ti):
       value = ti.xcom_pull(
           dag_id='main_dag',
           task_ids='push_config',
           key='snapshot_id',
           include_prior_dates=True,
       )
       print(value)
   
   with DAG('child_dag', default_args=default_args, schedule=None,
            catchup=False, tags=["reporting"]) as dag:
   
       pull_config = PythonOperator(
           task_id='pull_config',
           python_callable=pull_config
       )
   
       pull_config
   ```
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
