This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d2eab1de0c3ee93e5e1c7830ffcfdb534657fc21 Author: Paul Lockaby <[email protected]> AuthorDate: Wed Dec 8 08:00:14 2021 -0800 fixing #19028 by moving chown to use sudo (#20114) * fixing #19028 by having chown be in a sudo call * removing unused import * trying to clean up a test * combine sudo chown calls * force exception when chown fails * Update tests/task/task_runner/test_base_task_runner.py * Fix tests * Fix formatting Co-authored-by: Ash Berlin-Taylor <[email protected]> Co-authored-by: Ash Berlin-Taylor <[email protected]> (cherry picked from commit b37c0efabd29b9f20ba05c0e1281de22809e0624) --- airflow/task/task_runner/base_task_runner.py | 15 +++++---------- tests/jobs/test_local_task_job.py | 6 ++++-- tests/task/task_runner/test_base_task_runner.py | 11 +++++------ 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index 28bb847..5551508 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -19,7 +19,6 @@ import os import subprocess import threading -from pwd import getpwnam from tempfile import NamedTemporaryFile from typing import Optional, Union @@ -58,6 +57,8 @@ class BaseTaskRunner(LoggingMixin): except AirflowConfigException: self.run_as_user = None + self._error_file = NamedTemporaryFile(delete=True) + # Add sudo commands to change user if we need to. Needed to handle SubDagOperator # case using a SequentialExecutor. 
self.log.debug("Planning to run as the %s user", self.run_as_user) @@ -69,7 +70,9 @@ class BaseTaskRunner(LoggingMixin): cfg_path = tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True) # Give ownership of file to user; only they can read and write - subprocess.call(['sudo', 'chown', self.run_as_user, cfg_path], close_fds=True) + subprocess.check_call( + ['sudo', 'chown', self.run_as_user, cfg_path, self._error_file.name], close_fds=True + ) # propagate PYTHONPATH environment variable pythonpath_value = os.environ.get(PYTHONPATH_VAR, '') @@ -85,14 +88,6 @@ class BaseTaskRunner(LoggingMixin): # - the runner can read/execute those values as it needs cfg_path = tmp_configuration_copy(chmod=0o600, include_env=False, include_cmds=False) - self._error_file = NamedTemporaryFile(delete=True) - if self.run_as_user: - try: - os.chown(self._error_file.name, getpwnam(self.run_as_user).pw_uid, -1) - except KeyError: - # No user `run_as_user` found - pass - self._cfg_path = cfg_path self._command = ( popen_prepend diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index f23a94f..d9d677d 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -148,8 +148,9 @@ class TestLocalTaskJob: with pytest.raises(AirflowException): job1.heartbeat_callback() + @mock.patch('subprocess.check_call') @mock.patch('airflow.jobs.local_task_job.psutil') - def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, dag_maker): + def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker): session = settings.Session() with dag_maker('test_localtaskjob_heartbeat'): op1 = DummyOperator(task_id='op1', run_as_user='myuser') @@ -190,8 +191,9 @@ class TestLocalTaskJob: job1.heartbeat_callback() @conf_vars({('core', 'default_impersonation'): 'testuser'}) + @mock.patch('subprocess.check_call') @mock.patch('airflow.jobs.local_task_job.psutil') - def 
test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, dag_maker): + def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _, dag_maker): session = settings.Session() with dag_maker('test_localtaskjob_heartbeat'): op1 = DummyOperator(task_id='op1') diff --git a/tests/task/task_runner/test_base_task_runner.py b/tests/task/task_runner/test_base_task_runner.py index 499bba7..a0bd0ac 100644 --- a/tests/task/task_runner/test_base_task_runner.py +++ b/tests/task/task_runner/test_base_task_runner.py @@ -25,10 +25,11 @@ from airflow.task.task_runner.base_task_runner import BaseTaskRunner @pytest.mark.parametrize(["impersonation"], (("nobody",), (None,))) -@mock.patch('subprocess.call') -@mock.patch('os.chown') +@mock.patch('subprocess.check_call') @mock.patch('airflow.task.task_runner.base_task_runner.tmp_configuration_copy') -def test_config_copy_mode(tmp_configuration_copy, chown, subprocess_call, dag_maker, impersonation): +def test_config_copy_mode(tmp_configuration_copy, subprocess_call, dag_maker, impersonation): + tmp_configuration_copy.return_value = "/tmp/some-string" + with dag_maker("test"): BaseOperator(task_id="task_1", run_as_user=impersonation) @@ -45,10 +46,8 @@ def test_config_copy_mode(tmp_configuration_copy, chown, subprocess_call, dag_ma tmp_configuration_copy.assert_called_with(chmod=0o600, include_env=includes, include_cmds=includes) if impersonation: - chown.assert_called() subprocess_call.assert_called_with( - ['sudo', 'chown', impersonation, tmp_configuration_copy.return_value], close_fds=True + ['sudo', 'chown', impersonation, "/tmp/some-string", runner._error_file.name], close_fds=True ) else: - chown.assert_not_called() subprocess_call.not_assert_called()
