[ 
https://issues.apache.org/jira/browse/AIRFLOW-62?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274859#comment-15274859
 ] 

Alex Papanicolaou edited comment on AIRFLOW-62 at 5/6/16 10:40 PM:
-------------------------------------------------------------------

Here is a run from the PyPI version of Airflow.

{noformat}
--------------------------------------------------------------------------------
Attempt 1 out of 2
--------------------------------------------------------------------------------

[2016-05-06 22:15:51,093] {models.py:1041} INFO - Executing 
<Task(PythonOperator): upload_activity_status> on 2016-05-06 20:00:00
[2016-05-06 22:15:51,103] {xcom_test_dag_pypi.py:44} INFO - Getting status from 
upstream task ['poll_stream0', 'poll_stream1', 'poll_stream2', 'poll_stream3', 
'poll_stream4', 'poll_stream5', 'poll_stream6', 'poll_stream7', 'poll_stream8', 
'poll_stream9', 'poll_stream10', 'poll_stream11', 'poll_stream12', 
'poll_stream13', 'poll_stream14', 'poll_stream15', 'poll_stream16', 
'poll_stream17', 'poll_stream18', 'poll_stream19', 'poll_stream20', 
'poll_stream21', 'poll_stream22', 'poll_stream23', 'poll_stream24', 
'poll_stream25', 'poll_stream26', 'poll_stream27', 'poll_stream28', 
'poll_stream29']
[2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:46} INFO - Xcom pull results:
({'data_stream': 'stream0',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream1',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream2',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream3',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream4',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream5',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream6',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream7',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream8',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream9',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream10',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream11',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream12',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream13',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream14',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream15',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream16',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream17',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream18',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream19',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream20',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream21',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream22',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream23',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream24',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream25',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream26',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream27',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream28',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream29',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)})
[2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:47} INFO - Upload to DB here
[2016-05-06 22:15:51,163] {python_operator.py:66} INFO - Done. Returned value 
was: None
{noformat}
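
For context, the pull above presumably comes from a callable along these
lines. This is a minimal sketch, not the actual xcom_test_dag_pypi.py: only
the task ids and log messages are taken from the output above; the rest
assumes an Airflow 1.x PythonOperator running with provide_context=True.

{noformat}
# Minimal sketch (assumed, not the reporter's code): the callable behind the
# upload_activity_status task, pulling the poll results shown in the log.
import logging
import pprint

POLL_TASK_IDS = ['poll_stream{}'.format(i) for i in range(30)]


def upload_activity_status(**context):
    ti = context['ti']  # TaskInstance injected via provide_context=True
    logging.info('Getting status from upstream task %s', POLL_TASK_IDS)
    # Pulling with a list of task_ids returns one value per upstream task;
    # the None entries described in this issue would show up in 'results'.
    results = ti.xcom_pull(task_ids=POLL_TASK_IDS)
    logging.info('Xcom pull results:\n%s', pprint.pformat(results))
    logging.info('Upload to DB here')
{noformat}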



> XCom push not working reliably
> ------------------------------
>
>                 Key: AIRFLOW-62
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-62
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: db, operators
>    Affects Versions: Airflow 1.7.0
>         Environment: Postgres-backed Airflow running with Celery inside the 
> puckel Docker setup.
>            Reporter: Alex Papanicolaou
>            Assignee: Jeremiah Lowin
>
> I have a DAG that polls for activity in various data streams from a database 
> and then uploads the activity statuses to a table.  Each of the polling tasks 
> is a PythonOperator that, once it gets the polling result, returns a dict as 
> an XCom push.  The dict contains two string entries, one bool, and one 
> datetime object.  A final task pulls all the results and uploads the 
> collective statuses to a table.  I chose this pattern since I figured it 
> would be better to do one collective write of all the results.
> 
> Before I moved to the GitHub master branch I was using 1.7.0 from PyPI and 
> this worked fine.  Now that I am on the GitHub master branch, the XCom 
> pushing is unreliable.  The returned values show up correctly in the logs, 
> but when doing the XCom pull I get None for some of them.  Inspecting the 
> XCom in the webserver also shows nothing there.  Yet if I rerun a task whose 
> XCom push failed, the push works and the XCom result is as it should be.
> 
> Nothing appears to have changed in the codebase, so I am at a loss.  Perhaps 
> it really wasn't working before?  How would the backing Postgres handle these 
> simultaneous writes?  I can't imagine that would be a problem.
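
For illustration, the push/pull pattern described in the report could be wired
up roughly as in the sketch below. This is a hedged sketch against the Airflow
1.x API, not the reporter's DAG: the task ids and dict keys mirror the logs,
while the DAG id, schedule, and polling logic are placeholder assumptions.

{noformat}
# Hypothetical sketch of the reported pattern (not the reporter's DAG):
# 30 poll tasks each return a dict, which the PythonOperator pushes as an
# XCom automatically, and one downstream task pulls all of them at once.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def poll_stream(data_stream, **context):
    # Placeholder for the real database poll; returning the dict is what
    # triggers the implicit XCom push.
    return {'data_stream': data_stream,
            'status': True,
            'timeperiod': context['execution_date']}


def upload_activity_status(poll_task_ids, **context):
    # One value comes back per upstream task id; the reported bug is that
    # some of these are None even though the push was logged upstream.
    results = context['ti'].xcom_pull(task_ids=poll_task_ids)
    print(results)  # stand-in for the collective write to the status table


dag = DAG('xcom_activity_status',
          start_date=datetime(2016, 5, 6),
          schedule_interval=timedelta(hours=1))

poll_tasks = [
    PythonOperator(task_id='poll_stream{}'.format(i),
                   python_callable=poll_stream,
                   op_kwargs={'data_stream': 'stream{}'.format(i)},
                   provide_context=True,
                   dag=dag)
    for i in range(30)
]

upload = PythonOperator(task_id='upload_activity_status',
                        python_callable=upload_activity_status,
                        op_kwargs={'poll_task_ids': [t.task_id for t in poll_tasks]},
                        provide_context=True,
                        dag=dag)
upload.set_upstream(poll_tasks)
{noformat}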



