[
https://issues.apache.org/jira/browse/AIRFLOW-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
John Culver updated AIRFLOW-1157:
---------------------------------
Description:
If a DAG is run that contains a task assigned to a pool that doesn't exist,
the scheduler will crash with an unhandled KeyError.
Manually triggering a run of the DAG below in an environment that has no pool
named 'a_non_existent_pool' will crash the scheduler:
{code}
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# Minimal DAG: a single task that references a pool which was never created.
dag = DAG(dag_id='crash_scheduler',
          start_date=datetime(2017, 1, 1),
          schedule_interval=None)

t1 = DummyOperator(task_id='crash',
                   pool='a_non_existent_pool',
                   dag=dag)
{code}
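Until the scheduler guards against this, a workaround is to create the pool
before triggering the DAG. A minimal sketch (untested; it assumes the Pool
model from airflow.models and the ORM session from airflow.settings, and the
slot count of 1 is an arbitrary choice):
{code}
from airflow import settings
from airflow.models import Pool

# Create the pool the task references so the scheduler can look it up.
session = settings.Session()
if not session.query(Pool).filter(Pool.pool == 'a_non_existent_pool').first():
    session.add(Pool(pool='a_non_existent_pool', slots=1))
    session.commit()
session.close()
{code}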
Here is the relevant log output on the scheduler:
{noformat}
[2017-04-27 19:31:24,816] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test-3.py finished
[2017-04-27 19:31:24,817] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test_s3_file_move.py finished
[2017-04-27 19:31:24,819] {dag_processing.py:627} INFO - Started a process (PID: 124) to generate tasks for /opt/airflow/dags/crash_scheduler.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/crash_scheduler.py.log
[2017-04-27 19:31:24,822] {dag_processing.py:627} INFO - Started a process (PID: 125) to generate tasks for /opt/airflow/dags/configuration/constants.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/configuration/constants.py.log
[2017-04-27 19:31:24,847] {jobs.py:1007} INFO - Tasks up for execution:
        <TaskInstance: move_s3_file_test.move_files 2017-04-27 19:31:22.298893 [scheduled]>
[2017-04-27 19:31:24,849] {jobs.py:1030} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2017-04-27 19:31:24,856] {jobs.py:1078} INFO - DAG move_s3_file_test has 0/16 running tasks
[2017-04-27 19:31:24,856] {jobs.py:1105} INFO - Sending to executor (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) with priority 1 and queue MVSANDBOX-airflow-DEV-dev
[2017-04-27 19:31:24,859] {jobs.py:1116} INFO - Setting state of (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) to queued
[2017-04-27 19:31:24,867] {base_executor.py:50} INFO - Adding to queue: airflow run move_s3_file_test move_files 2017-04-27T19:31:22.298893 --local -sd /opt/airflow/dags/test_s3_file_move.py
[2017-04-27 19:31:24,867] {jobs.py:1440} INFO - Heartbeating the executor
[2017-04-27 19:31:24,872] {celery_executor.py:78} INFO - [celery] queuing (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) through celery, queue=MVSANDBOX-airflow-DEV-dev
[2017-04-27 19:31:25,974] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/crash_scheduler.py finished
[2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/configuration/constants.py finished
[2017-04-27 19:31:25,977] {dag_processing.py:627} INFO - Started a process (PID: 128) to generate tasks for /opt/airflow/dags/example_s3_sensor.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/example_s3_sensor.py.log
[2017-04-27 19:31:25,980] {dag_processing.py:627} INFO - Started a process (PID: 129) to generate tasks for /opt/airflow/dags/test-4.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/test-4.py.log
[2017-04-27 19:31:26,004] {jobs.py:1007} INFO - Tasks up for execution:
        <TaskInstance: crash_scheduler.crash 2017-04-27 19:30:51.948542 [scheduled]>
[2017-04-27 19:31:26,006] {jobs.py:1311} INFO - Exited execute loop
[2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 128
[2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 129
[2017-04-27 19:31:26,008] {jobs.py:1329} INFO - Waiting up to 5s for processes to exit...
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 839, in scheduler
    job.run()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1309, in _execute
    self._execute_helper(processor_manager)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1437, in _execute_helper
    (State.SCHEDULED,))
  File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1025, in _execute_task_instances
    open_slots = pools[pool].open_slots(session=session)
KeyError: u'a_non_existant_pool'
{noformat}
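The traceback points at an unguarded dict lookup in jobs.py:
_execute_task_instances builds a mapping of pool names to Pool rows and then
indexes it with whatever pool name each task carries. A possible fix, sketched
here standalone with a plain dict standing in for that mapping (the guard and
the warning text are my suggestion, not existing Airflow code), is to skip
tasks whose pool is missing instead of letting the KeyError propagate:
{code}
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger(__name__)

# Stand-in for the pool-name -> Pool mapping the scheduler builds;
# 'a_non_existent_pool' was never created, so it has no entry.
pools = {}

for pool in ('a_non_existent_pool',):
    if pool not in pools:
        # Guard: log and skip rather than raising KeyError and killing
        # the whole scheduler loop.
        log.warning("Pool %r does not exist; skipping its task instances", pool)
        continue
    open_slots = pools[pool].open_slots()  # the lookup that crashes today
{code}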
> Assigning a task to a pool that doesn't exist crashes the scheduler
> -------------------------------------------------------------------
>
> Key: AIRFLOW-1157
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1157
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: Airflow 1.8
> Reporter: John Culver
> Priority: Critical
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)