falcon97 opened a new issue, #41077: URL: https://github.com/apache/airflow/issues/41077
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.7.3

### What happened?

I have a primary DAG which generates some timestamps during each DAG run. Following the completion of some tasks in this DAG, I trigger a few other DAGs using the TriggerDagRunOperator. These secondary DAGs make use of the timestamps generated by the primary one using the xcom_pull function.

There was one instance where I had to trigger the DAG manually outside of its schedule. All the tasks completed as expected during this manual run, and fetched the values from XCom correctly. But during the next scheduled run, I observed that in the secondary DAGs, the value fetched by the xcom_pull function was from the previous manual run of the primary DAG. However in the primary DAG itself, the value was being fetched correctly.

### What you think should happen instead?

The xcom_pull function should be fetching the latest value regardless of whether the previous run is manual or scheduled. This is exhibiting the issue mentioned in #28085. Relevant comment link: https://github.com/apache/airflow/discussions/28085#discussioncomment-4300135

### How to reproduce

1. Create the DAGs mentioned below.
2. Unpause both DAGs. The first scheduled run of main_dag should push a value to XCom, and the corresponding DAG run of child_dag should fetch the same value from XCom.
3. Before the next scheduled run of main_dag begins in 5 minutes, trigger it manually. Again, we should see the same behavior as mentioned in step 2.
4. Let the next scheduled run of main_dag start. It pushes a value to XCom, but the same value is not fetched by child_dag. Instead, the value from the previous manual run is printed.
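A possible explanation for step 4, offered as an assumption rather than something confirmed against the Airflow source: with `include_prior_dates=True`, the lookup may pick the XCom whose run has the highest logical date, not the one pushed most recently. A cron-scheduled run takes the start of its data interval as its logical date, while a manual run takes its trigger time, so the manual run can sort after the scheduled run that follows it. The sketch below is a plain-Python model of that ordering only; `PushedXCom`, `pull_by_logical_date`, and all timestamps are hypothetical, and no Airflow API is called.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class PushedXCom:
    """Hypothetical record of one value pushed by main_dag.push_config."""
    run_type: str            # "scheduled" or "manual"
    logical_date: datetime   # logical (execution) date of the pushing run
    pushed_at: datetime      # wall-clock time at which the value was written


def pull_by_logical_date(xcoms, current_logical_date):
    """Assumed behaviour: newest logical date at or before the caller's wins."""
    candidates = [x for x in xcoms if x.logical_date <= current_logical_date]
    return max(candidates, key=lambda x: (x.logical_date, x.pushed_at), default=None)


# Illustrative timeline for schedule '*/5 * * * *' (all times are made up).
xcoms = [
    # Step 2: scheduled run starts at 10:00, covering the interval from 09:55.
    PushedXCom("scheduled", datetime(2024, 1, 1, 9, 55), datetime(2024, 1, 1, 10, 0)),
    # Step 3: manual run triggered at 10:03; its logical date is the trigger time.
    PushedXCom("manual", datetime(2024, 1, 1, 10, 3), datetime(2024, 1, 1, 10, 3)),
    # Step 4: scheduled run starts at 10:05, covering the interval from 10:00.
    PushedXCom("scheduled", datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 10, 5)),
]

# child_dag is triggered from the step 4 run, shortly after 10:05.
child_logical_date = datetime(2024, 1, 1, 10, 5, 30)

pulled = pull_by_logical_date(xcoms, child_logical_date)
most_recent_push = max(xcoms, key=lambda x: x.pushed_at)

print(pulled.run_type)            # value the model hands to child_dag
print(most_recent_push.run_type)  # value the report expects to receive
```

Under these assumptions the model returns the manual run's value even though the scheduled run pushed later, which matches the observation in step 4. Ordering by `pushed_at` instead would return the scheduled run's value.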
main_dag.py:

```
from datetime import datetime, timedelta
import os

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

airflow_include_dir = os.path.join(os.environ['AIRFLOW_HOME'], 'include')

default_args = {
    "owner": "airflow",
    'retries': 3,
    'retry_delay': timedelta(seconds=60),
}


def push_config(ti):
    current_datetime = datetime.now()
    ti.xcom_push(key='snapshot_id', value=current_datetime)
    print(f'Pushed {current_datetime} to XCom with key snapshot_id.')


with DAG('main_dag',
         start_date=datetime(2023, 11, 8, tzinfo=pendulum.timezone("America/New_York")),
         schedule='*/5 * * * *',
         max_active_runs=1,
         catchup=False,
         template_searchpath=airflow_include_dir,
         default_args=default_args,
         tags=["basic"]) as dag:
    push_config = PythonOperator(
        task_id='push_config',
        python_callable=push_config
    )

    child_dag_trigger = TriggerDagRunOperator(
        task_id="child_dag_trigger",
        trigger_dag_id="child_dag",
        retries=0,
        wait_for_completion=True,
        deferrable=True
    )

    push_config >> child_dag_trigger
```

child_dag.py:

```
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 11, 8),
    'retries': 3,
    'retry_delay': timedelta(seconds=60)
}


def pull_config(ti):
    value = ti.xcom_pull(
        dag_id='main_dag',
        task_ids='push_config',
        key='snapshot_id',
        include_prior_dates=True)
    print(value)


with DAG('child_dag',
         default_args=default_args,
         schedule=None,
         catchup=False,
         tags=["reporting"]) as dag:
    pull_config = PythonOperator(
        task_id='pull_config',
        python_callable=pull_config
    )

    pull_config
```

### Operating System

Linux

### Versions of Apache Airflow Providers

_No response_

### Deployment

Astronomer

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
