This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new abe01fa  Move setgid as the first command executed in forked task runner (#20040)
abe01fa is described below

commit abe01fad324c6b22620685de8b9cf384d8ab0b68
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Dec 4 23:38:33 2021 +0100

    Move setgid as the first command executed in forked task runner (#20040)
    
    The runner's setgid call was executed after several airflow imports
    which, when executed for the first time, could take quite some time
    (possibly even a few seconds). The setgid call should be done as
    soon as possible: if any of those imports raised an error, the
    forked process would fail and setgid would never be set.
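
    In outline, the forked child now does the following (a condensed
    sketch of the change in the diff below, not the full runner code):

        import os

        pid = os.fork()
        if pid:
            # Parent: keep tracking the child process.
            ...
        else:
            # Child: create a new process group immediately, before
            # anything that could be slow or could fail.
            os.setpgid(0, 0)

            import signal

            # Restore default signal handling before the heavy imports.
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)

            # Only then do the potentially slow airflow imports.
            from airflow import settings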
    
    This also caused the test_start_and_terminate test to fail in CI,
    because the imports could take an arbitrarily long time (depending
    on parallel tests and whether the imported modules were already
    loaded in the process), so the gid could only be set after more
    than 0.5 seconds.
    
    This change fixes it twofold:

    * setgid is moved to be the first instruction to be executed (signal
      handling was also moved to before the potentially long imports)
    * the test was fixed to wait actively and to fail only after a 1s
      timeout, which should not happen thanks to the fix above (see the
      sketch after this list)
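
    The active-wait pattern the fixed test uses is roughly this (a
    sketch based on the diff below; airflow.utils.timeout.timeout
    raises if the block does not finish within the given number of
    seconds, and runner is the StandardTaskRunner started by the test):

        import os
        import time

        from airflow.utils.timeout import timeout

        # Poll until the forked child has moved itself into its own
        # process group; fail only if that does not happen within 1s.
        with timeout(seconds=1):
            while True:
                runner_pgid = os.getpgid(runner.process.pid)
                if runner_pgid == runner.process.pid:
                    break
                time.sleep(0.01)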
    
    Additionally, the test was using the `task test` command rather than
    `task run`, and in some circumstances, when you tried to run it
    locally with FORK disabled (MacOS), the same test could fail with a
    different error, because the --error-file flag is not defined for
    the `task test` command but is automatically added by the runner.

    The task command has been changed to `run`.
    
    Fixing this test caused an occasional test_on_kill failure, which
    suffered from a similar problem and had a similar sleep implemented.
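
    The fixed test_on_kill waits for its marker file in the same way,
    just with a longer timeout (again only a sketch of the diff below;
    `path` is the file the killed task writes to):

        # Wait up to 40s for the marker file, polling frequently
        # instead of sleeping in long fixed steps.
        with timeout(seconds=40):
            while True:
                if os.path.exists(path):
                    break
                time.sleep(0.01)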
    
    Thanks to that, the tests will usually be faster, as no significant
    delays are introduced.
---
 airflow/task/task_runner/standard_task_runner.py   | 10 +++----
 .../task/task_runner/test_standard_task_runner.py  | 31 +++++++++++++---------
 2 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 3c54249..381cf5a 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -52,17 +52,17 @@ class StandardTaskRunner(BaseTaskRunner):
             self.log.info("Started process %d to run task", pid)
             return psutil.Process(pid)
         else:
+            # Start a new process group
+            os.setpgid(0, 0)
             import signal
 
+            signal.signal(signal.SIGINT, signal.SIG_DFL)
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
             from airflow import settings
             from airflow.cli.cli_parser import get_parser
             from airflow.sentry import Sentry
 
-            signal.signal(signal.SIGINT, signal.SIG_DFL)
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            # Start a new process group
-            os.setpgid(0, 0)
-
             # Force a new SQLAlchemy session. We can't share open DB handles
             # between process. The cli code will re-create this as part of its
             # normal startup
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index 918f3c9..c2e9069 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -33,6 +33,7 @@ from airflow.utils import timezone
 from airflow.utils.platform import getuser
 from airflow.utils.session import create_session
 from airflow.utils.state import State
+from airflow.utils.timeout import timeout
 from tests.test_utils.db import clear_db_runs
 
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
@@ -79,7 +80,7 @@ class TestStandardTaskRunner:
         local_task_job.task_instance.command_as_list.return_value = [
             'airflow',
             'tasks',
-            'test',
+            'run',
             'test_on_kill',
             'task1',
             '2016-01-01',
@@ -87,14 +88,17 @@ class TestStandardTaskRunner:
 
         runner = StandardTaskRunner(local_task_job)
         runner.start()
-        time.sleep(0.5)
-
-        pgid = os.getpgid(runner.process.pid)
-        assert pgid > 0
-        assert pgid != os.getpgid(0), "Task should be in a different process group to us"
-
-        processes = list(self._procs_in_pgroup(pgid))
-
+        # Wait until process sets its pgid to be equal to pid
+        with timeout(seconds=1):
+            while True:
+                runner_pgid = os.getpgid(runner.process.pid)
+                if runner_pgid == runner.process.pid:
+                    break
+                time.sleep(0.01)
+
+        assert runner_pgid > 0
+        assert runner_pgid != os.getpgid(0), "Task should be in a different process group to us"
+        processes = list(self._procs_in_pgroup(runner_pgid))
         runner.terminate()
 
         for process in processes:
@@ -217,10 +221,11 @@ class TestStandardTaskRunner:
             session.close()  # explicitly close as `create_session`s commit will blow up otherwise
 
         # Wait some time for the result
-        for _ in range(20):
-            if os.path.exists(path):
-                break
-            time.sleep(2)
+        with timeout(seconds=40):
+            while True:
+                if os.path.exists(path):
+                    break
+                time.sleep(0.01)
 
         with open(path) as f:
             assert "ON_KILL_TEST" == f.readline()
