Bjorn Olsen created AIRFLOW-6190:
------------------------------------
Summary: Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
Key: AIRFLOW-6190
URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
Project: Apache Airflow
Issue Type: Bug
Components: core
Affects Versions: 1.10.6
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen
Attachments: image-2019-12-06-13-55-33-974.png
The DAG below creates 20 identical, simple tasks that depend on each other in series.
Installing the DAG and executing all of its DAG runs works perfectly the first time around.
However, pausing the DAG, clearing all the DAG runs, and unpausing it leads to intermittent task failures.
{code:python}
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1,
)


def func():
    # No-op callable: the task body is irrelevant to the bug.
    pass


# Chain 20 identical tasks in series: task_0 >> task_1 >> ... >> task_19
prev_task = None
for i in range(20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task
    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
Example:
!image-2019-12-06-13-55-33-974.png|width=398,height=276!
After the second run, the tasks that succeeded have 2 logs, but the tasks that failed show only 1 log in the UI.
On disk, however, there is a log file for the failed task attempts:
{noformat}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{noformat}
At first I thought this happened because the workers were still busy with the previous TaskInstance: because of the worker heartbeat, there is a delay between the moment a TaskInstance's state is set to SUCCESS and the moment the worker is actually done with it.
The scheduler decides the next task can move from SCHEDULED to QUEUED, but the task does not start because the worker is still busy, so it goes back from QUEUED to SCHEDULED.
The task is still in the worker queue, however, which causes the failure above when the worker eventually tries to start it.
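The suspected race can be sketched as a toy state machine. This is a hypothetical simplification, not Airflow's scheduler or executor code: the names `scheduler_queue`, `scheduler_requeue_timeout`, and `worker_pick_up` are made up, the broker is a plain `deque`, and the worker accepts only the QUEUED state. It shows how a message that stays in the worker queue after the task is reset from QUEUED to SCHEDULED yields the same 'scheduled' state error as the log above.

```python
from collections import deque
from enum import Enum


class State(str, Enum):
    # Hypothetical subset of task-instance states named in this report.
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"


class TaskInstance:
    def __init__(self, task_id):
        self.task_id = task_id
        self.state = State.SCHEDULED


def scheduler_queue(ti, broker):
    # Scheduler: SCHEDULED -> QUEUED, and a message is sent to the worker queue.
    ti.state = State.QUEUED
    broker.append(ti.task_id)


def scheduler_requeue_timeout(ti):
    # Worker is still busy, so the task is reset QUEUED -> SCHEDULED.
    # The message already sent to the broker is NOT withdrawn.
    if ti.state is State.QUEUED:
        ti.state = State.SCHEDULED


def worker_pick_up(tis, broker):
    # Worker consumes the next message, then checks the task's current state.
    task_id = broker.popleft()
    ti = tis[task_id]
    if ti.state is not State.QUEUED:
        # Simplified stand-in for the 'Task Instance State' dependency check.
        return ("Task is in the '{}' state which is not a valid state "
                "for execution.".format(ti.state.value))
    ti.state = State.RUNNING
    return "running"


# Replay the suspected sequence for a single task.
broker = deque()
tis = {"task_1": TaskInstance("task_1")}
scheduler_queue(tis["task_1"], broker)      # SCHEDULED -> QUEUED, message sent
scheduler_requeue_timeout(tis["task_1"])    # QUEUED -> SCHEDULED, message stays
result = worker_pick_up(tis, broker)        # worker finally reads the stale message
print(result)
```

Running it prints the 'scheduled' state error, because the stale message outlives the QUEUED state it was sent for; without the reset step, the same worker call returns "running".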
What remains a mystery to me, however, is why it works the first time the DAG run executes but not the second time.
Perhaps it is something specific to my environment.
I'm going to try to debug this myself, but if anyone else can reproduce this issue in their environment, it would help me understand whether it affects only me.
To reproduce, install the DAG, let it run to completion once, then clear it and let it run again; you should start seeing random failures.
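The pause/clear/unpause steps can be scripted. This is a sketch assuming the Airflow 1.10 CLI command names (`airflow pause`, `airflow clear`, `airflow unpause`) and the `-c`/`--no_confirm` flag of `airflow clear`; Airflow 2.x moved these commands under `airflow dags ...` and `airflow tasks ...`, so they would need adjusting there. The helpers `repro_commands` and `run_repro` are hypothetical.

```python
import subprocess
from typing import Callable, List


def repro_commands(dag_id: str) -> List[List[str]]:
    # Hypothetical helper: Airflow 1.10 CLI calls for the reproduction steps.
    return [
        ["airflow", "pause", dag_id],
        # -c/--no_confirm skips the confirmation prompt; with no -s/-e date
        # range, every task instance of the DAG is cleared.
        ["airflow", "clear", "-c", dag_id],
        ["airflow", "unpause", dag_id],
    ]


def run_repro(dag_id: str, runner: Callable = subprocess.run) -> None:
    # Run the steps in order; check=True raises on the first failing command.
    for cmd in repro_commands(dag_id):
        print("+ " + " ".join(cmd))
        runner(cmd, check=True)
```

After the first full pass of `bug_testing_dag` has succeeded, calling `run_repro("bug_testing_dag")` on a machine with the Airflow 1.10 CLI on the `PATH` replays the sequence. Taking `runner` as a parameter keeps the helper testable without Airflow installed.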
--
This message was sent by Atlassian Jira
(v8.3.4#803005)