[ https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16161495#comment-16161495 ]
Taylor Edmiston commented on AIRFLOW-1131:
------------------------------------------
[~ashb] - We're using a custom Mesos executor; however, I think the issue is
independent of the executor, since it happens well after the task has been
successfully assigned to a node and the Docker container created by
DockerOperator is already running. Our tasks (big loads to Redshift) don't run
anywhere near 1 hour; they typically take 5–10 minutes (including a force pull
of the Docker image). We're running on Docker API version 1.2.4.
What we saw was that when a task went more than 1 minute without any traffic
to or from its Docker container, the container spun up by the DockerOperator
died. This happened whenever the process in that container ran a step that
took more than 1 minute (a big Redshift load) without logging anything until it
was done.
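A minimal, dependency-free sketch of the client-side half of this failure mode, assuming the Docker client's timeout behaves like a plain socket read timeout (docker-py passes its timeout down to the HTTP requests it makes). A local server stands in for a container that stays silent for a while, and the client gives up if no byte arrives within its timeout. The helper names `_silent_server` and `read_with_timeout` are hypothetical; they are not part of docker-py or Airflow.

```python
import socket
import threading
import time


def _silent_server(listener, silence):
    """Accept one connection, stay silent for `silence` seconds, then reply."""
    conn, _ = listener.accept()
    with conn:
        # Stand-in for a container busy on a long load that logs nothing.
        time.sleep(silence)
        try:
            conn.sendall(b"done\n")
        except OSError:
            pass  # the client may already have given up and closed its socket
    listener.close()


def read_with_timeout(silence, timeout):
    """Return the server's reply, or 'timed out' if nothing arrives within `timeout` seconds."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    port = listener.getsockname()[1]
    server = threading.Thread(target=_silent_server, args=(listener, silence))
    server.start()
    try:
        # The timeout set here also applies to every later recv() on this socket.
        with socket.create_connection(("127.0.0.1", port), timeout=timeout) as client:
            try:
                return client.recv(64).decode()
            except socket.timeout:
                return "timed out"
    finally:
        server.join()


print(read_with_timeout(silence=0.5, timeout=0.1))  # silence outlasts the timeout
print(read_with_timeout(silence=0.1, timeout=2.0))  # reply arrives in time
```

With a 60-second client timeout, any step that stays silent for more than a minute would hit the first case, which would be consistent with the 1-minute threshold described above.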
In our case, the fix was to increase the timeout arg on docker's APIClient
(http://docker-py.readthedocs.io/en/stable/api.html#docker.api.client.APIClient)
inside DockerOperator beyond its default of 60 seconds. IMO this is an
important parameter that should be configurable via the DockerOperator, which
the current implementation doesn't allow (I'm happy to submit a PR for this).
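One way such a setting could be exposed, sketched under assumptions: the helper below only builds keyword arguments for docker-py's `docker.APIClient(base_url=..., version=..., timeout=...)`. It takes the timeout from an explicit argument, then from a hypothetical `DOCKER_CLIENT_TIMEOUT` environment variable (mirroring how the DAG in the quoted issue reads `DOCKER_API_VERSION`), and otherwise falls back to docker-py's 60-second default. `docker_client_kwargs` is a hypothetical name; this is not how DockerOperator is implemented.

```python
import os

# docker-py's APIClient uses a 60-second timeout unless told otherwise.
DOCKER_PY_DEFAULT_TIMEOUT = 60


def docker_client_kwargs(base_url="unix://var/run/docker.sock",
                         api_version="1.23", timeout=None, environ=None):
    """Build keyword arguments for docker.APIClient(...).

    Timeout precedence: explicit argument, then the hypothetical
    DOCKER_CLIENT_TIMEOUT environment variable, then docker-py's default.
    """
    environ = os.environ if environ is None else environ
    if timeout is None:
        timeout = int(environ.get("DOCKER_CLIENT_TIMEOUT", DOCKER_PY_DEFAULT_TIMEOUT))
    if timeout <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    return {"base_url": base_url, "version": api_version, "timeout": timeout}


# Usage inside an operator (requires docker-py; not executed here):
#   import docker
#   cli = docker.APIClient(**docker_client_kwargs(timeout=15 * 60))
```

Keeping the default at 60 seconds preserves current behaviour for anyone who doesn't set the option.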
We also enabled tty mode on the call to create_container(...) so that logs
stream back more granularly, but I don't think that was strictly required for
the fix.
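The tty change and the 1-minute threshold both point at keeping bytes flowing during long, silent steps. A minimal sketch of that idea, assuming the long step is a Python callable running inside the container: run it in a worker thread and write a heartbeat line every `interval` seconds until it finishes. `run_with_heartbeat` is a hypothetical helper, not part of Airflow or docker-py, and it works around the symptom rather than replacing a larger client timeout.

```python
import sys
import threading


def run_with_heartbeat(work, interval, out=None):
    """Run work() in a worker thread, writing a heartbeat every `interval` seconds.

    Returns (work's return value, number of heartbeats written).
    Exceptions raised by work() are re-raised in the calling thread.
    """
    out = sys.stdout if out is None else out
    outcome = {}

    def target():
        try:
            outcome["value"] = work()
        except BaseException as exc:  # hand the error back to the caller
            outcome["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    beats = 0
    while True:
        # join() returns after `interval` seconds, or as soon as work() ends.
        worker.join(interval)
        if not worker.is_alive():
            break
        beats += 1
        out.write("still working, heartbeat %d\n" % beats)
        out.flush()  # flush so the line leaves the container promptly
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"], beats
```

For a Redshift load, the callable would wrap the load call with an `interval` well under the client timeout, for example 30 seconds against a 60-second timeout.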
Does this sound like the same issue you're experiencing, or something
different?
> DockerOperator jobs time out and get stuck in "running" forever
> ---------------------------------------------------------------
>
> Key: AIRFLOW-1131
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1131
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.9.0
> Environment: Python 2.7.12
> git+git://github.com/apache/incubator-airflow.git@35e43f5067f4741640278b765c0e54e4fd45ffa3#egg=airflow[async,password,celery,crypto,postgres,hive,hdfs,jdbc]
> Reporter: Vitor Baptista
>
> With the following DAG and task:
> {code}
> import os
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.docker_operator import DockerOperator
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2017, 1, 1),
>     'retries': 3,
>     'retry_delay': timedelta(minutes=10),
> }
>
> dag = DAG(
>     dag_id='smoke_test',
>     default_args=default_args,
>     max_active_runs=1,
>     schedule_interval='@daily'
> )
>
> # Sleeps for 24 hours without printing anything, so the container's log stream stays silent.
> sleep_forever_task = DockerOperator(
>     task_id='sleep_forever',
>     dag=dag,
>     image='alpine:latest',
>     api_version=os.environ.get('DOCKER_API_VERSION', '1.23'),
>     command='sleep {}'.format(60 * 60 * 24),
> )
> {code}
> When I run it, this is what I get:
> {code}
> *** Log file isn't local.
> *** Fetching here:
> http://589ea17432ec:8793/log/smoke_test/sleep_forever/2017-04-18T00:00:00
> [2017-04-20 11:19:58,258] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:58,438] {base_task_runner.py:112} INFO - Running: ['bash',
> '-c', u'airflow run smoke_test sleep_forever 2017-04-18T00:00:00 --job_id
> 2537 --raw -sd DAGS_FOLDER/smoke_test.py']
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask:
> /usr/local/airflow/src/airflow/airflow/configuration.py:128:
> DeprecationWarning: This method will be removed in future versions. Use
> 'parser.read_file()' instead.
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask:
> self.readfp(StringIO.StringIO(string))
> [2017-04-20 11:19:59,214] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,214] {__init__.py:56} INFO - Using executor
> CeleryExecutor
> [2017-04-20 11:19:59,227] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,227] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-20 11:19:59,244] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,244] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-20 11:19:59,377] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,377] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:59,597] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,597] {models.py:1146} INFO - Dependencies all met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [queued]>
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,605] {models.py:1146} INFO - Dependencies all met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [queued]>
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,605] {models.py:1338} INFO -
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> --------------------------------------------------------------------------------
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask: Starting
> attempt 1 of 4
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> --------------------------------------------------------------------------------
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,620] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,620] {models.py:1362} INFO - Executing
> <Task(DockerOperator): sleep_forever> on 2017-04-18 00:00:00
> [2017-04-20 11:19:59,662] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,661] {docker_operator.py:132} INFO - Starting docker
> container from image alpine:latest
> [2017-04-20 12:21:25,661] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 12:21:25,809] {base_task_runner.py:112} INFO - Running: ['bash',
> '-c', u'airflow run smoke_test sleep_forever 2017-04-18T00:00:00 --job_id
> 2574 --raw -sd DAGS_FOLDER/smoke_test.py']
> [2017-04-20 12:21:26,117] {base_task_runner.py:95} INFO - Subtask:
> /usr/local/airflow/src/airflow/airflow/configuration.py:128:
> DeprecationWarning: This method will be removed in future versions. Use
> 'parser.read_file()' instead.
> [2017-04-20 12:21:26,117] {base_task_runner.py:95} INFO - Subtask:
> self.readfp(StringIO.StringIO(string))
> [2017-04-20 12:21:26,301] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,301] {__init__.py:56} INFO - Using executor
> CeleryExecutor
> [2017-04-20 12:21:26,310] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,310] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-20 12:21:26,324] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,324] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-20 12:21:26,426] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,426] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 12:21:26,564] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,563] {models.py:1140} INFO - Dependencies not met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [running]>,
> dependency 'Task Instance State' FAILED: Task is in the 'running' state which
> is not a valid state for execution. The task must be cleared in order to be
> run.
> [2017-04-20 12:21:26,564] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,564] {models.py:1140} INFO - Dependencies not met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [running]>,
> dependency 'Task Instance Not Already Running' FAILED: Task is already
> running, it started on 2017-04-20 11:19:59.597425.
> [2017-04-20 12:21:30,821] {jobs.py:2163} WARNING - Recorded pid 113 is not a
> descendant of the current pid 178
> {code}
> Note that it runs the task normally, and after about 1 hour it tries to
> reload the task and run it again, but this fails because the {{subprocess}}
> started earlier isn't a child of the current process. After this, the task
> stays in the {{running}} state and never changes to {{failed}}.
> I haven't tested it, but I suspect this doesn't happen if the task keeps
> printing something.
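The last line of the quoted log, "Recorded pid 113 is not a descendant of the current pid 178", reports a process-tree check: the pid recorded for the task instance (presumably from the first attempt) does not sit under the process handling the second attempt. A minimal sketch of such a check over a snapshot of parent pids, for illustration only. `is_descendant` and the example tree are hypothetical, and this is not Airflow's implementation. On a live system, `psutil.Process(pid).children(recursive=True)` gives a similar view.

```python
def is_descendant(pid, ancestor, parent_of):
    """Return True if `pid` sits strictly below `ancestor` in a process tree.

    `parent_of` maps each pid to its parent pid, e.g. a snapshot of `ps -o pid,ppid`.
    """
    seen = set()
    current = parent_of.get(pid)
    while current is not None and current not in seen:  # `seen` guards against cycles
        if current == ancestor:
            return True
        seen.add(current)
        current = parent_of.get(current)
    return False


# Hypothetical snapshot: 113 is left over from the first attempt, 178 is the
# `airflow run` process for the second attempt, and 179 is a child of 178.
snapshot = {113: 1, 178: 1, 179: 178}
print(is_descendant(179, 178, snapshot))  # True
print(is_descendant(113, 178, snapshot))  # False: the situation in the warning
```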
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)