Artiom created AIRFLOW-2468:
-------------------------------

             Summary: Clearing tasks results in zombie processes and tasks 
queuing
                 Key: AIRFLOW-2468
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2468
             Project: Apache Airflow
          Issue Type: Bug
          Components: core
    Affects Versions: Airflow 1.8, Airflow 1.7.1.3
            Reporter: Artiom
         Attachments: zombies.PNG

Hi everyone,

I was in the process of upgrading Airflow to 1.8 when I spotted some strange behavior. I currently have a default Airflow 1.8 installation with the LocalExecutor and a Postgres metadata database, managed by systemd.
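
For reference, the relevant parts of my airflow.cfg look roughly like this (the connection string is illustrative, not my real one):

{code}
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:***@localhost:5432/airflow

[scheduler]
# default value; see below for the effect of raising it
max_threads = 2
{code}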

To make this error reproducible I created two DAGs that are identical apart from their dag_id:

{code:python}
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(
    dag_id='example_clearing_job', default_args=args,
    schedule_interval='0 0 * * *')

first_task = BashOperator(
    task_id='first_task', bash_command='sleep 60', dag=dag)

second_task = BashOperator(
    task_id='second_task',
    bash_command='sleep 180',
    dag=dag)
second_task.set_upstream(first_task)

run_this = BashOperator(
    task_id='run_this', bash_command='echo 1', dag=dag)
run_this.set_upstream(second_task)

run_that = BashOperator(
    task_id='run_that', bash_command='ls -l', dag=dag)
run_that.set_upstream(second_task)
{code}
and

{code:python}
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(
    dag_id='example_clearing_job_2', default_args=args,
    schedule_interval='0 0 * * *')

first_task = BashOperator(
    task_id='first_task', bash_command='sleep 60', dag=dag)

second_task = BashOperator(
    task_id='second_task',
    bash_command='sleep 180',
    dag=dag)
second_task.set_upstream(first_task)

run_this = BashOperator(
    task_id='run_this', bash_command='echo 1', dag=dag)
run_this.set_upstream(second_task)

run_that = BashOperator(
    task_id='run_that', bash_command='ls -l', dag=dag)
run_that.set_upstream(second_task)
{code}
 

Suppose I have only these two DAGs in my Airflow instance and some finished runs in each of them. If I clear all tasks in the first DAG and then do exactly the same thing in the other DAG, I start noticing weird things (a sketch of the clearing step follows the list):
 * The first task of the DAG that I cleared first kicks off.
 * The first task of the DAG that I cleared second does not start until the first task in the first DAG finishes (i.e. after its sleep 60 completes).
 * In the output of ps -ax | grep airflow I see a lot of zombie (defunct) processes.
  !zombies.PNG!
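
For clarity, this is roughly what the clearing step does, expressed through the Python model API (I normally press "Clear" in the web UI). It is only a sketch and assumes DagBag and DAG.clear behave as they do in 1.8:

{code:python}
# Rough equivalent of clearing all tasks of both DAGs from the UI.
from airflow.models import DagBag

dagbag = DagBag()
for dag_id in ('example_clearing_job', 'example_clearing_job_2'):
    dag = dagbag.get_dag(dag_id)
    # Clears every task instance so the scheduler re-runs the whole DAG.
    dag.clear()
{code}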

I found a couple of things that help with this issue.

Firstly, I noticed that if I disable (pause) the second DAG before clearing the tasks, no zombies are created and it works fine alongside the first DAG.

Secondly, I noticed a slight improvement when I increased max_threads in the [scheduler] section of the configuration.
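
For example (the value is just what I tried, not a recommendation):

{code}
[scheduler]
# default is 2; raising it reduced, but did not eliminate, the queuing
max_threads = 4
{code}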

The zombie processes eventually disappear once the first task in the first DAG finishes, but until that happens no other task gets scheduled.

Is there an explanation for this strange behavior, or am I doing something incorrectly? Which parts of Airflow are involved in this error, and are there any plans to fix it?

 

Thanks,

Artiom 

 


