This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 09ba9c406e835178bdb4d4e0c8826597c5434e20
Author:     Kamil Breguła <kamil.breg...@polidea.com>
AuthorDate: Mon Jun 29 20:38:26 2020 +0200

    Reload gunicorn when plugins have been changed (#8997)

    (cherry picked from commit 1c48ffbe25c3e304660b7e75a49e88bd114dde46)
---
 airflow/bin/cli.py                           | 350 +++++++++++++++++++--------
 airflow/config_templates/config.yml          |   8 +
 airflow/config_templates/default_airflow.cfg |   4 +
 tests/cli/test_cli.py                        | 233 +++++++++++++++---
 4 files changed, 452 insertions(+), 143 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8b5522c..b81c7b1 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -20,6 +20,7 @@ from __future__ import print_function

 import errno
+import hashlib
 import importlib
 import locale
 import logging
@@ -777,31 +778,11 @@ def clear(args):
     )


-def get_num_ready_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-
-    def ready_prefix_on_cmdline(proc):
-        try:
-            cmdline = proc.cmdline()
-            if len(cmdline) > 0:
-                return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
-        except psutil.NoSuchProcess:
-            pass
-        return False
-
-    ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
-    return len(ready_workers)
-
-
-def get_num_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-    return len(workers)
-
-
-def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
+class GunicornMonitor(LoggingMixin):
     """
     Runs forever, monitoring the child processes of @gunicorn_master_proc and
-    restarting workers occasionally.
+    restarting workers occasionally or when files in the plugins directory
+    have been modified.

     Each iteration of the loop traverses one edge of this state transition
     diagram, where each state (node) represents
     [ num_ready_workers_running / num_workers_running ]. We expect most time to
@@ -818,92 +799,245 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
+
+    :param gunicorn_master_proc: handle for the main Gunicorn process
+    :param num_workers_expected: Number of workers to run the Gunicorn web server
+    :param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder
+        directory. When it detects changes, it will reload gunicorn.
""" + def __init__( + self, + gunicorn_master_proc, + num_workers_expected, + master_timeout, + worker_refresh_interval, + worker_refresh_batch_size, + reload_on_plugin_change + ): + super(GunicornMonitor, self).__init__() + self.gunicorn_master_proc = gunicorn_master_proc + self.num_workers_expected = num_workers_expected + self.master_timeout = master_timeout + self.worker_refresh_interval = worker_refresh_interval + self.worker_refresh_batch_size = worker_refresh_batch_size + self.reload_on_plugin_change = reload_on_plugin_change + + self._num_workers_running = 0 + self._num_ready_workers_running = 0 + self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None + self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None + self._restart_on_next_plugin_check = False + + def _generate_plugin_state(self): + """ + Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER + directory. + """ + if not settings.PLUGINS_FOLDER: + return {} + + all_filenames = [] + for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER): + all_filenames.extend(os.path.join(root, f) for f in filenames) + plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)} + return plugin_state + + @staticmethod + def _get_file_hash(fname): + """Calculate MD5 hash for file""" + hash_md5 = hashlib.md5() + with open(fname, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + def _get_num_ready_workers_running(self): + """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name""" + workers = psutil.Process(self.gunicorn_master_proc.pid).children() + + def ready_prefix_on_cmdline(proc): + try: + cmdline = proc.cmdline() + if len(cmdline) > 0: # pylint: disable=len-as-condition + return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0] + except psutil.NoSuchProcess: + pass + return False + + ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)] + return len(ready_workers) - def wait_until_true(fn, timeout=0): + def _get_num_workers_running(self): + """Returns number of running Gunicorn workers processes""" + workers = psutil.Process(self.gunicorn_master_proc.pid).children() + return len(workers) + + def _wait_until_true(self, fn, timeout=0): """ Sleeps until fn is true """ - t = time.time() + start_time = time.time() while not fn(): - if 0 < timeout and timeout <= time.time() - t: + if 0 < timeout <= time.time() - start_time: raise AirflowWebServerTimeout( - "No response from gunicorn master within {0} seconds" - .format(timeout)) + "No response from gunicorn master within {0} seconds".format(timeout) + ) time.sleep(0.1) - def start_refresh(gunicorn_master_proc): - batch_size = conf.getint('webserver', 'worker_refresh_batch_size') - log.debug('%s doing a refresh of %s workers', state, batch_size) - sys.stdout.flush() - sys.stderr.flush() - + def _spawn_new_workers(self, count): + """ + Send signal to kill the worker. 
+        :param count: The number of workers to spawn
+        """
         excess = 0
-        for _ in range(batch_size):
-            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+        for _ in range(count):
+            # TTIN: Increment the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
             excess += 1
-            wait_until_true(lambda: num_workers_expected + excess ==
-                            get_num_workers_running(gunicorn_master_proc),
-                            master_timeout)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )

-    try:
-        wait_until_true(lambda: num_workers_expected ==
-                        get_num_workers_running(gunicorn_master_proc),
-                        master_timeout)
-        while True:
-            num_workers_running = get_num_workers_running(gunicorn_master_proc)
-            num_ready_workers_running = \
-                get_num_ready_workers_running(gunicorn_master_proc)
-
-            state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
-            # Whenever some workers are not ready, wait until all workers are ready
-            if num_ready_workers_running < num_workers_running:
-                log.debug('%s some workers are starting up, waiting...', state)
-                sys.stdout.flush()
+    def _kill_old_workers(self, count):
+        """
+        Send signal to kill old workers.
+        :param count: The number of workers to kill
+        """
+        for _ in range(count):
+            count -= 1
+            # TTOU: Decrement the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
+                timeout=self.master_timeout)
+
+    def _reload_gunicorn(self):
+        """
+        Send signal to reload the gunicorn configuration. When gunicorn receives the signal, it
+        reloads the configuration, starts new worker processes with the new configuration and
+        gracefully shuts down older workers.
+        """
+        # HUP: Reload the configuration.
+        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
+        time.sleep(1)
+        self._wait_until_true(
+            lambda: self.num_workers_expected == self._get_num_workers_running(),
+            timeout=self.master_timeout
+        )
+
+    def start(self):
+        """
+        Starts monitoring the webserver.
+ """ + try: # pylint: disable=too-many-nested-blocks + self._wait_until_true( + lambda: self.num_workers_expected == self._get_num_workers_running(), + timeout=self.master_timeout + ) + while True: + if self.gunicorn_master_proc.poll() is not None: + sys.exit(self.gunicorn_master_proc.returncode) + self._check_workers() + # Throttle loop time.sleep(1) - # Kill a worker gracefully by asking gunicorn to reduce number of workers - elif num_workers_running > num_workers_expected: - excess = num_workers_running - num_workers_expected - log.debug('%s killing %s workers', state, excess) - - for _ in range(excess): - gunicorn_master_proc.send_signal(signal.SIGTTOU) - excess -= 1 - wait_until_true(lambda: num_workers_expected + excess == - get_num_workers_running(gunicorn_master_proc), - master_timeout) - - # Start a new worker by asking gunicorn to increase number of workers - elif num_workers_running == num_workers_expected: - refresh_interval = conf.getint('webserver', 'worker_refresh_interval') - log.debug( - '%s sleeping for %ss starting doing a refresh...', - state, refresh_interval + except (AirflowWebServerTimeout, OSError) as err: + self.log.error(err) + self.log.error("Shutting down webserver") + try: + self.gunicorn_master_proc.terminate() + self.gunicorn_master_proc.wait() + finally: + sys.exit(1) + + def _check_workers(self): + num_workers_running = self._get_num_workers_running() + num_ready_workers_running = self._get_num_ready_workers_running() + + # Whenever some workers are not ready, wait until all workers are ready + if num_ready_workers_running < num_workers_running: + self.log.debug( + '[%d / %d] Some workers are starting up, waiting...', + num_ready_workers_running, num_workers_running + ) + time.sleep(1) + return + + # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce + # number of workers + if num_workers_running > self.num_workers_expected: + excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size) + self.log.debug( + '[%d / %d] Killing %s workers', num_ready_workers_running, num_workers_running, excess + ) + self._kill_old_workers(excess) + return + + # If there are too few workers, start a new worker by asking gunicorn + # to increase number of workers + if num_workers_running < self.num_workers_expected: + self.log.error( + "[%d / %d] Some workers seem to have died and gunicorn did not restart " + "them as expected", + num_ready_workers_running, num_workers_running + ) + time.sleep(10) + num_workers_running = self._get_num_workers_running() + if num_workers_running < self.num_workers_expected: + new_worker_count = min( + num_workers_running - self.worker_refresh_batch_size, self.worker_refresh_batch_size + ) + self.log.debug( + '[%d / %d] Spawning %d workers', + num_ready_workers_running, num_workers_running, new_worker_count + ) + self._spawn_new_workers(num_workers_running) + return + + # Now the number of running and expected worker should be equal + + # If workers should be restarted periodically. + if self.worker_refresh_interval > 0 and self._last_refresh_time: + # and we refreshed the workers a long time ago, refresh the workers + last_refresh_diff = (time.time() - self._last_refresh_time) + if self.worker_refresh_interval < last_refresh_diff: + num_new_workers = self.worker_refresh_batch_size + self.log.debug( + '[%d / %d] Starting doing a refresh. 
+                    num_ready_workers_running, num_workers_running, num_new_workers
                 )
-                time.sleep(refresh_interval)
-                start_refresh(gunicorn_master_proc)
+                self._spawn_new_workers(num_new_workers)
+                self._last_refresh_time = time.time()
+                return

-            else:
-                # num_ready_workers_running == num_workers_running < num_workers_expected
-                log.error((
-                    "%s some workers seem to have died and gunicorn"
-                    "did not restart them as expected"
-                ), state)
-                time.sleep(10)
-                if len(
-                    psutil.Process(gunicorn_master_proc.pid).children()
-                ) < num_workers_expected:
-                    start_refresh(gunicorn_master_proc)
-    except (AirflowWebServerTimeout, OSError) as err:
-        log.error(err)
-        log.error("Shutting down webserver")
-        try:
-            gunicorn_master_proc.terminate()
-            gunicorn_master_proc.wait()
-        finally:
-            sys.exit(1)
+        # If we should check the plugins directory,
+        if self.reload_on_plugin_change:
+            # compare the previous and current contents of the directory
+            new_state = self._generate_plugin_state()
+            # If it changed, wait until the next check so that its contents are fully saved.
+            if new_state != self._last_plugin_state:
+                self.log.debug(
+                    '[%d / %d] Plugins folder changed. Gunicorn will be restarted the next time the '
+                    'plugins directory is checked, if there are no further changes.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = True
+                self._last_plugin_state = new_state
+            elif self._restart_on_next_plugin_check:
+                self.log.debug(
+                    '[%d / %d] Starting to reload the gunicorn configuration.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = False
+                self._last_refresh_time = time.time()
+                self._reload_gunicorn()


 @cli_utils.action_logging
@@ -962,13 +1096,13 @@ def webserver(args):

     run_args = [
         'gunicorn',
-        '-w', str(num_workers),
-        '-k', str(args.workerclass),
-        '-t', str(worker_timeout),
-        '-b', args.hostname + ':' + str(args.port),
-        '-n', 'airflow-webserver',
-        '-p', str(pid),
-        '-c', 'python:airflow.www.gunicorn_config',
+        '--workers', str(num_workers),
+        '--worker-class', str(args.workerclass),
+        '--timeout', str(worker_timeout),
+        '--bind', args.hostname + ':' + str(args.port),
+        '--name', 'airflow-webserver',
+        '--pid', str(pid),
+        '--config', 'python:airflow.www.gunicorn_config',
     ]

     if args.access_logfile:
@@ -978,7 +1112,7 @@ def webserver(args):
         run_args += ['--error-logfile', str(args.error_logfile)]

     if args.daemon:
-        run_args += ['-D']
+        run_args += ['--daemon']

     if ssl_cert:
         run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
@@ -995,12 +1129,14 @@ def webserver(args):

         def monitor_gunicorn(gunicorn_master_proc):
             # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-            if conf.getint('webserver', 'worker_refresh_interval') > 0:
-                master_timeout = conf.getint('webserver', 'web_server_master_timeout')
-                restart_workers(gunicorn_master_proc, num_workers, master_timeout)
-            else:
-                while True:
-                    time.sleep(1)
+            GunicornMonitor(
+                gunicorn_master_proc=gunicorn_master_proc,
+                num_workers_expected=num_workers,
+                master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
+                worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=10),
+                worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
+                reload_on_plugin_change=conf.getboolean('webserver', 'reload_on_plugin_change', fallback=False),
+            ).start()

         if args.daemon:
             base, ext = os.path.splitext(pid)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e32b8cc..1f697b1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -699,6 +699,14 @@
       type: string
       example: ~
       default: "30"
+    - name: reload_on_plugin_change
+      description: |
+        If set to True, Airflow will track files in the plugins_folder directory. When it detects
+        changes, it will reload gunicorn.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: False
     - name: secret_key
       description: |
         Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c75d3ae..cfc92ad 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,6 +341,10 @@ worker_refresh_batch_size = 1
 # Number of seconds to wait before refreshing a batch of workers.
 worker_refresh_interval = 30

+# If set to True, Airflow will track files in the plugins_folder directory. When it detects changes,
+# it will reload gunicorn.
+reload_on_plugin_change = False
+
 # Secret key used to run your flask app
 # It should be as random as possible
 secret_key = temporary_key
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index a2a81ac..5748057 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import contextlib
+import errno
 import io
 import logging
@@ -28,8 +29,8 @@ from six import StringIO, PY2
 import sys

 from datetime import datetime, timedelta, time
-from mock import patch, Mock, MagicMock
-from time import sleep
+from mock import patch, MagicMock
+from time import sleep, time as timetime
 import psutil
 import pytz
 import subprocess
@@ -37,9 +38,10 @@ import pytest
 from argparse import Namespace
 from airflow import settings
 import airflow.bin.cli as cli
-from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.bin.cli import run, get_dag
 from airflow.models import TaskInstance
 from airflow.utils import timezone
+from airflow.utils.file import TemporaryDirectory
 from airflow.utils.state import State
 from airflow.settings import Session
 from airflow import models
@@ -154,39 +156,6 @@ class TestCLI(unittest.TestCase):
         cls.dagbag = models.DagBag(include_examples=True)
         cls.parser = cli.CLIFactory.get_parser()

-    def setUp(self):
-        self.gunicorn_master_proc = Mock(pid=None)
-        self.children = MagicMock()
-        self.child = MagicMock()
-        self.process = MagicMock()
-
-    def test_ready_prefix_on_cmdline(self):
-        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 1)
-
-    def test_ready_prefix_on_cmdline_no_children(self):
-        self.process.children.return_value = []
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_zombie(self):
-        self.child.cmdline.return_value = []
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_dead_process(self):
-        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
     def test_cli_webserver_debug(self):
         env = os.environ.copy()
         p = psutil.Popen(["airflow", "webserver", "-d"], env=env)
@@ -847,3 +816,195 @@ class TestShowInfo(unittest.TestCase):
         self.assertIn("https://file.io/TEST", temp_stdout.getvalue())
         content = mock_requests.post.call_args[1]["files"]["file"][1]
         self.assertIn("postgresql+psycopg2://p...s:PASSWORD@postgres/airflow", content)
+
+
+class TestGunicornMonitor(unittest.TestCase):
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+        mock.patch.object(self.monitor, '_generate_plugin_state', return_value={}).start()
+        mock.patch.object(self.monitor, '_get_num_ready_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_get_num_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_spawn_new_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_kill_old_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_reload_gunicorn', return_value=None).start()
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_wait_for_workers_to_start(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 0
+        self.monitor._get_num_workers_running.return_value = 4
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_kill_excess_workers(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 10
+        self.monitor._get_num_workers_running.return_value = 10
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_missing(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 2
+        self.monitor._get_num_workers_running.return_value = 2
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_refresh_interval_has_passed(self, mock_sleep):
+        self.monitor._last_refresh_time -= 200
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_reload_when_plugin_has_been_changed(self, mock_sleep):
+        self.monitor._generate_plugin_state.return_value = {'AA': 12}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_called_once_with()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+
+class TestGunicornMonitorGeneratePluginState(unittest.TestCase):
+    @staticmethod
+    def _prepare_test_file(filepath, size):
+        try:
+            os.makedirs(os.path.dirname(filepath))
+        except OSError as e:
+            # be happy if someone already created the path
+            if e.errno != errno.EEXIST:
+                raise
+        with open(filepath, "w") as file:
+            file.write("A" * size)
+            file.flush()
+
+    def test_should_detect_changes_in_directory(self):
+        with TemporaryDirectory(prefix="tmp") as tempdir, \
+                mock.patch("airflow.bin.cli.settings.PLUGINS_FOLDER", tempdir):
+            self._prepare_test_file("{}/file1.txt".format(tempdir), 100)
+            self._prepare_test_file("{}/nested/nested/nested/nested/file2.txt".format(tempdir), 200)
+            self._prepare_test_file("{}/file3.txt".format(tempdir), 300)
+
+            monitor = cli.GunicornMonitor(
+                gunicorn_master_proc=mock.MagicMock(),
+                num_workers_expected=4,
+                master_timeout=60,
+                worker_refresh_interval=60,
+                worker_refresh_batch_size=2,
+                reload_on_plugin_change=True,
+            )
+
+            # When the files have not changed, the result should be constant
+            state_a = monitor._generate_plugin_state()
+            state_b = monitor._generate_plugin_state()
+
+            self.assertEqual(state_a, state_b)
+            self.assertEqual(3, len(state_a))
+
+            # Should detect new file
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 400)
+
+            state_c = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_b, state_c)
+            self.assertEqual(4, len(state_c))
+
+            # Should detect changes in files
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 450)
+
+            state_d = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_c, state_d)
+            self.assertEqual(4, len(state_d))
+
+            # Should support large files
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 4000000)
+
+            state_d = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_c, state_d)
+            self.assertEqual(4, len(state_d))
+
+
+class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.parser = cli.get_parser()
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.children = mock.MagicMock()
+        self.child = mock.MagicMock()
+        self.process = mock.MagicMock()
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+
+    def test_ready_prefix_on_cmdline(self):
+        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 1)
+
+    def test_ready_prefix_on_cmdline_no_children(self):
+        self.process.children.return_value = []
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_zombie(self):
+        self.child.cmdline.return_value = []
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_dead_process(self):
+        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
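
Note for reviewers: the detection logic added by this patch boils down to snapshotting an MD5 hash of every file under the plugins folder on each monitor iteration and sending SIGHUP to the gunicorn master once the folder has changed and then stopped changing. The standalone sketch below mirrors that idea outside Airflow so it can be reasoned about in isolation; the plugins_dir path and the one-second polling loop are illustrative assumptions, not part of this commit. With the patch applied, the equivalent behaviour is enabled by setting reload_on_plugin_change = True in the [webserver] section of airflow.cfg.

    import hashlib
    import os
    import time

    def snapshot(plugins_dir):
        """Map every file under plugins_dir to the MD5 digest of its contents."""
        state = {}
        for root, _, filenames in os.walk(plugins_dir):
            for name in filenames:
                path = os.path.join(root, name)
                md5 = hashlib.md5()
                with open(path, "rb") as handle:
                    for chunk in iter(lambda: handle.read(4096), b""):
                        md5.update(chunk)
                state[path] = md5.hexdigest()
        return state

    plugins_dir = os.path.expanduser("~/airflow/plugins")  # illustrative path
    previous = snapshot(plugins_dir)
    restart_pending = False
    while True:
        time.sleep(1)
        current = snapshot(plugins_dir)
        if current != previous:
            # Folder changed; wait for the next check so in-flight writes can finish.
            restart_pending = True
            previous = current
        elif restart_pending:
            print("plugins stable again; the monitor would send SIGHUP to gunicorn here")
            restart_pending = False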