If you're using 1.7.1.3, there's a bug (fixed in master) involving a race condition that can occasionally result in XCom values being cleared.
The short version is:

1) task gets scheduled
2) worker A grabs task
3) worker B grabs task
4) worker A clears the XCom, completes task, and pushes XCom
5) worker B clears the XCom, sees that another worker has already completed the task, and aborts

On Wed, Aug 31, 2016 at 1:59 PM Vishal Doshi (JIRA) <[email protected]> wrote:

> [ https://issues.apache.org/jira/browse/AIRFLOW-62?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453343#comment-15453343 ]
>
> Vishal Doshi commented on AIRFLOW-62:
> -------------------------------------
>
> I'm getting this error as well. It's intermittent. When I look at the
> logs, it looks like the TaskInstance is being run twice, and when run is
> called the second time, `self.clear_xcom_data()` is called before the
> system realizes the run previously succeeded.
>
> Not sure why it's run twice.
>
> From my log:
>
> {code}
> [2016-08-31 17:59:16,221] {models.py:154} INFO - Filling up the DagBag from /dags/dag_compute.py
> [2016-08-31 17:59:17,451] {main.py:21} INFO - Sparse computation disabled.
> [2016-08-31 17:59:18,528] {main.py:35} INFO - Map reduce machinery disabled.
> [2016-08-31 17:59:20,969] {models.py:154} INFO - Filling up the DagBag from /dags/dag_compute.py
> [2016-08-31 17:59:22,632] {main.py:21} INFO - Sparse computation disabled.
> [2016-08-31 17:59:23,759] {main.py:35} INFO - Map reduce machinery disabled.
> [2016-08-31 17:59:25,617] {models.py:154} INFO - Filling up the DagBag from /dags/dag_compute.py
> [2016-08-31 17:59:26,365] {models.py:1196} INFO -
> --------------------------------------------------------------------------------
> Starting attempt 1 of 1
> --------------------------------------------------------------------------------
> [2016-08-31 17:59:26,395] {models.py:1219} INFO - Executing <Task(PythonOperator): myoperator> on 2016-08-31 17:58:28
> [2016-08-31 17:59:27,781] {main.py:21} INFO - Sparse computation disabled.
> [2016-08-31 17:59:28,443] {python_operator.py:67} INFO - Done. Returned value was: None
> [2016-08-31 17:59:28,912] {main.py:35} INFO - Map reduce machinery disabled.
> [2016-08-31 17:59:33,337] {models.py:154} INFO - Filling up the DagBag from /dags/dag_compute.py
> [2016-08-31 17:59:35,571] {main.py:21} INFO - Sparse computation disabled.
> [2016-08-31 17:59:36,669] {main.py:35} INFO - Map reduce machinery disabled.
> [2016-08-31 17:59:37,915] {models.py:1150} INFO - Task <TaskInstance: compute.myoperator 2016-08-31 17:58:28 [success]> previously succeeded on 2016-08-31 17:59:28
> {code}
>
> airflow==1.7.1.3
>
> > XCom push not working reliably
> > ------------------------------
> >
> > Key: AIRFLOW-62
> > URL: https://issues.apache.org/jira/browse/AIRFLOW-62
> > Project: Apache Airflow
> > Issue Type: Bug
> > Components: celery
> > Affects Versions: Airflow 1.7.0
> > Environment: Postgres backed Airflow running with Celery inside of the puckel Docker setup.
> > Reporter: Alex Papanicolaou
> > Assignee: Jeremiah Lowin
> > Fix For: Airflow 1.7.0
> >
> > I have a DAG that polls for activity in various data streams from a database and then uploads the activity statuses to a table. Each of the polling tasks is a python operator that, once it gets the polling result, returns a dict as an XCom push. The dict contains two entries which are strings, one which is a bool, and one which is a datetime object. There is a final task that pulls all the results and uploads the collective statuses to a table. I chose this pattern since I figured it might be better to do one collective write operation on all the results.
> >
> > Before I moved ahead to the github master branch I was using 1.7.0 from PyPI and this worked fine. Now that I am on the github master branch, I find that the XCom pushing is unreliable. The returned values in the logs show up correctly but when doing the XCom pull, I get None for some of the returned values.
> > Investigating the XCom result in the Webserver also shows nothing there. But if I rerun a task where the XCom failed, the push works and the XCom result is as it should be.
> >
> > Nothing appears to have changed in the codebase so I am at a loss. Perhaps it really wasn't working before? How would the backing postgres handle these simultaneous writes? I can't imagine that would be a problem.
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
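The clear-before-check ordering described at the top of this thread can be sketched in a few lines. This is a minimal stand-in using plain dicts for the XCom table and task state, with a hypothetical `run_task` helper; it is not Airflow's actual API, only an illustration of why worker B's late arrival leaves no XCom behind:

```python
# In-memory stand-ins for the XCom table and TaskInstance state.
xcom = {}
task_state = {}

def run_task(worker, key="my_task"):
    """Hypothetical sketch of the buggy run() ordering in 1.7.1.3."""
    # The race: the XCom is cleared *before* checking whether the
    # task already succeeded (mirrors self.clear_xcom_data()).
    xcom.pop(key, None)
    if task_state.get(key) == "success":
        # Worker B: sees the prior success and aborts -- but it has
        # already wiped worker A's XCom.
        return "aborted"
    xcom[key] = f"result from worker {worker}"  # push XCom
    task_state[key] = "success"
    return "completed"

# Worker A completes the task and pushes its XCom...
assert run_task("A") == "completed"
# ...then worker B clears it and aborts, so a later xcom_pull gets None.
assert run_task("B") == "aborted"
assert "my_task" not in xcom
```

The fix in master is essentially to perform the already-succeeded check before clearing XCom data, so a duplicate run aborts without touching the first worker's pushed value.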
