This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d2eab1de0c3ee93e5e1c7830ffcfdb534657fc21
Author: Paul Lockaby <[email protected]>
AuthorDate: Wed Dec 8 08:00:14 2021 -0800

    fixing #19028 by moving chown to use sudo (#20114)
    
    * fixing #19028 by having chown be in a sudo call
    
    * removing unused import
    
    * trying to clean up a test
    
    * combine sudo chown calls
    
    * force exception when chown fails
    
    * Update tests/task/task_runner/test_base_task_runner.py
    
    * Fix tests
    
    * Fix formatting
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
    (cherry picked from commit b37c0efabd29b9f20ba05c0e1281de22809e0624)
---
 airflow/task/task_runner/base_task_runner.py    | 15 +++++----------
 tests/jobs/test_local_task_job.py               |  6 ++++--
 tests/task/task_runner/test_base_task_runner.py | 11 +++++------
 3 files changed, 14 insertions(+), 18 deletions(-)

diff --git a/airflow/task/task_runner/base_task_runner.py 
b/airflow/task/task_runner/base_task_runner.py
index 28bb847..5551508 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -19,7 +19,6 @@
 import os
 import subprocess
 import threading
-from pwd import getpwnam
 from tempfile import NamedTemporaryFile
 from typing import Optional, Union
 
@@ -58,6 +57,8 @@ class BaseTaskRunner(LoggingMixin):
             except AirflowConfigException:
                 self.run_as_user = None
 
+        self._error_file = NamedTemporaryFile(delete=True)
+
         # Add sudo commands to change user if we need to. Needed to handle 
SubDagOperator
         # case using a SequentialExecutor.
         self.log.debug("Planning to run as the %s user", self.run_as_user)
@@ -69,7 +70,9 @@ class BaseTaskRunner(LoggingMixin):
             cfg_path = tmp_configuration_copy(chmod=0o600, include_env=True, 
include_cmds=True)
 
             # Give ownership of file to user; only they can read and write
-            subprocess.call(['sudo', 'chown', self.run_as_user, cfg_path], 
close_fds=True)
+            subprocess.check_call(
+                ['sudo', 'chown', self.run_as_user, cfg_path, 
self._error_file.name], close_fds=True
+            )
 
             # propagate PYTHONPATH environment variable
             pythonpath_value = os.environ.get(PYTHONPATH_VAR, '')
@@ -85,14 +88,6 @@ class BaseTaskRunner(LoggingMixin):
             # - the runner can read/execute those values as it needs
             cfg_path = tmp_configuration_copy(chmod=0o600, include_env=False, 
include_cmds=False)
 
-        self._error_file = NamedTemporaryFile(delete=True)
-        if self.run_as_user:
-            try:
-                os.chown(self._error_file.name, 
getpwnam(self.run_as_user).pw_uid, -1)
-            except KeyError:
-                # No user `run_as_user` found
-                pass
-
         self._cfg_path = cfg_path
         self._command = (
             popen_prepend
diff --git a/tests/jobs/test_local_task_job.py 
b/tests/jobs/test_local_task_job.py
index f23a94f..d9d677d 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -148,8 +148,9 @@ class TestLocalTaskJob:
         with pytest.raises(AirflowException):
             job1.heartbeat_callback()
 
+    @mock.patch('subprocess.check_call')
     @mock.patch('airflow.jobs.local_task_job.psutil')
-    def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, 
dag_maker):
+    def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, 
dag_maker):
         session = settings.Session()
         with dag_maker('test_localtaskjob_heartbeat'):
             op1 = DummyOperator(task_id='op1', run_as_user='myuser')
@@ -190,8 +191,9 @@ class TestLocalTaskJob:
             job1.heartbeat_callback()
 
     @conf_vars({('core', 'default_impersonation'): 'testuser'})
+    @mock.patch('subprocess.check_call')
     @mock.patch('airflow.jobs.local_task_job.psutil')
-    def test_localtaskjob_heartbeat_with_default_impersonation(self, 
psutil_mock, dag_maker):
+    def test_localtaskjob_heartbeat_with_default_impersonation(self, 
psutil_mock, _, dag_maker):
         session = settings.Session()
         with dag_maker('test_localtaskjob_heartbeat'):
             op1 = DummyOperator(task_id='op1')
diff --git a/tests/task/task_runner/test_base_task_runner.py 
b/tests/task/task_runner/test_base_task_runner.py
index 499bba7..a0bd0ac 100644
--- a/tests/task/task_runner/test_base_task_runner.py
+++ b/tests/task/task_runner/test_base_task_runner.py
@@ -25,10 +25,11 @@ from airflow.task.task_runner.base_task_runner import 
BaseTaskRunner
 
 
 @pytest.mark.parametrize(["impersonation"], (("nobody",), (None,)))
[email protected]('subprocess.call')
[email protected]('os.chown')
[email protected]('subprocess.check_call')
 @mock.patch('airflow.task.task_runner.base_task_runner.tmp_configuration_copy')
-def test_config_copy_mode(tmp_configuration_copy, chown, subprocess_call, 
dag_maker, impersonation):
+def test_config_copy_mode(tmp_configuration_copy, subprocess_call, dag_maker, 
impersonation):
+    tmp_configuration_copy.return_value = "/tmp/some-string"
+
     with dag_maker("test"):
         BaseOperator(task_id="task_1", run_as_user=impersonation)
 
@@ -45,10 +46,8 @@ def test_config_copy_mode(tmp_configuration_copy, chown, 
subprocess_call, dag_ma
     tmp_configuration_copy.assert_called_with(chmod=0o600, 
include_env=includes, include_cmds=includes)
 
     if impersonation:
-        chown.assert_called()
         subprocess_call.assert_called_with(
-            ['sudo', 'chown', impersonation, 
tmp_configuration_copy.return_value], close_fds=True
+            ['sudo', 'chown', impersonation, "/tmp/some-string", 
runner._error_file.name], close_fds=True
         )
     else:
-        chown.assert_not_called()
         subprocess_call.not_assert_called()

Reply via email to