John Doe created AIRFLOW-1258:
---------------------------------

             Summary: TaskInstances within SubDagOperator are marked as Failed after an hour
                 Key: AIRFLOW-1258
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1258
             Project: Apache Airflow
          Issue Type: Bug
    Affects Versions: 1.8.1
            Reporter: John Doe


We have multiple SubDagOperators which we use to isolate individual units in 
our broader DAGs (we typically have tens of SubDagOperators in a given DAG). 
For any TaskInstance inside a SubDag that runs for over an hour, the DAG fails 
right after the one-hour mark.

This is completely unrelated to our codebase and can be recreated with a sleep 
BashOperator:

{code}
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 5, 30)}


def define_sub(dag, step_name, sleeptime):
    # Single BashOperator that just sleeps for `sleeptime` seconds on our "model" queue.
    BashOperator(
        task_id=step_name,
        bash_command='sleep %i' % sleeptime,
        queue="model",
        dag=dag,
    )
    return dag


def gen_sub_dag(parent_name, step_name, sleeptime):
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name),
              default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub


long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS,
                         schedule_interval=None)

# 7500 s of sleep (~2 h 5 m) - comfortably past the one-hour mark.
long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500),
    task_id='long_runner_sub',
    dag=long_runner_parent,
)
{code}

Under Airflow 1.7.1.3, we would see the following error in the SubDagOperator:

{code}
[2017-05-25 17:08:56,082] {jobs.py:965} ERROR - The airflow run command failed at reporting an error. This should not occur in normal circumstances. Task state is 'running',reported state is 'success'. TI is <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-24 16:00:00 [running]>
{code}

which we could then manually mark as 'success' in the Airflow metadata 
database. However, starting in 1.8.1 (we skipped 1.8.0, as AIRFLOW-1004 was a 
hard blocker), the SubDag now fails outright.
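
For reference, this is roughly how we applied that manual workaround under 1.7.1.3 - a minimal sketch, assuming direct access to the Airflow metadata database through the standard SQLAlchemy session (the dag_id / task_id / execution_date values are just the ones from the reproduction DAG above):

{code}
# Sketch of the manual workaround used under 1.7.1.3: flip the stuck
# SubDagOperator TaskInstance back to 'success' in the metadata database.
# The identifiers below are illustrative - adjust to the affected task instance.
from datetime import datetime

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

session = settings.Session()
ti = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == 'long_runner',
        TaskInstance.task_id == 'long_runner_sub',
        TaskInstance.execution_date == datetime(2017, 5, 31, 18, 42, 55, 400517),
    )
    .one()
)
ti.state = State.SUCCESS
session.commit()
session.close()
{code}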

Nothing in the logs indicates any reason for the failure - we've reduced the 
logging level to DEBUG and still see nothing.

airflow-scheduler.log:
{code}
2017-05-31 19:44:10,260 INFO - Heartbeating the process manager
2017-05-31 19:44:10,263 INFO - Started a process (PID: 6462) to generate tasks for /efs/airflow/dags/long_running.py - logging into /opt/airflow/logs/scheduler/2017-05-31/long_running.py.log
2017-05-31 19:44:10,268 INFO - Heartbeating the executor
2017-05-31 19:44:10,271 INFO - Executor reports long_runner.long_runner_sub execution_date=2017-05-31 18:42:55.400517 as failed
2017-05-31 19:44:10,324 INFO - Heartbeating the scheduler
{code}
 
In the SubDagOperator log, we see a second task queued immediately before the 
failure - despite the original task running unabated:

{code}
[2017-05-31 19:44:04,441] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run long_runner long_runner_sub 2017-05-31T18:42:55.400517 --job_id 108 --raw -sd DAGS_FOLDER/long_running.py']
[2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,815] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,816] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Task Instance Not Already Running' PASSED: False, Task is already running, it started on 2017-05-31 18:43:03.494829.
[2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2017-05-31 18:43:03.494829.
[2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2017-05-31 19:44:05,818] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,466] {jobs.py:1722} DEBUG - Executor state: failed task <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]>
[2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,467] {jobs.py:1729} ERROR - Executor reports task instance <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]> finished (failed) although the task says its running. Was the task killed externally?
{code}

It's unclear whether the second task is queued in response to the first 
incorrectly being marked as a failure, or whether the second task being queued 
is what causes the failure state when it takes the poison pill.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
