This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9216489d9a DagFileProcessorManager: Start a new process group only if
current process not a session leader (#23872)
9216489d9a is described below
commit 9216489d9a25f56f7a55d032b0ebfc1bf0bf4a83
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri May 27 17:29:11 2022 +0300
DagFileProcessorManager: Start a new process group only if current process
not a session leader (#23872)
---
airflow/dag_processing/manager.py | 9 ++++++---
airflow/utils/process_utils.py | 15 +++++++++++++++
tests/utils/test_process_utils.py | 19 +++++++++++++++++++
3 files changed, 40 insertions(+), 3 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3dc72caa99..a262cf1d42 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -49,7 +49,11 @@ from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
-from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
+from airflow.utils.process_utils import (
+ kill_child_processes_by_pids,
+ reap_process_group,
+ set_new_process_group,
+)
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks
@@ -471,8 +475,7 @@ class DagFileProcessorManager(LoggingMixin):
"""
self.register_exit_signals()
- # Start a new process group
- os.setpgid(0, 0)
+ set_new_process_group()
self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index ca8fc2433e..1cbb8e8b6c 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -311,3 +311,18 @@ def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
except psutil.NoSuchProcess:
# If process is dead remove the pidfile
pid_lock_file.break_lock()
+
+
+def set_new_process_group() -> None:
+ """
+ Tries to set current process to a new process group
+ That makes it easy to kill all sub-process of this at the OS-level,
+ rather than having to iterate the child processes.
+ If current process spawn by system call ``exec()`` than keep current process group
+ """
+
+ if os.getpid() == os.getsid(0):
+ # If PID = SID than process a session leader, and it is not possible to change process group
+ return
+
+ os.setpgid(0, 0)
diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py
index ab76a05172..3154fa8805 100644
--- a/tests/utils/test_process_utils.py
+++ b/tests/utils/test_process_utils.py
@@ -39,6 +39,7 @@ from airflow.utils.process_utils import (
check_if_pidfile_process_is_running,
execute_in_subprocess,
execute_in_subprocess_with_kwargs,
+ set_new_process_group,
)
@@ -220,3 +221,21 @@ class TestCheckIfPidfileProcessIsRunning(unittest.TestCase):
f.flush()
with pytest.raises(AirflowException, match="is already running under PID"):
check_if_pidfile_process_is_running(f.name, process_name="test")
+
+
+class TestSetNewProcessGroup(unittest.TestCase):
+ @mock.patch("os.setpgid")
+ def test_not_session_leader(self, mock_set_pid):
+ pid = os.getpid()
+ with mock.patch('os.getsid', autospec=True) as mock_get_sid:
+ mock_get_sid.return_value = pid + 1
+ set_new_process_group()
+ assert mock_set_pid.call_count == 1
+
+ @mock.patch("os.setpgid")
+ def test_session_leader(self, mock_set_pid):
+ pid = os.getpid()
+ with mock.patch('os.getsid', autospec=True) as mock_get_sid:
+ mock_get_sid.return_value = pid
+ set_new_process_group()
+ assert mock_set_pid.call_count == 0