This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new abe01fa Move setgid as the first command executed in forked task
runner (#20040)
abe01fa is described below
commit abe01fad324c6b22620685de8b9cf384d8ab0b68
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Dec 4 23:38:33 2021 +0100
Move setgid as the first command executed in forked task runner (#20040)
The runner's setpgid call was executed after several airflow
imports which - when executed for the first time - could take quite
some time (possibly even a few seconds). The setpgid call should be
made as soon as possible: if any of the imports failed, the process
would exit before the process group was ever set.
Also, this caused the test_start_and_terminate test to fail in CI
because the imports could take an arbitrarily long time (depending on
parallel tests and whether the imported modules were already
loaded in the process), so the pgid could end up being set after more
than 0.5 seconds.
This change fixes it in two ways:
* setpgid is moved to be the first instruction executed (signal
handling was also moved to before the potentially long
imports)
* the test was fixed to wait actively and only fail after a
timeout of 1s (which should not happen because of the fix above)
Additionally, the test was using the `tasks test` command rather than
`tasks run`, and in some circumstances when you tried to run it locally
with FORK disabled (MacOS), the same test could fail with
a different error, because the --error-file flag is not defined for the
`tasks test` command but is automatically added by the runner.
The task command has been changed to `run`.
Fixing this test caused occasional test_on_kill failures; that test
suffered from a similar problem and had a similar sleep
implemented.
Thanks to that, the tests will usually be faster, as no significant
delays will be introduced.
---
airflow/task/task_runner/standard_task_runner.py | 10 +++----
.../task/task_runner/test_standard_task_runner.py | 31 +++++++++++++---------
2 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/airflow/task/task_runner/standard_task_runner.py
b/airflow/task/task_runner/standard_task_runner.py
index 3c54249..381cf5a 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -52,17 +52,17 @@ class StandardTaskRunner(BaseTaskRunner):
self.log.info("Started process %d to run task", pid)
return psutil.Process(pid)
else:
+ # Start a new process group
+ os.setpgid(0, 0)
import signal
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
from airflow import settings
from airflow.cli.cli_parser import get_parser
from airflow.sentry import Sentry
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- # Start a new process group
- os.setpgid(0, 0)
-
# Force a new SQLAlchemy session. We can't share open DB handles
# between process. The cli code will re-create this as part of its
# normal startup
diff --git a/tests/task/task_runner/test_standard_task_runner.py
b/tests/task/task_runner/test_standard_task_runner.py
index 918f3c9..c2e9069 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -33,6 +33,7 @@ from airflow.utils import timezone
from airflow.utils.platform import getuser
from airflow.utils.session import create_session
from airflow.utils.state import State
+from airflow.utils.timeout import timeout
from tests.test_utils.db import clear_db_runs
TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
@@ -79,7 +80,7 @@ class TestStandardTaskRunner:
local_task_job.task_instance.command_as_list.return_value = [
'airflow',
'tasks',
- 'test',
+ 'run',
'test_on_kill',
'task1',
'2016-01-01',
@@ -87,14 +88,17 @@ class TestStandardTaskRunner:
runner = StandardTaskRunner(local_task_job)
runner.start()
- time.sleep(0.5)
-
- pgid = os.getpgid(runner.process.pid)
- assert pgid > 0
- assert pgid != os.getpgid(0), "Task should be in a different process
group to us"
-
- processes = list(self._procs_in_pgroup(pgid))
-
+ # Wait until process sets its pgid to be equal to pid
+ with timeout(seconds=1):
+ while True:
+ runner_pgid = os.getpgid(runner.process.pid)
+ if runner_pgid == runner.process.pid:
+ break
+ time.sleep(0.01)
+
+ assert runner_pgid > 0
+ assert runner_pgid != os.getpgid(0), "Task should be in a different
process group to us"
+ processes = list(self._procs_in_pgroup(runner_pgid))
runner.terminate()
for process in processes:
@@ -217,10 +221,11 @@ class TestStandardTaskRunner:
session.close() # explicitly close as `create_session`s commit
will blow up otherwise
# Wait some time for the result
- for _ in range(20):
- if os.path.exists(path):
- break
- time.sleep(2)
+ with timeout(seconds=40):
+ while True:
+ if os.path.exists(path):
+ break
+ time.sleep(0.01)
with open(path) as f:
assert "ON_KILL_TEST" == f.readline()