karenbraganz commented on issue #41077:
URL: https://github.com/apache/airflow/issues/41077#issuecomment-2286476279

   This happens because, when `include_prior_dates=True`, `xcom_pull` returns the XCom from the DAG run with the most recent logical date. Here are two potential workarounds for this issue:
   
   1. Pass the parent DAG's run ID to the child DAG through 
`TriggerDagRunOperator` and set `include_prior_dates=False`. 
    
   First, add the `trigger_run_id="{{ run_id }}"` parameter to 
`TriggerDagRunOperator` in main_dag.py as shown below.
    
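   ```
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator

   # Inside the `with DAG(...)` block in main_dag.py:
   child_dag_trigger = TriggerDagRunOperator(
       task_id="child_dag_trigger",
       trigger_dag_id="child_dag",
       trigger_run_id="{{ run_id }}",  # child run reuses the parent run's run_id
       retries=0,
       wait_for_completion=True,
       deferrable=True,
   )
   ```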
   Next, delete the `include_prior_dates=True` parameter from the XCom pull in 
child_dag.py as shown below. It will default to `False`.
    
   `value = ti.xcom_pull(dag_id='main_dag', task_ids='push_config', key='snapshot_id')`
    
   When `include_prior_dates=False`, the child DAG run only pulls XComs from a `main_dag` run that has the same `run_id` as its own. Because `TriggerDagRunOperator` now gives the child DAG run the parent DAG run's `run_id`, the child pulls the XCom from its own parent, even when a scheduled run follows a manual run.
    
   2. The second potential workaround is to pass a `CronTriggerTimetable` to the DAG's `schedule` parameter instead of a plain cron expression. By default, Airflow turns a plain cron expression into a `CronDataIntervalTimetable`.
    
   First, you would have to import `CronTriggerTimetable` by adding this line 
to main_dag.py.
    
   `from airflow.timetables.trigger import CronTriggerTimetable`
    
   Next, change the `schedule` parameter in main_dag.py to `schedule=CronTriggerTimetable(<your_cron_expression>)`, as shown below.
    
   ```
   with DAG(
       'main_dag',
       start_date=datetime(2023, 11, 8, tzinfo=pendulum.timezone("America/New_York")),
       schedule=CronTriggerTimetable('*/5 * * * *', timezone='UTC'),
       max_active_runs=1,
       catchup=False,
       template_searchpath=airflow_include_dir,
       default_args=default_args,
       tags=["basic"],
   ) as dag:
       ...
   ```
    
   With this change, the child DAG's XCom pull works even if `include_prior_dates=True` is left in place.
    
   With `CronTriggerTimetable`, both manually triggered and scheduled DAG runs get a `logical_date` equal to the time the run is triggered, which is also their `data_interval_end`. With the default `CronDataIntervalTimetable` that we were using before, a manual run's `logical_date` is likewise the time it was triggered. A scheduled run's `logical_date`, however, is its `data_interval_start`, one full interval before the run actually starts. As a result, a scheduled run can have an earlier `logical_date` than a manual run that was triggered just before it. Because `xcom_pull` with `include_prior_dates=True` returns the XCom from the run with the most recent `logical_date`, the child DAG run pulls the XCom from the manual run. It should pull from its own parent run, but that run's `logical_date` is older. Switching to `CronTriggerTimetable` makes every run's `logical_date` match the time it actually runs. A scheduled parent run is then newer than any manual run triggered before it, and the issue goes away even with `include_prior_dates=True`.
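To make the ordering argument concrete, here is a small pure-Python model of the scenario. It is a sketch under stated assumptions, not Airflow's implementation. The `MainDagRun` record and the `scheduled_logical_date`, `pull`, and `scenario` helpers are hypothetical. The run IDs are shortened, and the times are illustrative: a manual run at 10:02, a scheduled run that fires at 10:05, and a child run starting at 10:06. The model covers only the `*/5 * * * *` schedule, in UTC.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

# Assumption: models only the '*/5 * * * *' schedule from main_dag.py, in UTC.
INTERVAL = timedelta(minutes=5)


@dataclass
class MainDagRun:
    """Hypothetical record of one main_dag run and the snapshot_id it pushed."""
    run_id: str
    logical_date: datetime
    snapshot_id: str


def scheduled_logical_date(fires_at: datetime, trigger_timetable: bool) -> datetime:
    """logical_date of a scheduled run that starts at the cron tick `fires_at`."""
    if trigger_timetable:
        # CronTriggerTimetable: logical_date is the tick the run fires at.
        return fires_at
    # CronDataIntervalTimetable: the run fires at data_interval_end, but its
    # logical_date is data_interval_start, one interval earlier.
    return fires_at - INTERVAL


def pull(runs: List[MainDagRun], child_run_id: str, child_logical_date: datetime,
         include_prior_dates: bool) -> Optional[str]:
    """Simplified model of which run's XCom xcom_pull returns (not Airflow code)."""
    if include_prior_dates:
        # Runs at or before the child's logical_date qualify; the newest wins.
        prior = [r for r in runs if r.logical_date <= child_logical_date]
        return max(prior, key=lambda r: r.logical_date).snapshot_id if prior else None
    # Otherwise only a run whose run_id equals the child's run_id qualifies.
    return next((r.snapshot_id for r in runs if r.run_id == child_run_id), None)


def scenario(trigger_timetable: bool) -> List[MainDagRun]:
    """A manual run at 10:02, then the scheduled run that fires at 10:05."""
    manual = MainDagRun("manual__10:02", datetime(2024, 1, 1, 10, 2), "manual-snap")
    sched_ld = scheduled_logical_date(datetime(2024, 1, 1, 10, 5), trigger_timetable)
    scheduled = MainDagRun(f"scheduled__{sched_ld:%H:%M}", sched_ld, "scheduled-snap")
    return [manual, scheduled]


# Hypothetical: the child is triggered by the scheduled run and starts at 10:06.
child_started = datetime(2024, 1, 1, 10, 6)

# Default timetable + include_prior_dates=True: the manual run wins (the bug).
runs = scenario(trigger_timetable=False)
print(pull(runs, "child-run", child_started, include_prior_dates=True))  # manual-snap

# Workaround 1: child reuses the parent's run_id, include_prior_dates=False.
print(pull(runs, runs[1].run_id, child_started, include_prior_dates=False))  # scheduled-snap

# Workaround 2: CronTriggerTimetable, include_prior_dates=True kept.
runs = scenario(trigger_timetable=True)
print(pull(runs, "child-run", child_started, include_prior_dates=True))  # scheduled-snap
```

The model makes one point: with the data-interval timetable, the scheduled run's `logical_date` (10:00) sorts before the manual run's (10:02). Matching on `run_id` sidesteps that ordering entirely. The trigger timetable instead fixes the ordering itself.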


-- 
This is an automated message from the Apache Git Service.