potiuk commented on a change in pull request #8997:
URL: https://github.com/apache/airflow/pull/8997#discussion_r447063973



##########
File path: airflow/cli/commands/webserver_command.py
##########
@@ -83,92 +63,247 @@ def restart_workers(gunicorn_master_proc, 
num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
-    """
 
-    def wait_until_true(fn, timeout=0):
+    :param gunicorn_master_proc:  handle for the main Gunicorn process
+    :param num_workers_expected:  Number of workers to run the Gunicorn web 
server
+    :param master_timeout: Number of seconds the webserver waits before 
killing gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before 
refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. 
When set to 0, worker
+        refresh is disabled. When nonzero, airflow periodically refreshes 
webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files 
in plugins_follder directory.
+        When it detects changes, then reload the gunicorn.
+    """
+    def __init__(
+        self,
+        gunicorn_master_proc: psutil.Process,
+        num_workers_expected: int,
+        master_timeout: int,
+        worker_refresh_interval: int,
+        worker_refresh_batch_size: int,
+        reload_on_plugin_change: bool
+    ):
+        super(GunicornMonitor, self).__init__()
+        self.gunicorn_master_proc = gunicorn_master_proc
+        self.num_workers_expected = num_workers_expected
+        self.master_timeout = master_timeout
+        self.worker_refresh_interval = worker_refresh_interval
+        self.worker_refresh_batch_size = worker_refresh_batch_size
+        self.reload_on_plugin_change = reload_on_plugin_change
+
+        self._num_workers_running = 0
+        self._num_ready_workers_running = 0
+        self._last_refresh_time = time.time() if worker_refresh_interval > 0 
else None
+        self._last_plugin_state = self._generate_plugin_state() if 
reload_on_plugin_change else None
+        self._restart_on_next_plugin_check = False
+
+    def _generate_plugin_state(self) -> Dict[str, float]:
+        """
+        Generate dict of filenames and last modification time of all files in 
settings.PLUGINS_FOLDER
+        directory.
+        """
+        if not settings.PLUGINS_FOLDER:
+            return {}
+
+        all_filenames: List[str] = []
+        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+            all_filenames.extend(os.path.join(root, f) for f in filenames)
+        plugin_state = {f: self._get_file_hash(f) for f in 
sorted(all_filenames)}

Review comment:
       It's not sorted. `os.walk` returns entries in the order stored in the
filesystem's directory index, which is usually the order in which the
entries were created — but when you delete a directory and create another,
the new entry may reuse the freed slot, so for all practical purposes the
order is essentially random.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to