This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e6ab8fc158d377f1593fdddf7dd35a9200964d35 Author: Jarek Potiuk <[email protected]> AuthorDate: Mon Mar 18 19:28:37 2024 +0100 Fix excessive permission changing for log task handler (#38164) When log dir permission was changed by log handler, we also implemented changing permissions of parent folders recursively; however, it was quite a bit too much to change it for the home directory where the log folder could have been created - because in some cases changing permissions might lead to unexpected side-effects - such as losing the ability to log in to an ssh server. Fixes: #38137 (cherry picked from commit 2c1d0f8c4121589e92e4c0ca1665b89a2691d2d8) --- airflow/utils/log/file_task_handler.py | 41 ++++++++-------------------------- tests/utils/test_log_handlers.py | 28 ++++++++++------------- 2 files changed, 21 insertions(+), 48 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 7e1faca4ec..e6dd540267 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -159,31 +159,6 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance: raise AirflowException(f"Could not find TaskInstance for {ti}") -def _change_directory_permissions_up(directory: Path, folder_permissions: int): - """ - Change permissions of the given directory and its parents. - - Only attempt to change permissions for directories owned by the current user. 
- - :param directory: directory to change permissions of (including parents) - :param folder_permissions: permissions to set - """ - if directory.stat().st_uid == os.getuid(): - if directory.stat().st_mode % 0o1000 != folder_permissions % 0o1000: - print(f"Changing {directory} permission to {folder_permissions}") - try: - directory.chmod(folder_permissions) - except PermissionError as e: - # In some circumstances (depends on user and filesystem) we might not be able to - # change the permission for the folder (when the folder was created by another user - # before or when the filesystem does not allow to change permission). We should not - # fail in this case but rather ignore it. - print(f"Failed to change {directory} permission to {folder_permissions}: {e}") - return - if directory.parent != directory: - _change_directory_permissions_up(directory.parent, folder_permissions) - - class FileTaskHandler(logging.Handler): """ FileTaskHandler is a python log handler that handles and reads task instance logs. @@ -481,7 +456,8 @@ class FileTaskHandler(logging.Handler): return logs, metadata_array - def _prepare_log_folder(self, directory: Path): + @staticmethod + def _prepare_log_folder(directory: Path, new_folder_permissions: int): """ Prepare the log folder and ensure its mode is as configured. @@ -505,11 +481,9 @@ class FileTaskHandler(logging.Handler): sure that the same group is set as default group for both - impersonated user and main airflow user. 
""" - new_folder_permissions = int( - conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8 - ) - directory.mkdir(mode=new_folder_permissions, parents=True, exist_ok=True) - _change_directory_permissions_up(directory, new_folder_permissions) + for parent in reversed(directory.parents): + parent.mkdir(mode=new_folder_permissions, exist_ok=True) + directory.mkdir(mode=new_folder_permissions, exist_ok=True) def _init_file(self, ti, *, identifier: str | None = None): """ @@ -531,7 +505,10 @@ class FileTaskHandler(logging.Handler): # if this is true, we're invoked via set_context in the context of # setting up individual trigger logging. return trigger log path. full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id) - self._prepare_log_folder(Path(full_path).parent) + new_folder_permissions = int( + conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8 + ) + self._prepare_log_folder(Path(full_path).parent, new_folder_permissions) if not os.path.exists(full_path): open(full_path, "a").close() diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 36810506da..6017c761c9 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -22,7 +22,6 @@ import logging.config import os import re import shutil -import tempfile from pathlib import Path from unittest import mock from unittest.mock import patch @@ -42,7 +41,6 @@ from airflow.operators.python import PythonOperator from airflow.utils.log.file_task_handler import ( FileTaskHandler, LogType, - _change_directory_permissions_up, _interleave_logs, _parse_timestamps_in_log_file, ) @@ -724,28 +722,26 @@ def test_interleave_logs_correct_ordering(): assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe)) -def test_permissions_for_new_directories(): - tmp_path = Path(tempfile.mkdtemp()) +def test_permissions_for_new_directories(tmpdir): + 
tmpdir_path = Path(tmpdir) try: # Set umask to 0o027: owner rwx, group rx-w, other -rwx old_umask = os.umask(0o027) try: - subdir = tmp_path / "subdir1" / "subdir2" + base_dir = tmpdir_path / "base" + base_dir.mkdir() + log_dir = base_dir / "subdir1" / "subdir2" # force permissions for the new folder to be owner rwx, group -rxw, other -rwx new_folder_permissions = 0o700 # default permissions are owner rwx, group rx-w, other -rwx (umask bit negative) default_permissions = 0o750 - subdir.mkdir(mode=new_folder_permissions, parents=True, exist_ok=True) - assert subdir.exists() - assert subdir.is_dir() - assert subdir.stat().st_mode % 0o1000 == new_folder_permissions - # initially parent permissions are as per umask - assert subdir.parent.stat().st_mode % 0o1000 == default_permissions - _change_directory_permissions_up(subdir, new_folder_permissions) - assert subdir.stat().st_mode % 0o1000 == new_folder_permissions - # now parent permissions are as per new_folder_permissions - assert subdir.parent.stat().st_mode % 0o1000 == new_folder_permissions + FileTaskHandler._prepare_log_folder(log_dir, new_folder_permissions) + assert log_dir.exists() + assert log_dir.is_dir() + assert log_dir.stat().st_mode % 0o1000 == new_folder_permissions + assert log_dir.parent.stat().st_mode % 0o1000 == new_folder_permissions + assert base_dir.stat().st_mode % 0o1000 == default_permissions finally: os.umask(old_umask) finally: - shutil.rmtree(tmp_path) + shutil.rmtree(tmpdir_path)
