This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9216489d9a DagFileProcessorManager: Start a new process group only if current process not a session leader (#23872)
9216489d9a is described below

commit 9216489d9a25f56f7a55d032b0ebfc1bf0bf4a83
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri May 27 17:29:11 2022 +0300

    DagFileProcessorManager: Start a new process group only if current process not a session leader (#23872)
---
 airflow/dag_processing/manager.py |  9 ++++++---
 airflow/utils/process_utils.py    | 15 +++++++++++++++
 tests/utils/test_process_utils.py | 19 +++++++++++++++++++
 3 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3dc72caa99..a262cf1d42 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -49,7 +49,11 @@ from airflow.utils.file import list_py_file_paths, might_contain_dag
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
 from airflow.utils.net import get_hostname
-from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
+from airflow.utils.process_utils import (
+    kill_child_processes_by_pids,
+    reap_process_group,
+    set_new_process_group,
+)
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks
 
@@ -471,8 +475,7 @@ class DagFileProcessorManager(LoggingMixin):
         """
         self.register_exit_signals()
 
-        # Start a new process group
-        os.setpgid(0, 0)
+        set_new_process_group()
 
         self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
         self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index ca8fc2433e..1cbb8e8b6c 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -311,3 +311,18 @@ def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
         except psutil.NoSuchProcess:
             # If process is dead remove the pidfile
             pid_lock_file.break_lock()
+
+
+def set_new_process_group() -> None:
+    """
+    Tries to set current process to a new process group
+    That makes it easy to kill all sub-process of this at the OS-level,
+    rather than having to iterate the child processes.
+    If current process spawn by system call ``exec()`` than keep current process group
+    """
+
+    if os.getpid() == os.getsid(0):
+        # If PID = SID than process a session leader, and it is not possible to change process group
+        return
+
+    os.setpgid(0, 0)
diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py
index ab76a05172..3154fa8805 100644
--- a/tests/utils/test_process_utils.py
+++ b/tests/utils/test_process_utils.py
@@ -39,6 +39,7 @@ from airflow.utils.process_utils import (
     check_if_pidfile_process_is_running,
     execute_in_subprocess,
     execute_in_subprocess_with_kwargs,
+    set_new_process_group,
 )
 
 
@@ -220,3 +221,21 @@ class TestCheckIfPidfileProcessIsRunning(unittest.TestCase):
             f.flush()
             with pytest.raises(AirflowException, match="is already running under PID"):
                 check_if_pidfile_process_is_running(f.name, process_name="test")
+
+
+class TestSetNewProcessGroup(unittest.TestCase):
+    @mock.patch("os.setpgid")
+    def test_not_session_leader(self, mock_set_pid):
+        pid = os.getpid()
+        with mock.patch('os.getsid', autospec=True) as mock_get_sid:
+            mock_get_sid.return_value = pid + 1
+            set_new_process_group()
+            assert mock_set_pid.call_count == 1
+
+    @mock.patch("os.setpgid")
+    def test_session_leader(self, mock_set_pid):
+        pid = os.getpid()
+        with mock.patch('os.getsid', autospec=True) as mock_get_sid:
+            mock_get_sid.return_value = pid
+            set_new_process_group()
+            assert mock_set_pid.call_count == 0

Reply via email to