John Doe created AIRFLOW-1258:
---------------------------------
Summary: TaskInstances within SubDagOperator are marked as Failed after an hour
Key: AIRFLOW-1258
URL: https://issues.apache.org/jira/browse/AIRFLOW-1258
Project: Apache Airflow
Issue Type: Bug
Affects Versions: 1.8.1
Reporter: John Doe
We have multiple SubDagOperators which we use to isolate individual units in our broader DAGs (we typically have tens of SubDagOperators in a given DAG). Any TaskInstance inside a SubDag that runs for more than an hour causes the DAG to fail right after the one-hour mark.
This is completely unrelated to our codebase and can be recreated with a sleep
BashOperator:
{code}
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 5, 30)}


def define_sub(dag, step_name, sleeptime):
    # A single BashOperator that just sleeps past the one-hour mark.
    op = BashOperator(
        task_id=step_name,
        bash_command='sleep %i' % sleeptime,
        queue="model",
        dag=dag
    )
    return dag


def gen_sub_dag(parent_name, step_name, sleeptime):
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name),
              default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub


long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS,
                         schedule_interval=None)

# 7500 seconds is roughly 2 hours 5 minutes, comfortably over the one-hour mark.
long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500),
    task_id='long_runner_sub', dag=long_runner_parent
)
{code}
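(With schedule_interval=None, the parent DAG only runs when triggered manually, e.g. with the trigger_dag CLI subcommand or from the web UI.)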
Under Airflow 1.7.1.3, we would see the following error in the SubDagOperator:
{code}
[2017-05-25 17:08:56,082] {jobs.py:965} ERROR - The airflow run command failed at reporting an error. This should not occur in normal circumstances. Task state is 'running', reported state is 'success'. TI is <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-24 16:00:00 [running]>
{code}
We could then manually mark the stuck TaskInstance as 'success' in the airflow metadata database.
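For reference, a rough sketch of that manual fix against the ORM models (the dag_id/task_id/execution_date are taken from the error above; a direct UPDATE on the task_instance table works just as well):
{code}
# Rough sketch: flip the stuck TaskInstance to 'success' in the metadata DB.
# Identifiers are the ones from the 1.7.1.3 error above; adjust as needed.
from datetime import datetime

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

session = settings.Session()
ti = (session.query(TaskInstance)
      .filter(TaskInstance.dag_id == 'long_runner.long_runner_sub',
              TaskInstance.task_id == 'long_runner_sub',
              TaskInstance.execution_date == datetime(2017, 5, 24, 16, 0))
      .one())
ti.state = State.SUCCESS
session.commit()
session.close()
{code}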
However, starting in 1.8.1 (we skipped 1.8.0 because AIRFLOW-1004 was a hard blocker), the SubDag now fails outright.
Nothing in the logs indicates any reason for the failure; we've reduced the logging level to DEBUG and still see nothing.
airflow-scheduler.log:
{code}
2017-05-31 19:44:10,260 INFO - Heartbeating the process manager
2017-05-31 19:44:10,263 INFO - Started a process (PID: 6462) to generate tasks for /efs/airflow/dags/long_running.py - logging into /opt/airflow/logs/scheduler/2017-05-31/long_running.py.log
2017-05-31 19:44:10,268 INFO - Heartbeating the executor
2017-05-31 19:44:10,271 INFO - Executor reports long_runner.long_runner_sub execution_date=2017-05-31 18:42:55.400517 as failed
2017-05-31 19:44:10,324 INFO - Heartbeating the scheduler
{code}
In the SubDagOperator log, we see a second attempt at the task queued immediately before the failure, even though the original task is still running unabated:
{code}
[2017-05-31 19:44:04,441] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run long_runner long_runner_sub 2017-05-31T18:42:55.400517 --job_id 108 --raw -sd DAGS_FOLDER/long_running.py']
[2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,815] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,816] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Task Instance Not Already Running' PASSED: False, Task is already running, it started on 2017-05-31 18:43:03.494829.
[2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2017-05-31 18:43:03.494829.
[2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2017-05-31 19:44:05,818] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,466] {jobs.py:1722} DEBUG - Executor state: failed task <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]>
[2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,467] {jobs.py:1729} ERROR - Executor reports task instance <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]> finished (failed) although the task says its running. Was the task killed externally?
{code}
It's unclear whether the second attempt is a response to the first being incorrectly marked as a failure, or whether the queuing of the second attempt is what causes the failure state when it takes the poison pill.
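One thing that may help narrow this down is matching the job ids recorded in the metadata DB against the --job_id values printed in the task logs (e.g. 108 above). A rough diagnostic sketch, assuming direct ORM access to the metadata DB; it is not part of the repro:
{code}
# Diagnostic only: list recent LocalTaskJob rows so their ids can be matched
# against the --job_id values in the task logs and their states/heartbeats
# compared while the long-running task is still going.
from airflow import settings
from airflow.jobs import LocalTaskJob

session = settings.Session()
recent = (session.query(LocalTaskJob)
          .order_by(LocalTaskJob.start_date.desc())
          .limit(20)
          .all())
for job in recent:
    print('%s %s %s %s' % (job.id, job.state, job.start_date,
                           job.latest_heartbeat))
session.close()
{code}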