This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1b958ca85dc9abafb0ad687c0a32992f59ae6186
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Dec 3 00:44:59 2021 +0100

    Fix race condition when starting DagProcessorAgent (#19935)
    
    As described in detail in #19860, there was a race condition in
    starting and terminating the DagProcessorAgent that caused us a lot
    of headaches with flaky test_scheduler_job failures on our CI.
    After a long investigation it turned out to be a race condition:
    not very likely, but possible to happen in production.
    
    The race condition involved starting the DagProcessorAgent via
    multiprocessing, where the first action of the agent process was to
    change its process group ID to be the same as its PID. If the
    DagProcessorAgent was terminated quickly (on a busy system) before
    the process could change its group, the `reap_process_group` call
    that was supposed to kill the whole group failed, and the
    DagProcessorAgent remained running.
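    
    To illustrate the failure mode, a minimal sketch (not the actual
    Airflow code; the child function is only for illustration) of how
    `os.killpg` loses this race and raises ESRCH, and of the
    per-process fallback that this change adds:
    
        import errno
        import multiprocessing
        import os
        import signal
        import time
    
        def child():
            # First action: become the leader of a new process group,
            # so that gid == pid. On a busy system the parent may try
            # to kill the group before this line has executed.
            os.setpgid(0, 0)
            time.sleep(60)
    
        if __name__ == "__main__":
            proc = multiprocessing.Process(target=child)
            proc.start()
            try:
                # Raises ESRCH if no group with id == proc.pid exists
                # yet, i.e. the child has not run os.setpgid(0, 0).
                os.killpg(proc.pid, signal.SIGTERM)
            except OSError as err:
                if err.errno == errno.ESRCH:
                    # Fallback: signal the process itself directly.
                    os.kill(proc.pid, signal.SIGTERM)
                else:
                    raise
            proc.join()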
    
    This problem also revealed wrong behaviour of Airflow in some edge
    conditions when the 'spawn' method was used to start the DAG
    processor. Details are described in #19934, but that problem will
    have to be solved separately (by avoiding ORM reinitialization
    during DAG processor startup).
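    
    For context, a rough sketch (the helper name is hypothetical; the
    real code selects the method from the [core] mp_start_method option
    shown in the tests below) of how a start-method choice maps to
    multiprocessing calls:
    
        import multiprocessing
    
        def start_agent_process(target, start_method="fork"):
            # "spawn" starts a fresh interpreter, so module state
            # (settings, SQLAlchemy engine) is re-imported rather than
            # inherited from the parent, which is what triggers the
            # reinitialization issue described above.
            ctx = multiprocessing.get_context(start_method)
            process = ctx.Process(target=target)
            process.start()
            return process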
    
    This change also moves the tests for the `spawn` method out of
    test_scheduler_job.py (they were a remnant of old Airflow and did
    not really test what they were supposed to test). Instead, tests
    were added that start and kill the processor agent in both the
    "spawn" and default multiprocessing modes.
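    
    The new tests can be run on their own, for example (illustrative
    command line):
    
        pytest tests/dag_processing/test_processor.py::TestProcessorAgent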
    
    (cherry picked from commit 525484388464619832a14d1b28e06e3a097aac97)
---
 airflow/dag_processing/manager.py      | 10 +++++++
 airflow/utils/process_utils.py         | 55 ++++++++++++++++++++++++----------
 tests/dag_processing/test_processor.py | 50 +++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 15 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 19a97ff..00bffc5 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -260,6 +260,16 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         os.environ['AIRFLOW__LOGGING__COLORED_CONSOLE_LOG'] = 'False'
         # Replicating the behavior of how logging module was loaded
         # in logging_config.py
+
+        # TODO: This reloading should be removed when we fix our logging behaviour
+        # When the "spawn" method is used to start processes for multiprocessing, reinitializing the
+        # SQLAlchemy engine causes extremely unexpected behaviour, messing with objects already loaded
+        # in the parent process (likely via resources shared in memory by the ORM libraries).
+        # This caused flaky tests in our CI for many months and was discovered while
+        # iterating on https://github.com/apache/airflow/pull/19860
+        # The issue that describes the problem and possible remediation is
+        # at https://github.com/apache/airflow/issues/19934
+
         importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0]))  # type: ignore
         importlib.reload(airflow.settings)
         airflow.settings.initialize()
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 3c03917..3b12115 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -45,7 +45,7 @@ DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint('core', 'KILLED_TASK_CLEANUP_TI
 
 
 def reap_process_group(
-    pgid: int,
+    process_group_id: int,
     logger,
     sig: 'signal.Signals' = signal.SIGTERM,
     timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
@@ -55,7 +55,11 @@ def reap_process_group(
     sig (SIGTERM) to the process group of pid. If any process is alive after timeout
     a SIGKILL will be sent.
 
-    :param pgid: process group id to kill
+    :param process_group_id: process group id to kill.
+           The process that wants to create the group should run `os.setpgid(0, 0)` as the first
+           command it executes, which will set group id = process id. Effectively, the process that is
+           the "root" of the group has pid = gid, and all other processes in the group have different
+           pids but the same gid (equal to the pid of the root process).
     :param logger: log handler
     :param sig: signal type
     :param timeout: how much time a process has to terminate
@@ -68,36 +72,57 @@ def reap_process_group(
 
     def signal_procs(sig):
         try:
-            os.killpg(pgid, sig)
-        except OSError as err:
+            logger.info("Sending the signal %s to group %s", sig, 
process_group_id)
+            os.killpg(process_group_id, sig)
+        except OSError as err_killpg:
             # If operation not permitted error is thrown due to run_as_user,
             # use sudo -n(--non-interactive) to kill the process
-            if err.errno == errno.EPERM:
+            if err_killpg.errno == errno.EPERM:
                 subprocess.check_call(
-                    ["sudo", "-n", "kill", "-" + str(int(sig))] + [str(p.pid) 
for p in children]
+                    ["sudo", "-n", "kill", "-" + str(int(sig))]
+                    + [str(p.pid) for p in all_processes_in_the_group]
+                )
+            elif err_killpg.errno == errno.ESRCH:
+                # There is a rare condition where the process has not yet managed to change its process
+                # group. In this case os.killpg fails with an ESRCH error,
+                # so we additionally send a kill signal to the process itself.
+                logger.info(
+                    "Sending the signal %s to process %s as process group is 
missing.", sig, process_group_id
                 )
+                try:
+                    os.kill(process_group_id, sig)
+                except OSError as err_kill:
+                    if err_kill.errno == errno.EPERM:
+                        subprocess.check_call(["sudo", "-n", "kill", "-" + str(process_group_id)])
+                    else:
+                        raise
             else:
                 raise
 
-    if pgid == os.getpgid(0):
+    if process_group_id == os.getpgid(0):
         raise RuntimeError("I refuse to kill myself")
 
     try:
-        parent = psutil.Process(pgid)
+        parent = psutil.Process(process_group_id)
 
-        children = parent.children(recursive=True)
-        children.append(parent)
+        all_processes_in_the_group = parent.children(recursive=True)
+        all_processes_in_the_group.append(parent)
     except psutil.NoSuchProcess:
         # The process already exited, but maybe its children haven't.
-        children = []
+        all_processes_in_the_group = []
         for proc in psutil.process_iter():
             try:
-                if os.getpgid(proc.pid) == pgid and proc.pid != 0:
-                    children.append(proc)
+                if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
+                    all_processes_in_the_group.append(proc)
             except OSError:
                 pass
 
-    logger.info("Sending %s to GPID %s", sig, pgid)
+    logger.info(
+        "Sending %s to group %s. PIDs of all processes in the group: %s",
+        sig,
+        process_group_id,
+        [p.pid for p in all_processes_in_the_group],
+    )
     try:
         signal_procs(sig)
     except OSError as err:
@@ -106,7 +131,7 @@ def reap_process_group(
         if err.errno == errno.ESRCH:
             return returncodes
 
-    _, alive = psutil.wait_procs(children, timeout=timeout, callback=on_terminate)
+    _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)
 
     if alive:
         for proc in alive:
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 6bfec8f..c9ecfb0 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -27,6 +27,7 @@ import pytest
 
 from airflow import settings
 from airflow.configuration import TEST_DAGS_FOLDER, conf
+from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessor
 from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
 from airflow.models.taskinstance import SimpleTaskInstance
@@ -723,3 +724,52 @@ class TestDagFileProcessor:
         dags = session.query(DagModel).all()
         assert [dag.dag_id for dag in dags if dag.is_active] == ['test_only_dummy_tasks']
         assert [dag.dag_id for dag in dags if not dag.is_active] == ['missing_dag']
+
+
+class TestProcessorAgent:
+    @pytest.fixture(autouse=True)
+    def per_test(self):
+        self.processor_agent = None
+        yield
+        if self.processor_agent:
+            self.processor_agent.end()
+
+    def test_error_when_waiting_in_async_mode(self, tmp_path):
+        self.processor_agent = DagFileProcessorAgent(
+            dag_directory=str(tmp_path),
+            max_runs=1,
+            processor_timeout=datetime.timedelta(1),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+        self.processor_agent.start()
+        with pytest.raises(RuntimeError, match="wait_until_finished should only be called in sync_mode"):
+            self.processor_agent.wait_until_finished()
+
+    def test_default_multiprocessing_behaviour(self, tmp_path):
+        self.processor_agent = DagFileProcessorAgent(
+            dag_directory=str(tmp_path),
+            max_runs=1,
+            processor_timeout=datetime.timedelta(1),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=False,
+        )
+        self.processor_agent.start()
+        self.processor_agent.run_single_parsing_loop()
+        self.processor_agent.wait_until_finished()
+
+    @conf_vars({("core", "mp_start_method"): "spawn"})
+    def test_spawn_multiprocessing_behaviour(self, tmp_path):
+        self.processor_agent = DagFileProcessorAgent(
+            dag_directory=str(tmp_path),
+            max_runs=1,
+            processor_timeout=datetime.timedelta(1),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=False,
+        )
+        self.processor_agent.start()
+        self.processor_agent.run_single_parsing_loop()
+        self.processor_agent.wait_until_finished()
