Repository: incubator-airflow Updated Branches: refs/heads/master 3e6babe8e -> be54f0485
[AIRFLOW-1885] Fix IndexError in ready_prefix_on_cmdline If while trying to obtain a list of ready gunicorn workers, one of them becomes a zombie, psutil.cmdline returns [] (see here: https://github.com/giampaolo/psutil/blob/release-4.2.0/psutil/_pslinux.py#L1007) Boom: Traceback (most recent call last): File "/usr/local/bin/airflow", line 28, in <module> args.func(args) File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 803, in webserver restart_workers(gunicorn_master_proc, num_workers) File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 687, in restart_workers num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc) File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 663, in get_num_ready_workers_running proc for proc in workers File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 664, in <listcomp> if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0] IndexError: list index out of range So ensure a cmdline is actually returned before doing the cmdline prefix check in ready_prefix_on_cmdline. 
Also: * Treat psutil.NoSuchProcess error as non ready worker * Add in tests for get_num_ready_workers_running Closes #2844 from j16r/bugfix/poll_zombie_process Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/be54f048 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/be54f048 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/be54f048 Branch: refs/heads/master Commit: be54f0485eb0ec52b3147bea057b399565601e10 Parents: 3e6babe Author: John Barker <jebar...@gmail.com> Authored: Tue Dec 12 12:49:06 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Tue Dec 12 12:49:06 2017 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 24 ++++++++++++------- tests/cli/__init__.py | 13 ++++++++++ tests/cli/test_cli.py | 59 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/be54f048/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 1367362..812977b 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -553,6 +553,22 @@ def clear(args): include_subdags=not args.exclude_subdags) +def get_num_ready_workers_running(gunicorn_master_proc): + workers = psutil.Process(gunicorn_master_proc.pid).children() + + def ready_prefix_on_cmdline(proc): + try: + cmdline = proc.cmdline() + if len(cmdline) > 0: + return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0] + except psutil.NoSuchProcess: + pass + return False + + ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)] + return len(ready_workers) + + def restart_workers(gunicorn_master_proc, num_workers_expected): """ Runs forever, monitoring the child 
processes of @gunicorn_master_proc and @@ -590,14 +606,6 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): workers = psutil.Process(gunicorn_master_proc.pid).children() return len(workers) - def get_num_ready_workers_running(gunicorn_master_proc): - workers = psutil.Process(gunicorn_master_proc.pid).children() - ready_workers = [ - proc for proc in workers - if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0] - ] - return len(ready_workers) - def start_refresh(gunicorn_master_proc): batch_size = conf.getint('webserver', 'worker_refresh_batch_size') log.debug('%s doing a refresh of %s workers', state, batch_size) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/be54f048/tests/cli/__init__.py ---------------------------------------------------------------------- diff --git a/tests/cli/__init__.py b/tests/cli/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/tests/cli/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/be54f048/tests/cli/test_cli.py ---------------------------------------------------------------------- diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py new file mode 100644 index 0000000..6db29cf --- /dev/null +++ b/tests/cli/test_cli.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +from mock import patch, Mock, MagicMock + +import psutil + +from airflow import settings +from airflow.bin.cli import get_num_ready_workers_running + + +class TestCLI(unittest.TestCase): + + def setUp(self): + self.gunicorn_master_proc = Mock(pid=None) + self.children = MagicMock() + self.child = MagicMock() + self.process = MagicMock() + + def test_ready_prefix_on_cmdline(self): + self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX] + self.process.children.return_value = [self.child] + + with patch('psutil.Process', return_value=self.process): + self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 1) + + def test_ready_prefix_on_cmdline_no_children(self): + self.process.children.return_value = [] + + with patch('psutil.Process', return_value=self.process): + self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0) + + def test_ready_prefix_on_cmdline_zombie(self): + self.child.cmdline.return_value = [] + self.process.children.return_value = [self.child] + + with patch('psutil.Process', 
return_value=self.process): + self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0) + + def test_ready_prefix_on_cmdline_dead_process(self): + self.child.cmdline.side_effect = psutil.NoSuchProcess(11347) + self.process.children.return_value = [self.child] + + with patch('psutil.Process', return_value=self.process): + self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)