We have several remote workers running on a server independent from our scheduler. We are using the Celery executor with Redis as the broker; Redis runs alongside our scheduler.
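For concreteness, the relevant pieces of the configuration look roughly like this (hosts and credentials anonymized; this is a sketch of the shape of our setup, not a verbatim copy of our airflow.cfg):

```ini
[core]
executor = CeleryExecutor

[celery]
# Redis broker lives on the scheduler host; remote workers connect to it over the network
broker_url = redis://scheduler-host:6379/0
# Celery results go back to the metadata database
celery_result_backend = db+mysql://airflow:***@scheduler-host/airflow
```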
Periodically, we see messages in the worker logs about losing connectivity to Redis:

```
  File "/usr/local/lib/python2.7/site-packages/redis/connection.py", line 168, in readline
    self._read_from_socket()
  File "/usr/local/lib/python2.7/site-packages/redis/connection.py", line 143, in _read_from_socket
    (e.args,))
ConnectionError: Error while reading from socket: ('Connection closed by server.',)
```

This causes Celery to re-trigger any unacknowledged jobs:

```
[2017-07-11 12:42:42,420: WARNING/MainProcess] Restoring 11 unacknowledged message(s)
```

For standard tasks (i.e. tasks not inside a subdag), this appears to be a non-issue: the DAG can continue, with the second triggered task instance simply failing. For tasks inside a subdag, on the other hand, this appears to trigger an update to the task instance's entry in the database, which causes the subdag operator to see the task instance as failed (even though the original task is still running). Any tasks downstream of this task are then marked as upstream_failed. For many of our DAGs this is not an issue, but for some that rely on depends_on_past it obviously creates problems (a trimmed-down sketch of the DAG pattern involved is at the end of this message).

Here is an example of the logs from one of the task instances when the second job starts:

```
[2017-07-11 12:42:44,733] {models.py:168} INFO - Filling up the DagBag from /usr/lib/airflow/shared/dags/workflow/my_dags.py
[2017-07-11 12:42:45,671] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run convert_tables_15m_v1.convert_collapse_opportunity_line_item collapse_opportunity_line_item 2017-07-11T11:47:00 --job_id 132921 --raw -sd DAGS_FOLDER/convert_tables_dags.py']
[2017-07-11 12:42:46,232] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:46,232] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-07-11 12:42:47,436] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:47,435] {models.py:168} INFO - Filling up the DagBag from /usr/lib/airflow/shared/dags/workflow/convert_tables_dags.py
[2017-07-11 12:42:50,498] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:50,498] {models.py:1122} INFO - Dependencies not met for <TaskInstance: convert_tables_15m_v1.convert_collapse_opportunity_line_item.collapse_opportunity_line_item 2017-07-11 11:47:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2017-07-11 12:31:43.569564.
[2017-07-11 12:42:50,499] {base_task_runner.py:95} INFO - Subtask: [2017-07-11 12:42:50,499] {models.py:1122} INFO - Dependencies not met for <TaskInstance: convert_tables_15m_v1.convert_collapse_opportunity_line_item.collapse_opportunity_line_item 2017-07-11 11:47:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2017-07-11 12:42:50,754] {jobs.py:2172} WARNING - Recorded pid 4769 is not a descendant of the current pid 6552
[2017-07-11 12:42:56,255] {jobs.py:2179} WARNING - State of this instance has been externally set to failed. Taking the poison pill. So long.
```

Is there something we can do to fix, or at least work around, this issue when jobs are re-triggered from Airflow? I am not sure it is actually a connectivity issue with our Redis instance, since not all of our worker processes log `Connection closed by server.` simultaneously. Regardless, it seems like a situation Airflow should be able to handle gracefully.

We are on Airflow 1.8.1.

Thanks!
Alex
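P.S. In case it helps, here is a heavily trimmed sketch of the DAG pattern involved. The dag/task ids are taken from the logs above; everything else (owner, start date, bash commands, the downstream task name) is a placeholder, and the real DAGs are generated dynamically rather than written out like this:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

PARENT_DAG = 'convert_tables_15m_v1'
SUBDAG_TASK = 'convert_collapse_opportunity_line_item'

default_args = {
    'owner': 'workflow',
    'depends_on_past': True,        # this is what makes the spurious failure painful
    'start_date': datetime(2017, 7, 1),
}


def make_subdag(parent_dag_name, child_task_name, args):
    # The subdag's dag_id must be <parent dag id>.<subdag task id>
    subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_task_name),
        default_args=args,
        schedule_interval='*/15 * * * *',
    )
    BashOperator(
        task_id='collapse_opportunity_line_item',
        bash_command='sleep 600',   # placeholder for a long-running conversion job
        dag=subdag,
    )
    return subdag


dag = DAG(
    dag_id=PARENT_DAG,
    default_args=default_args,
    schedule_interval='*/15 * * * *',
)

convert = SubDagOperator(
    task_id=SUBDAG_TASK,
    subdag=make_subdag(PARENT_DAG, SUBDAG_TASK, default_args),
    dag=dag,
)

# This is the task that ends up in upstream_failed once the subdag's
# task instance gets flipped to failed by the duplicate run.
downstream = BashOperator(
    task_id='load_converted_tables',
    bash_command='sleep 60',
    dag=dag,
)
downstream.set_upstream(convert)
```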