This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/sbp by this push:
     new 33cac1db Set immutable permissions on extracted archive files and directories
33cac1db is described below

commit 33cac1db6dd40cb855685c058a843c73c7b9fea4
Author: Sean B. Palmer <[email protected]>
AuthorDate: Thu Mar 12 19:46:13 2026 +0000

    Set immutable permissions on extracted archive files and directories
---
 atr/server.py                          | 46 ++++++++++++++++++++++++++----
 atr/tasks/quarantine.py                |  6 ++++
 tests/unit/test_archive_permissions.py | 52 ++++++++++++++++++++++++++++++++++
 tests/unit/test_quarantine_backfill.py |  3 ++
 tests/unit/test_quarantine_task.py     |  3 ++
 5 files changed, 105 insertions(+), 5 deletions(-)

diff --git a/atr/server.py b/atr/server.py
index f6d6f5ce..c791d22e 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -185,15 +185,17 @@ def _app_dirs_setup(state_dir_str: str, hot_reload: bool) -> None:
         paths.get_tmp_dir(),
         paths.get_unfinished_dir(),
     ]
+    archives_dir = paths.get_archives_dir()
     unfinished_dir = paths.get_unfinished_dir()
     for directory in directories_to_ensure:
         directory.mkdir(parents=True, exist_ok=True)
-        if directory != unfinished_dir:
-            util.chmod_directories(directory, permissions=0o755)
-        else:
-            # Revision directories and descendants must be 555
-            # Therefore we handle those separately
+        # Some directories need custom permissions
+        if directory == archives_dir:
+            _enforce_archives_permissions(archives_dir)
+        elif directory == unfinished_dir:
             _enforce_unfinished_permissions(unfinished_dir)
+        else:
+            util.chmod_directories(directory, permissions=0o755)
 
 
 def _app_setup_api_docs(app: base.QuartApp) -> None:
@@ -651,6 +653,40 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
     return app
 
 
+def _enforce_archives_permissions(archives_dir: pathlib.Path) -> None:
+    if not archives_dir.exists():
+        return
+    fixed_files = 0
+    fixed_dirs = 0
+
+    # Set ancestor directories of archive files to 755
+    for dirpath, _, _ in os.walk(archives_dir, topdown=True):
+        path = pathlib.Path(dirpath)
+        depth = len(path.relative_to(archives_dir).parts)
+        if depth < 3:
+            os.chmod(path, 0o755)
+
+    # Set archive files to 444
+    for file_path in archives_dir.rglob("*"):
+        if not file_path.is_file():
+            continue
+        depth = len(file_path.relative_to(archives_dir).parts)
+        if (depth >= 3) and (stat.S_IMODE(file_path.stat().st_mode) != 0o444):
+            os.chmod(file_path, 0o444)
+            fixed_files += 1
+
+    # Set archive directories to 555
+    for dirpath, _, _ in os.walk(archives_dir, topdown=False):
+        path = pathlib.Path(dirpath)
+        depth = len(path.relative_to(archives_dir).parts)
+        if (depth >= 3) and (stat.S_IMODE(path.stat().st_mode) != 0o555):
+            os.chmod(path, 0o555)
+            fixed_dirs += 1
+
+    if (fixed_files > 0) or (fixed_dirs > 0):
+        log.info(f"Fixed archive permissions: {fixed_files} files to 0o444, {fixed_dirs} directories to 0o555")
+
+
 def _enforce_unfinished_permissions(unfinished_dir: pathlib.Path) -> None:
     # Set ancestor directories of revisions to 755
     for dirpath, _dirnames, _filenames in os.walk(unfinished_dir, topdown=True):
