[ https://issues.apache.org/jira/browse/AIRFLOW-703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Len Frodgers updated AIRFLOW-703:
---------------------------------
    Attachment: xcom_bug.py

xcom bug (python3)

> Xcom data cleared too soon
> --------------------------
>
>                 Key: AIRFLOW-703
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-703
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core, scheduler, xcom
>    Affects Versions: Airflow 2.0, Airflow 1.7.1.3
>         Environment: Tested using a Dockerized Airflow setup with a MySQL backend
> and the Celery executor
>            Reporter: Len Frodgers
>              Labels: xcom
>         Attachments: xcom_bug.py
>
>
> XCom data is cleared at the start of the `run` method of the `TaskInstance`,
> regardless of whether the TI is subsequently executed (e.g. a TI that has
> already succeeded won't execute again). This means that if a TI for a DagRun
> is run twice in close succession, the second run will correctly not execute
> (since the first run succeeded or is still running), but it WILL clear any
> XComs set by the first run. Therefore, any downstream task that depends on
> these XComs will fail, since `xcom_pull` now returns None for them.
> I noticed this bug after changing the scheduler's `num_runs` from None to 10.
> It didn't happen every time, but in roughly 50% of runs.
> However, I can reproduce it reliably with the following test DAG:
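> ```
> import logging
> import time
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
>
>
> def func1(ti, **kwargs):
>     # Push one XCom under a custom key; the return value is pushed
>     # automatically under the default key ("return_value").
>     ti.xcom_push("k1", "xcom_custom")
>     return "xcom_default"
>
>
> def func2(ti, **kwargs):
>     # Sleep long enough to re-run op1 from the UI before pulling.
>     time.sleep(10)
>     xcom_custom = ti.xcom_pull("op1", key="k1")
>     xcom_default = ti.xcom_pull("op1")
>     logging.info("Default: %s, Custom: %s", xcom_default, xcom_custom)
>     # Fails if op1's XComs were cleared by the second (skipped) run.
>     assert None not in (xcom_custom, xcom_default)
>
>
> bug_dag = DAG(
>     'xcom_bug',
>     max_active_runs=1,
>     schedule_interval=timedelta(minutes=3),
>     start_date=datetime(2016, 12, 16, 14, 57)
> )
>
> op1 = PythonOperator(python_callable=func1, provide_context=True,
>                      task_id="op1", dag=bug_dag)
> op2 = PythonOperator(python_callable=func2, provide_context=True,
>                      task_id="op2", dag=bug_dag)
> op1.set_downstream(op2)
> ```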
> To make op1 run twice, I trigger it twice from the UI while op2 is still
> sleeping in `time.sleep`.
> Logs from running this:
> [will attach]
> The fix seems straightforward: don't clear XComs unless the TI will actually
> execute. I'm happy to create a PR.
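The ordering problem and the proposed fix can be modeled without Airflow. This is a simplified, hypothetical sketch: `ToyXComStore`, `ToyTaskInstance`, `run_buggy`, `run_fixed` and `second_run_leaves_xcom` are illustrative names, not Airflow's API, and the real `TaskInstance.run` performs many more dependency checks. It only reproduces the order of operations described in the report: clearing XComs before deciding whether to execute.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple

# Hypothetical in-memory stand-in for the XCom table, keyed by
# (dag_id, task_id, execution_date, key). Not Airflow's real XCom model.
XComKey = Tuple[str, str, str, str]


@dataclass
class ToyXComStore:
    rows: Dict[XComKey, Any] = field(default_factory=dict)

    def push(self, dag_id, task_id, execution_date, key, value):
        self.rows[(dag_id, task_id, execution_date, key)] = value

    def pull(self, dag_id, task_id, execution_date, key):
        # Like xcom_pull, return None when no matching row exists.
        return self.rows.get((dag_id, task_id, execution_date, key))

    def clear(self, dag_id, task_id, execution_date):
        # Delete every XCom row written by this task instance.
        owner = (dag_id, task_id, execution_date)
        for k in [k for k in self.rows if k[:3] == owner]:
            del self.rows[k]


@dataclass
class ToyTaskInstance:
    dag_id: str
    task_id: str
    execution_date: str
    callable_: Callable[["ToyTaskInstance"], Any]
    store: ToyXComStore
    state: str = "none"

    def _should_execute(self) -> bool:
        # Simplified dependency check: skip if already succeeded or running.
        return self.state not in ("success", "running")

    def _execute(self) -> None:
        self.state = "running"
        result = self.callable_(self)
        # The return value is stored under the default key.
        self.store.push(self.dag_id, self.task_id, self.execution_date,
                        "return_value", result)
        self.state = "success"

    def run_buggy(self) -> None:
        # Order described in the report: clear first, unconditionally.
        self.store.clear(self.dag_id, self.task_id, self.execution_date)
        if self._should_execute():
            self._execute()

    def run_fixed(self) -> None:
        # Proposed fix: clear only once we know the task will execute.
        if self._should_execute():
            self.store.clear(self.dag_id, self.task_id, self.execution_date)
            self._execute()


def func1(ti: ToyTaskInstance) -> str:
    ti.store.push(ti.dag_id, ti.task_id, ti.execution_date, "k1", "xcom_custom")
    return "xcom_default"


def second_run_leaves_xcom(fixed: bool) -> bool:
    store = ToyXComStore()
    ti = ToyTaskInstance("xcom_bug", "op1", "2016-12-16T14:57", func1, store)
    run = ti.run_fixed if fixed else ti.run_buggy
    run()  # first run executes and pushes both XComs
    run()  # second run is skipped because state == "success"
    return store.pull("xcom_bug", "op1", "2016-12-16T14:57", "k1") is not None


print(second_run_leaves_xcom(fixed=False))  # prints False
print(second_run_leaves_xcom(fixed=True))   # prints True
```

In this model the whole fix is moving the `clear` call inside the branch that actually executes the task, so a skipped run leaves the earlier run's XComs untouched; how the real scheduler races with a manually triggered run is not captured here.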



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
