[ https://issues.apache.org/jira/browse/AIRFLOW-703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Len Frodgers updated AIRFLOW-703:
---------------------------------
Attachment: xcom_bug.py
xcom bug (python3)
> Xcom data cleared too soon
> --------------------------
>
> Key: AIRFLOW-703
> URL: https://issues.apache.org/jira/browse/AIRFLOW-703
> Project: Apache Airflow
> Issue Type: Bug
> Components: core, scheduler, xcom
> Affects Versions: Airflow 2.0, Airflow 1.7.1.3
> Environment: Tested using Dockerized Airflow setup with MySQL backend
> and Celery executor
> Reporter: Len Frodgers
> Labels: xcom
> Attachments: xcom_bug.py
>
>
> XCom data is cleared at the start of the `run` method of the `TaskInstance`,
> regardless of whether the TI is subsequently executed (e.g. if the TI has
> previously succeeded, it won't execute). This means that if a TI for a DagRun
> is run twice in close succession, the second run will correctly not execute
> (since the first TI succeeded or is still running), but WILL still clear any
> XComs set by the first TI. As a result, any downstream tasks that depend on
> these XComs will fail.
> I noticed this bug when I changed the scheduler's `num_runs` from None to 10.
> It didn't happen every time, but probably about 50% of the time.
> However, I can reproduce it reliably with the following test DAG:
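> ```
> import logging
> import time
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
>
> def func1(ti, **kwargs):
>     # Push one XCom under a custom key; the return value is pushed
>     # automatically under the default key.
>     ti.xcom_push("k1", "xcom_custom")
>     return "xcom_default"
>
> def func2(ti, **kwargs):
>     # Sleep long enough to re-run op1 from the UI in the meantime.
>     time.sleep(10)
>     xcom_custom = ti.xcom_pull("op1", key="k1")
>     xcom_default = ti.xcom_pull("op1")
>     logging.info("Default: %s, Custom: %s", xcom_default, xcom_custom)
>     # Fails if op1's XComs were cleared by the second run of op1.
>     assert None not in (xcom_custom, xcom_default)
>
> bug_dag = DAG(
>     'xcom_bug',
>     max_active_runs=1,
>     schedule_interval=timedelta(minutes=3),
>     start_date=datetime(2016, 12, 16, 14, 57)
> )
>
> op1 = PythonOperator(python_callable=func1, provide_context=True,
>                      task_id="op1", dag=bug_dag)
> op2 = PythonOperator(python_callable=func2, provide_context=True,
>                      task_id="op2", dag=bug_dag)
> op1.set_downstream(op2)
> ```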
> To run op1 twice, I trigger it twice from the UI while op2 is still inside
> its `time.sleep`.
> Logs from running this:
> [will attach]
> The fix seems straightforward: don't clear XComs unless the TI will actually
> execute. I'm happy to create a PR.
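The ordering problem and the proposed fix can be illustrated with a small self-contained toy model. This is a minimal sketch, not Airflow's actual `TaskInstance.run`. The classes `XComStore` and `ToyTaskInstance`, the methods `run_buggy`/`run_fixed`, and the simplified `should_execute` check (skip if the TI is already running or has succeeded) are all hypothetical stand-ins. Only the key `return_value` matches what Airflow uses by default for a callable's return value.

```python
# Toy model of the ordering problem in AIRFLOW-703. All names here
# (XComStore, ToyTaskInstance, run_buggy, run_fixed, simulate) are
# hypothetical and are NOT Airflow's real API.
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple


@dataclass
class XComStore:
    """In-memory stand-in for the xcom table, keyed by (task_id, key)."""
    rows: Dict[Tuple[str, str], Any] = field(default_factory=dict)

    def push(self, task_id: str, key: str, value: Any) -> None:
        self.rows[(task_id, key)] = value

    def pull(self, task_id: str, key: str) -> Optional[Any]:
        return self.rows.get((task_id, key))

    def clear(self, task_id: str) -> None:
        # Delete every XCom row belonging to this task.
        for k in [k for k in self.rows if k[0] == task_id]:
            del self.rows[k]


@dataclass
class ToyTaskInstance:
    task_id: str
    store: XComStore
    state: Optional[str] = None  # None, "running" or "success"

    def execute(self) -> None:
        # Mirrors func1 from the test DAG: one custom and one default XCom.
        self.store.push(self.task_id, "k1", "xcom_custom")
        self.store.push(self.task_id, "return_value", "xcom_default")
        self.state = "success"

    def should_execute(self) -> bool:
        # Simplified (assumed) check: skip if already running or succeeded.
        return self.state not in ("running", "success")

    def run_buggy(self) -> None:
        # Reported behaviour: XComs are cleared before the state check.
        self.store.clear(self.task_id)
        if self.should_execute():
            self.execute()

    def run_fixed(self) -> None:
        # Proposed fix: only clear XComs when the TI will actually execute.
        if self.should_execute():
            self.store.clear(self.task_id)
            self.execute()


def simulate(fixed: bool) -> Tuple[Optional[Any], Optional[Any]]:
    store = XComStore()
    ti = ToyTaskInstance("op1", store)
    run = ti.run_fixed if fixed else ti.run_buggy
    run()  # first run executes and pushes both XComs
    run()  # second run is (correctly) skipped
    # What op2 would see after waking up from time.sleep:
    return store.pull("op1", "k1"), store.pull("op1", "return_value")


print("buggy:", simulate(fixed=False))
print("fixed:", simulate(fixed=True))
```

Running it prints `buggy: (None, None)` and `fixed: ('xcom_custom', 'xcom_default')`. With the buggy ordering, the skipped second run still wipes op1's XComs, so op2's `assert` would fail. Moving the clear inside the "will execute" branch leaves them intact.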
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)