[ 
https://issues.apache.org/jira/browse/AIRFLOW-703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Len Frodgers updated AIRFLOW-703:
---------------------------------
    Description: 
XCom data is cleared at the start of the `run` method of the `TaskInstance`,
regardless of whether the TI is subsequently executed (e.g. if the TI has
previously succeeded, it won't execute). This means that if a TI for a DagRun
is run twice in close succession, the second run will correctly not execute
(since the first one succeeded or is still running), but it WILL clear any
XComs set by the first run. Therefore, any downstream tasks depending on these
XComs will fail.
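To make the ordering problem concrete, here is a minimal toy model in plain Python. It is a sketch under heavy assumptions: `MetadataDB`, `TaskInstanceSketch`, and the simplified `"running"`/`"success"` state check are hypothetical illustrations, not Airflow's actual `TaskInstance.run` code. The only thing it captures is that the XCom clear happens before the check that decides whether the TI executes:

```python
from dataclasses import dataclass, field


@dataclass
class MetadataDB:
    """Hypothetical stand-in for the metadata DB: TI states and XCom rows."""
    states: dict = field(default_factory=dict)
    xcoms: dict = field(default_factory=dict)


@dataclass
class TaskInstanceSketch:
    """Hypothetical, heavily simplified TI; not Airflow's real class."""
    task_id: str
    execution_date: str
    db: MetadataDB

    @property
    def key(self):
        return (self.task_id, self.execution_date)

    def run(self, value):
        # Reported ordering: XComs are cleared unconditionally first...
        self.db.xcoms.pop(self.key, None)
        # ...and only afterwards does the TI check whether it should execute.
        if self.db.states.get(self.key) in ("running", "success"):
            return False
        self.db.states[self.key] = "running"
        self.db.xcoms[self.key] = {"k1": value}  # stands in for xcom_push
        self.db.states[self.key] = "success"
        return True


# Two TI objects for the same (task_id, execution_date), as with a duplicate run.
db = MetadataDB()
first = TaskInstanceSketch("op1", "2016-12-16T14:57", db)
second = TaskInstanceSketch("op1", "2016-12-16T14:57", db)
first.run("xcom_custom")   # executes and pushes k1
second.run("xcom_custom")  # correctly skipped, but has already cleared k1
print(db.xcoms.get(first.key))  # None, so a downstream xcom_pull would see None
```

Both objects share the same `(task_id, execution_date)` key, so the skipped second run deletes the row the first run wrote.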

I noticed this bug when I changed `num_runs` of the scheduler from None to 10.
It didn't happen every time, but in roughly 50% of runs.

However, I can reproduce it reliably with the following test DAG:

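```
import logging
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def func1(ti, **kwargs):
    # Push a custom-key XCom; the return value is stored under the default key
    ti.xcom_push("k1", "xcom_custom")
    return "xcom_default"


def func2(ti, **kwargs):
    # Leave a window in which op1 can be run again
    time.sleep(10)
    xcom_custom = ti.xcom_pull("op1", key="k1")
    xcom_default = ti.xcom_pull("op1")
    logging.info("Default: %s, Custom: %s", xcom_default, xcom_custom)
    # Fails if a second (skipped) run of op1 cleared its XComs
    assert None not in (xcom_custom, xcom_default)


bug_dag = DAG(
    'xcom_bug',
    max_active_runs=1,
    schedule_interval=timedelta(minutes=3),
    start_date=datetime(2016, 12, 16, 14, 57)
)
op1 = PythonOperator(python_callable=func1, provide_context=True,
                     task_id="op1", dag=bug_dag)
op2 = PythonOperator(python_callable=func2, provide_context=True,
                     task_id="op2", dag=bug_dag)
op1.set_downstream(op2)
```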

To make op1 execute twice, I run it twice from the UI while op2 is in its
`time.sleep` call.

Logs from running this:
[will attach]

The fix seems straightforward: don't clear XComs unless the TI will actually
execute. I will happily create a PR.
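In a hypothetical toy model (the names `MetadataDB` and `TaskInstanceSketch` and the simplified state check are illustrative assumptions, not Airflow's real code), the proposed fix amounts to moving the clear after the eligibility check. This is only a sketch of the idea, not the actual patch:

```python
from dataclasses import dataclass, field


@dataclass
class MetadataDB:
    """Hypothetical stand-in for the metadata DB: TI states and XCom rows."""
    states: dict = field(default_factory=dict)
    xcoms: dict = field(default_factory=dict)


@dataclass
class TaskInstanceSketch:
    """Hypothetical, heavily simplified TI; not Airflow's real class."""
    task_id: str
    execution_date: str
    db: MetadataDB

    @property
    def key(self):
        return (self.task_id, self.execution_date)

    def run(self, value):
        # Decide first: a TI that is running or already succeeded is skipped
        # without touching the XComs written by the earlier run.
        if self.db.states.get(self.key) in ("running", "success"):
            return False
        # Only a TI that is really about to execute clears stale XComs.
        self.db.xcoms.pop(self.key, None)
        self.db.states[self.key] = "running"
        self.db.xcoms[self.key] = {"k1": value}  # stands in for xcom_push
        self.db.states[self.key] = "success"
        return True


db = MetadataDB()
first = TaskInstanceSketch("op1", "2016-12-16T14:57", db)
second = TaskInstanceSketch("op1", "2016-12-16T14:57", db)
first.run("xcom_custom")   # executes and pushes k1
second.run("xcom_custom")  # skipped, XComs left intact
print(db.xcoms.get(first.key))  # {'k1': 'xcom_custom'}
```

A genuine re-execution (for example after the TI's state has been cleared) still wipes stale XComs before pushing new ones; only skipped runs leave them alone.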


  was:

Xcom data is cleared at the start of the `run` method of the `TaskInstance`, 
regardless of whether the TI is subsequently executed (e.g. if the TI has 
previously succeeded, it won't execute). This means that if a TI for a DagRun 
is run twice in close succession, the latter will not execute (since the former 
TI succeeded or is still running), but WILL clear any xcoms set by the former 
TI. Therefore, and downstream tasks depending on these xcoms will fail.

I noticed this bug when I changed num_runs of the scheduler from None to 10. It 
didn't happen every time, but probably 50% or so.

However, I can reproduce this reliably and repeatably with the following test 
dag:

```
def func1(ti, **kwargs):
    ti.xcom_push("k1", "xcom_custom")
    return "xcom_default"


def func2(ti, **kwargs):
    time.sleep(10)
    xcom_custom = ti.xcom_pull("op1", key="k1")
    xcom_default = ti.xcom_pull("op1")
    logging.info("Default: %s, Custom: %s", xcom_default, xcom_custom)
    assert None not in (xcom_custom, xcom_default)


bug_dag = DAG(
    'xcom_bug',
    max_active_runs=1,
    schedule_interval=timedelta(minutes=3),
    start_date=datetime(2016, 12, 16, 14, 57)
)
op1 = PythonOperator(python_callable=func1, provide_context=True, 
task_id="op1", dag=bug_dag)
op2 = PythonOperator(python_callable=func2, provide_context=True, 
task_id="op2", dag=bug_dag)
op1.set_downstream(op2)
```

To make op1 execute twice, I use the UI to run it twice while op2 is doing the 
`time.sleep`.

Logs from running this:
[will attach]

The fix seems straightforward: don't clear xcom unless the TI will actually 
execute. Will happily create a PR.



> Xcom data cleared too soon
> --------------------------
>
>                 Key: AIRFLOW-703
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-703
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core, scheduler, xcom
>    Affects Versions: Airflow 2.0, Airflow 1.7.1.3
>         Environment: Tested using Dockerized Airflow setup with MySQL backend 
> and Celery executor
>            Reporter: Len Frodgers
>              Labels: xcom



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
