Sam Danbury created AIRFLOW-3778:
------------------------------------
Summary: QUEUED task not being scheduled
Key: AIRFLOW-3778
URL: https://issues.apache.org/jira/browse/AIRFLOW-3778
Project: Apache Airflow
Issue Type: Bug
Affects Versions: 1.9.0
Reporter: Sam Danbury
Could you point me to the part of the code that schedules tasks that are
already in a QUEUED state?
I can find the code that schedules tasks in a NONE or UP_FOR_RETRY state, but
not the code that handles QUEUED.
I had a situation where the scheduler crashed whilst my DAG was being
scheduled, so some tasks were left set to QUEUED. When I restart the scheduler,
the tasks still don't get scheduled and remain in QUEUED state indefinitely.
I have recreated the scenario by waiting until some tasks are in a QUEUED
state and then killing the scheduler process. Restarting the scheduler does
not automatically run those tasks.
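One way to confirm which task instances are stuck is to look for rows whose state is 'queued' in the metadata database. The sketch below is only an illustration: it uses an in-memory SQLite table as a stand-in for the real task_instance table, models just four of its columns, and fills it with hypothetical rows (the execution dates and states are made up, not taken from my environment). The helper name stuck_queued is also hypothetical.

```python
import sqlite3

# Stand-in for the Airflow metadata DB: only a few columns of the
# task_instance table are modelled here (assumption for illustration).
conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE task_instance ("
    "task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT)"
)

# Hypothetical rows imitating the situation after the scheduler was killed.
rows = [
    ('runme_0', 'pool_dag', '2019-01-29T10:00:00', 'success'),
    ('runme_2', 'pool_dag', '2019-01-29T10:00:00', 'queued'),
    ('runme_3', 'pool_dag', '2019-01-29T10:00:00', 'running'),
    ('runme_4', 'pool_dag', '2019-01-29T10:00:00', 'queued'),
]
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", rows)


def stuck_queued(connection, dag_id):
    """Return the task_ids of one DAG whose state is still 'queued'."""
    cursor = connection.execute(
        "SELECT task_id FROM task_instance "
        "WHERE dag_id = ? AND state = 'queued' ORDER BY task_id",
        (dag_id,),
    )
    return [row[0] for row in cursor.fetchall()]


queued = stuck_queued(conn, 'pool_dag')
print(queued)
```

Against a real installation the same SELECT would have to be run on the actual metadata database instead of this stand-in.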
I set up a simple DAG to test:
{code:java}
from datetime import datetime, timedelta

from airflow import DAG
from airflow import utils
from airflow.operators.bash_operator import BashOperator

# Current time minus three hours, truncated to the hour (not used by the DAG below).
now = datetime.now()
now_to_the_hour = (now - timedelta(hours=3)).replace(minute=0, second=0, microsecond=0)
START_DATE = now_to_the_hour
DAG_NAME = 'pool_dag'

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': utils.dates.days_ago(2)
}

dag = DAG(DAG_NAME, schedule_interval=None, default_args=default_args,
          catchup=False)

# 16 tasks: even-numbered ones use 'task_pool', odd-numbered ones use no pool.
for i in range(16):
    pool = None
    if (i % 2) == 0:
        pool = 'task_pool'
    task = BashOperator(
        task_id='runme_{}'.format(i),
        params={
            'duration': i * 10
        },
        pool=pool,
        bash_command='sleep {{ params.duration }}',
        dag=dag)
{code}
I was trying to create a scenario with a LocalExecutor (parallelism=8) in which
16 tasks run with different sleep times, half of them assigned to a pool and
the other half using the default pool of workers.
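For reference, the task layout that the test DAG produces can be worked out in plain Python without Airflow. This is only a sketch that mirrors the loop in the DAG; the names POOL_NAME, NUM_TASKS, PARALLELISM and task_layout are introduced here for illustration and are not part of Airflow.

```python
# Plain-Python sketch of the task layout built by the test DAG (no Airflow needed).
POOL_NAME = 'task_pool'  # same pool name as in the DAG
NUM_TASKS = 16           # number of tasks created by the loop
PARALLELISM = 8          # LocalExecutor parallelism used in my setup


def task_layout(num_tasks=NUM_TASKS):
    """Return (task_id, pool, sleep_seconds) for each task, mirroring the DAG loop."""
    layout = []
    for i in range(num_tasks):
        # Even-numbered tasks go to the pool, odd-numbered ones get no pool.
        pool = POOL_NAME if i % 2 == 0 else None
        layout.append(('runme_{}'.format(i), pool, i * 10))
    return layout


layout = task_layout()
pooled = [t for t in layout if t[1] == POOL_NAME]
unpooled = [t for t in layout if t[1] is None]

print('pooled tasks:  ', [t[0] for t in pooled])
print('unpooled tasks:', [t[0] for t in unpooled])
print('tasks beyond the parallelism limit:', max(0, NUM_TASKS - PARALLELISM))
```

With 16 tasks and parallelism=8, presumably not all tasks can run at once, which is what leaves some of them waiting in QUEUED when the scheduler is killed.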