This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e6ab8fc158d377f1593fdddf7dd35a9200964d35
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Mar 18 19:28:37 2024 +0100

    Fix excessive permission changing for log task handler (#38164)
    
    When log dir permission was changed by log handler, we've
    implemented also changing permissions of parent folders recursively,
    however it was quite a bit too much to change it for home directory
    where the log folder could have been created - because in some cases
    changing permissions might lead to unexpected side-effects - such
    as loosing ability to login to ssh server.
    
    Fixes: #38137
    (cherry picked from commit 2c1d0f8c4121589e92e4c0ca1665b89a2691d2d8)
---
 airflow/utils/log/file_task_handler.py | 41 ++++++++--------------------------
 tests/utils/test_log_handlers.py       | 28 ++++++++++-------------
 2 files changed, 21 insertions(+), 48 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index 7e1faca4ec..e6dd540267 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -159,31 +159,6 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, 
session) -> TaskInstance:
         raise AirflowException(f"Could not find TaskInstance for {ti}")
 
 
-def _change_directory_permissions_up(directory: Path, folder_permissions: int):
-    """
-    Change permissions of the given directory and its parents.
-
-    Only attempt to change permissions for directories owned by the current 
user.
-
-    :param directory: directory to change permissions of (including parents)
-    :param folder_permissions: permissions to set
-    """
-    if directory.stat().st_uid == os.getuid():
-        if directory.stat().st_mode % 0o1000 != folder_permissions % 0o1000:
-            print(f"Changing {directory} permission to {folder_permissions}")
-            try:
-                directory.chmod(folder_permissions)
-            except PermissionError as e:
-                # In some circumstances (depends on user and filesystem) we 
might not be able to
-                # change the permission for the folder (when the folder was 
created by another user
-                # before or when the filesystem does not allow to change 
permission). We should not
-                # fail in this case but rather ignore it.
-                print(f"Failed to change {directory} permission to 
{folder_permissions}: {e}")
-                return
-        if directory.parent != directory:
-            _change_directory_permissions_up(directory.parent, 
folder_permissions)
-
-
 class FileTaskHandler(logging.Handler):
     """
     FileTaskHandler is a python log handler that handles and reads task 
instance logs.
@@ -481,7 +456,8 @@ class FileTaskHandler(logging.Handler):
 
         return logs, metadata_array
 
-    def _prepare_log_folder(self, directory: Path):
+    @staticmethod
+    def _prepare_log_folder(directory: Path, new_folder_permissions: int):
         """
         Prepare the log folder and ensure its mode is as configured.
 
@@ -505,11 +481,9 @@ class FileTaskHandler(logging.Handler):
         sure that the same group is set as default group for both - 
impersonated user and main airflow
         user.
         """
-        new_folder_permissions = int(
-            conf.get("logging", "file_task_handler_new_folder_permissions", 
fallback="0o775"), 8
-        )
-        directory.mkdir(mode=new_folder_permissions, parents=True, 
exist_ok=True)
-        _change_directory_permissions_up(directory, new_folder_permissions)
+        for parent in reversed(directory.parents):
+            parent.mkdir(mode=new_folder_permissions, exist_ok=True)
+        directory.mkdir(mode=new_folder_permissions, exist_ok=True)
 
     def _init_file(self, ti, *, identifier: str | None = None):
         """
@@ -531,7 +505,10 @@ class FileTaskHandler(logging.Handler):
             # if this is true, we're invoked via set_context in the context of
             # setting up individual trigger logging. return trigger log path.
             full_path = self.add_triggerer_suffix(full_path=full_path, 
job_id=ti.triggerer_job.id)
-        self._prepare_log_folder(Path(full_path).parent)
+        new_folder_permissions = int(
+            conf.get("logging", "file_task_handler_new_folder_permissions", 
fallback="0o775"), 8
+        )
+        self._prepare_log_folder(Path(full_path).parent, 
new_folder_permissions)
 
         if not os.path.exists(full_path):
             open(full_path, "a").close()
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 36810506da..6017c761c9 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -22,7 +22,6 @@ import logging.config
 import os
 import re
 import shutil
-import tempfile
 from pathlib import Path
 from unittest import mock
 from unittest.mock import patch
@@ -42,7 +41,6 @@ from airflow.operators.python import PythonOperator
 from airflow.utils.log.file_task_handler import (
     FileTaskHandler,
     LogType,
-    _change_directory_permissions_up,
     _interleave_logs,
     _parse_timestamps_in_log_file,
 )
@@ -724,28 +722,26 @@ def test_interleave_logs_correct_ordering():
     assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, 
"", sample_with_dupe))
 
 
-def test_permissions_for_new_directories():
-    tmp_path = Path(tempfile.mkdtemp())
+def test_permissions_for_new_directories(tmpdir):
+    tmpdir_path = Path(tmpdir)
     try:
         # Set umask to 0o027: owner rwx, group rx-w, other -rwx
         old_umask = os.umask(0o027)
         try:
-            subdir = tmp_path / "subdir1" / "subdir2"
+            base_dir = tmpdir_path / "base"
+            base_dir.mkdir()
+            log_dir = base_dir / "subdir1" / "subdir2"
             # force permissions for the new folder to be owner rwx, group 
-rxw, other -rwx
             new_folder_permissions = 0o700
             # default permissions are owner rwx, group rx-w, other -rwx (umask 
bit negative)
             default_permissions = 0o750
-            subdir.mkdir(mode=new_folder_permissions, parents=True, 
exist_ok=True)
-            assert subdir.exists()
-            assert subdir.is_dir()
-            assert subdir.stat().st_mode % 0o1000 == new_folder_permissions
-            # initially parent permissions are as per umask
-            assert subdir.parent.stat().st_mode % 0o1000 == default_permissions
-            _change_directory_permissions_up(subdir, new_folder_permissions)
-            assert subdir.stat().st_mode % 0o1000 == new_folder_permissions
-            # now parent permissions are as per new_folder_permissions
-            assert subdir.parent.stat().st_mode % 0o1000 == 
new_folder_permissions
+            FileTaskHandler._prepare_log_folder(log_dir, 
new_folder_permissions)
+            assert log_dir.exists()
+            assert log_dir.is_dir()
+            assert log_dir.stat().st_mode % 0o1000 == new_folder_permissions
+            assert log_dir.parent.stat().st_mode % 0o1000 == 
new_folder_permissions
+            assert base_dir.stat().st_mode % 0o1000 == default_permissions
         finally:
             os.umask(old_umask)
     finally:
-        shutil.rmtree(tmp_path)
+        shutil.rmtree(tmpdir_path)

Reply via email to