[ https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16161526#comment-16161526 ]
Ash Berlin-Taylor commented on AIRFLOW-1131:
--------------------------------------------
[~tedmiston] I think that is a different issue from the one I commented on. It
does sound like an important feature/bug fix though, so I would open a PR for it
anyway.
I think there are two different issues at work for different people in this
thread. One appears to be related to long-running tasks; the Celery config
would help with that, and I _think_ it would solve the problem the OP had.
Then there's your case, which may or may not also be the problem the OP had.
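
For anyone landing here for the long-running-task case, here is a rough sketch
of the kind of Celery setting I mean. It assumes a Redis (or SQS) broker, where
Celery redelivers a message that has not been acknowledged within the
visibility timeout (one hour by default on Redis). The thread does not say
which broker the OP uses, and the names `longest_task` and `safety_margin` are
made up for the example. How the dict reaches Celery depends on your Airflow
version and is not shown.

```python
from datetime import timedelta

# Hypothetical values: the longest task you expect to run, plus some headroom.
# The DAG in this issue sleeps for 24 hours.
longest_task = timedelta(hours=24)
safety_margin = timedelta(hours=1)

# `broker_transport_options` / `visibility_timeout` are Celery settings for
# the Redis and SQS transports; the value is in seconds. Keeping it above the
# longest task runtime should stop the broker from handing the same task to a
# second worker while the first one is still running.
broker_transport_options = {
    'visibility_timeout': int((longest_task + safety_margin).total_seconds()),
}

print(broker_transport_options)
```

This only addresses the redelivery case; it would not help if the task gets
stuck for some other reason.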
> DockerOperator jobs time out and get stuck in "running" forever
> ---------------------------------------------------------------
>
> Key: AIRFLOW-1131
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1131
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.9.0
> Environment: Python 2.7.12
> git+git://github.com/apache/incubator-airflow.git@35e43f5067f4741640278b765c0e54e4fd45ffa3#egg=airflow[async,password,celery,crypto,postgres,hive,hdfs,jdbc]
> Reporter: Vitor Baptista
>
> With the following DAG and task:
> {code}
> import os
> from datetime import datetime, timedelta
>
> from airflow.models import DAG
> from airflow.operators.docker_operator import DockerOperator
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2017, 1, 1),
>     'retries': 3,
>     'retry_delay': timedelta(minutes=10),
> }
>
> dag = DAG(
>     dag_id='smoke_test',
>     default_args=default_args,
>     max_active_runs=1,
>     schedule_interval='@daily'
> )
>
> sleep_forever_task = DockerOperator(
>     task_id='sleep_forever',
>     dag=dag,
>     image='alpine:latest',
>     api_version=os.environ.get('DOCKER_API_VERSION', '1.23'),
>     command='sleep {}'.format(60 * 60 * 24),
> )
> {code}
> When I run it, this is what I get:
> {code}
> *** Log file isn't local.
> *** Fetching here:
> http://589ea17432ec:8793/log/smoke_test/sleep_forever/2017-04-18T00:00:00
> [2017-04-20 11:19:58,258] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:58,438] {base_task_runner.py:112} INFO - Running: ['bash',
> '-c', u'airflow run smoke_test sleep_forever 2017-04-18T00:00:00 --job_id
> 2537 --raw -sd DAGS_FOLDER/smoke_test.py']
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask:
> /usr/local/airflow/src/airflow/airflow/configuration.py:128:
> DeprecationWarning: This method will be removed in future versions. Use
> 'parser.read_file()' instead.
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask:
> self.readfp(StringIO.StringIO(string))
> [2017-04-20 11:19:59,214] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,214] {__init__.py:56} INFO - Using executor
> CeleryExecutor
> [2017-04-20 11:19:59,227] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,227] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-20 11:19:59,244] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,244] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-20 11:19:59,377] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,377] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:59,597] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,597] {models.py:1146} INFO - Dependencies all met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [queued]>
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,605] {models.py:1146} INFO - Dependencies all met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [queued]>
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,605] {models.py:1338} INFO -
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> --------------------------------------------------------------------------------
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask: Starting
> attempt 1 of 4
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> --------------------------------------------------------------------------------
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,620] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,620] {models.py:1362} INFO - Executing
> <Task(DockerOperator): sleep_forever> on 2017-04-18 00:00:00
> [2017-04-20 11:19:59,662] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,661] {docker_operator.py:132} INFO - Starting docker
> container from image alpine:latest
> [2017-04-20 12:21:25,661] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 12:21:25,809] {base_task_runner.py:112} INFO - Running: ['bash',
> '-c', u'airflow run smoke_test sleep_forever 2017-04-18T00:00:00 --job_id
> 2574 --raw -sd DAGS_FOLDER/smoke_test.py']
> [2017-04-20 12:21:26,117] {base_task_runner.py:95} INFO - Subtask:
> /usr/local/airflow/src/airflow/airflow/configuration.py:128:
> DeprecationWarning: This method will be removed in future versions. Use
> 'parser.read_file()' instead.
> [2017-04-20 12:21:26,117] {base_task_runner.py:95} INFO - Subtask:
> self.readfp(StringIO.StringIO(string))
> [2017-04-20 12:21:26,301] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,301] {__init__.py:56} INFO - Using executor
> CeleryExecutor
> [2017-04-20 12:21:26,310] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,310] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-20 12:21:26,324] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,324] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-20 12:21:26,426] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,426] {models.py:172} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 12:21:26,564] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,563] {models.py:1140} INFO - Dependencies not met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [running]>,
> dependency 'Task Instance State' FAILED: Task is in the 'running' state which
> is not a valid state for execution. The task must be cleared in order to be
> run.
> [2017-04-20 12:21:26,564] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 12:21:26,564] {models.py:1140} INFO - Dependencies not met for
> <TaskInstance: smoke_test.sleep_forever 2017-04-18 00:00:00 [running]>,
> dependency 'Task Instance Not Already Running' FAILED: Task is already
> running, it started on 2017-04-20 11:19:59.597425.
> [2017-04-20 12:21:30,821] {jobs.py:2163} WARNING - Recorded pid 113 is not a
> descendant of the current pid 178
> {code}
> Note that it runs the task normally, and after about 1 hour it tries to
> re-load the task, running it again, but then fails because the {{subprocess}}
> started isn't a child of the current process. After this, the task is still
> in {{running}} state, never changing to {{failed}}.
> I haven't tested, but I suspect this doesn't happen if the task keeps
> printing something.
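
As a quick sanity check on the "about 1 hour" observation in the description,
the gap between the two attempts can be computed from the first log line of
each attempt. This is only arithmetic on the timestamps quoted above; reading
the gap as a one-hour redelivery timeout is an assumption, not something the
log itself confirms.

```python
from datetime import datetime

# Airflow log timestamps look like "2017-04-20 11:19:58,258"
# (comma before the milliseconds).
LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S,%f'

# First log line of each attempt, copied from the log in the description.
first_attempt = datetime.strptime('2017-04-20 11:19:58,258', LOG_TIME_FORMAT)
second_attempt = datetime.strptime('2017-04-20 12:21:25,661', LOG_TIME_FORMAT)

gap = second_attempt - first_attempt
print(gap)                  # time between the two attempts
print(gap.total_seconds())  # the same gap in seconds
```

The gap comes out a little over 3600 seconds, which fits something re-sending
the task after an hour, but on its own it does not prove the cause.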