[ https://issues.apache.org/jira/browse/AIRFLOW-62?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274859#comment-15274859 ]
Alex Papanicolaou edited comment on AIRFLOW-62 at 5/6/16 10:40 PM: ------------------------------------------------------------------- Here is a run from the Airflow version PyPI. {noformat} -------------------------------------------------------------------------------- Attempt 1 out of 2 -------------------------------------------------------------------------------- [2016-05-06 22:15:51,093] {models.py:1041} INFO - Executing <Task(PythonOperator): upload_activity_status> on 2016-05-06 20:00:00 [2016-05-06 22:15:51,103] {xcom_test_dag_pypi.py:44} INFO - Getting status from upstream task ['poll_stream0', 'poll_stream1', 'poll_stream2', 'poll_stream3', 'poll_stream4', 'poll_stream5', 'poll_stream6', 'poll_stream7', 'poll_stream8', 'poll_stream9', 'poll_stream10', 'poll_stream11', 'poll_stream12', 'poll_stream13', 'poll_stream14', 'poll_stream15', 'poll_stream16', 'poll_stream17', 'poll_stream18', 'poll_stream19', 'poll_stream20', 'poll_stream21', 'poll_stream22', 'poll_stream23', 'poll_stream24', 'poll_stream25', 'poll_stream26', 'poll_stream27', 'poll_stream28', 'poll_stream29'] [2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:46} INFO - Xcom pull results: ({'data_stream': 'stream0', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream1', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream2', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream3', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream4', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream5', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream6', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream7', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream8', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream9', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream10', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream11', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream12', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream13', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream14', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream15', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream16', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream17', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream18', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream19', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream20', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream21', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream22', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream23', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream24', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream25', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream26', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream27', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream28', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream29', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}) [2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:47} INFO - Upload to DB here [2016-05-06 22:15:51,163] {python_operator.py:66} INFO - Done. Returned value was: None {noformat} was (Author: alex.papanic): {noformat} -------------------------------------------------------------------------------- Attempt 1 out of 2 -------------------------------------------------------------------------------- [2016-05-06 22:15:51,093] {models.py:1041} INFO - Executing <Task(PythonOperator): upload_activity_status> on 2016-05-06 20:00:00 [2016-05-06 22:15:51,103] {xcom_test_dag_pypi.py:44} INFO - Getting status from upstream task ['poll_stream0', 'poll_stream1', 'poll_stream2', 'poll_stream3', 'poll_stream4', 'poll_stream5', 'poll_stream6', 'poll_stream7', 'poll_stream8', 'poll_stream9', 'poll_stream10', 'poll_stream11', 'poll_stream12', 'poll_stream13', 'poll_stream14', 'poll_stream15', 'poll_stream16', 'poll_stream17', 'poll_stream18', 'poll_stream19', 'poll_stream20', 'poll_stream21', 'poll_stream22', 'poll_stream23', 'poll_stream24', 'poll_stream25', 'poll_stream26', 'poll_stream27', 'poll_stream28', 'poll_stream29'] [2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:46} INFO - Xcom pull results: ({'data_stream': 'stream0', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream1', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream2', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream3', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream4', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream5', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream6', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream7', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream8', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream9', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream10', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream11', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream12', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream13', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream14', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream15', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream16', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream17', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream18', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream19', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream20', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream21', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream22', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream23', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream24', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream25', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream26', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream27', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream28', 'status': False, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}, {'data_stream': 'stream29', 'status': True, 'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)}) [2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:47} INFO - Upload to DB here [2016-05-06 22:15:51,163] {python_operator.py:66} INFO - Done. Returned value was: None {noformat} > XCom push not working reliably > ------------------------------ > > Key: AIRFLOW-62 > URL: https://issues.apache.org/jira/browse/AIRFLOW-62 > Project: Apache Airflow > Issue Type: Bug > Components: db, operators > Affects Versions: Airflow 1.7.0 > Environment: Postgres backed Airflow running with Celery inside of > the puckel Docker setup. > Reporter: Alex Papanicolaou > Assignee: Jeremiah Lowin > > I have a DAG that polls for activity in various data streams from a database > and then uploads the activity statuses to a table. Each of the polling tasks > are python operators that once they get the polling result, return a dict as > an XCom push. The dict contains two entries which are strings, one which is > a bool, and one which is a datetime object. There is a final task that pulls > all the results and uploads the collective statuses to a table. I chose this > pattern since I figured it might be better to do one collective write > operation on all the results. > Before I moved ahead to the github master branch I was using 1.7.0 from PyPI > and this worked fine. Now that I am on the github master branch, I find that > the XCom pushing is unreliable. The returned values in the logs show up > correctly but when doing the XCom pull, I get None for some of the returned > values. Investigating the XCom result in the Webserver also shows nothing > there. But if I rerun a task where the XCom failed, the push works and the > XCom result is as it should be. > Nothing appears to have changed in the codebase so I am at a loss. Perhaps > it really wasn't working before? How would the backing postgres handle these > simultaneous writes? I can't imagine that would be a problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332)