diff --git a/atr/tasks/quarantine.py b/atr/tasks/quarantine.py
index e5dd9561..cebf48a9 100644
--- a/atr/tasks/quarantine.py
+++ b/atr/tasks/quarantine.py
@@ -207,6 +207,7 @@ def _extract_archive_to_cache_dir(
                 shutil.rmtree(staging_dir, ignore_errors=True)
             else:
                 raise
+        _set_archive_permissions(cache_dir)
         return time.monotonic() - start
     except Exception:
         shutil.rmtree(staging_dir, ignore_errors=True)
@@ -376,3 +377,8 @@ async def _run_safety_checks(
         if errors:
             any_failed = True
     return file_entries, any_failed
+
+
+def _set_archive_permissions(archive_dir: pathlib.Path) -> None:
+    util.chmod_files(archive_dir, 0o444)
+    util.chmod_directories(archive_dir, 0o555)
diff --git a/tests/unit/test_archive_permissions.py b/tests/unit/test_archive_permissions.py
new file mode 100644
index 00000000..4ce401fe
--- /dev/null
+++ b/tests/unit/test_archive_permissions.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pathlib
+import stat
+
+import atr.tasks.quarantine as quarantine
+
+
+def test_set_archive_permissions_locks_files_and_directories(tmp_path: pathlib.Path) -> None:
+    archive_dir = tmp_path / "extracted"
+    nested_dir = archive_dir / "src" / "main"
+    nested_dir.mkdir(parents=True)
+    (archive_dir / "README.txt").write_text("hello")
+    (nested_dir / "App.java").write_text("class App {}")
+
+    quarantine._set_archive_permissions(archive_dir)
+
+    assert stat.S_IMODE(archive_dir.stat().st_mode) == 0o555
+    assert stat.S_IMODE((archive_dir / "src").stat().st_mode) == 0o555
+    assert stat.S_IMODE(nested_dir.stat().st_mode) == 0o555
+    assert stat.S_IMODE((archive_dir / "README.txt").stat().st_mode) == 0o444
+    assert stat.S_IMODE((nested_dir / "App.java").stat().st_mode) == 0o444
+
+
+def test_set_archive_permissions_repairs_world_writable(tmp_path: pathlib.Path) -> None:
+    archive_dir = tmp_path / "extracted"
+    archive_dir.mkdir()
+    file_path = archive_dir / "file.txt"
+    file_path.write_text("content")
+    os.chmod(file_path, 0o666)
+    os.chmod(archive_dir, 0o777)
+
+    quarantine._set_archive_permissions(archive_dir)
+
+    assert stat.S_IMODE(file_path.stat().st_mode) == 0o444
+    assert stat.S_IMODE(archive_dir.stat().st_mode) == 0o555
diff --git a/tests/unit/test_quarantine_backfill.py b/tests/unit/test_quarantine_backfill.py
index f0370441..5b29b801 100644
--- a/tests/unit/test_quarantine_backfill.py
+++ b/tests/unit/test_quarantine_backfill.py
@@ -17,6 +17,7 @@
 
 import io
 import pathlib
+import stat
 import tarfile
 
 import pytest
@@ -137,6 +138,8 @@ def test_backfill_extracts_uncached_archive(monkeypatch: pytest.MonkeyPatch, tmp
     assert (result_cache_dir / "README.txt").read_text() == "Hello"
     assert duration >= 0
     assert (tmp_path / "cache" / "archive-backfill.done").is_file()
+    assert stat.S_IMODE(result_cache_dir.stat().st_mode) == 0o555
+    assert stat.S_IMODE((result_cache_dir / "README.txt").stat().st_mode) == 0o444
 
 
 def test_backfill_skips_non_archive_files(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None:
diff --git a/tests/unit/test_quarantine_task.py b/tests/unit/test_quarantine_task.py
index 05efa15c..2979c911 100644
--- a/tests/unit/test_quarantine_task.py
+++ b/tests/unit/test_quarantine_task.py
@@ -18,6 +18,7 @@
 import errno
 import io
 import pathlib
+import stat
 import tarfile
 import unittest.mock as mock
 
@@ -228,6 +229,8 @@ async def test_extract_archives_to_cache_stages_in_temporary_then_promotes(
     assert cache_dir.is_dir()
     assert (cache_dir / "content.txt").read_text() == "cached"
     assert list(staging_base.iterdir()) == []
+    assert stat.S_IMODE(cache_dir.stat().st_mode) == 0o555
+    assert stat.S_IMODE((cache_dir / "content.txt").stat().st_mode) == 0o444
 
 
 @pytest.mark.asyncio


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to