This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new 79f836a  Address some path traversal vulnerabilities
79f836a is described below

commit 79f836a4a8d47e39b1b414caf6749b2e029be9d8
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Dec 23 17:17:00 2025 +0000

    Address some path traversal vulnerabilities
---
 atr/form.py                    | 117 ++++++++++++++++++++++++++++++++---------
 atr/get/docs.py                |   6 ++-
 atr/get/download.py            |  18 ++++---
 atr/get/draft.py               |  17 +++---
 atr/get/file.py                |  11 ++--
 atr/get/published.py           |   6 ++-
 atr/get/ref.py                 |  41 ++++++++-------
 atr/get/report.py              |  14 +++--
 atr/get/sbom.py                |  13 +++--
 atr/post/draft.py              |  16 ++++--
 atr/post/finish.py             |  12 +++--
 atr/post/sbom.py               |  28 ++++++----
 atr/post/upload.py             |   8 ++-
 atr/server.py                  |   2 +
 atr/shared/draft.py            |   2 +-
 atr/shared/finish.py           |  16 +++---
 atr/storage/writers/release.py |  22 ++++----
 17 files changed, 240 insertions(+), 109 deletions(-)

diff --git a/atr/form.py b/atr/form.py
index 97566a6..e4e8ae4 100644
--- a/atr/form.py
+++ b/atr/form.py
@@ -22,6 +22,7 @@ import json
 import pathlib
 import re
 import types
+import unicodedata
 from typing import TYPE_CHECKING, Annotated, Any, Final, Literal, 
TypeAliasType, get_args, get_origin
 
 import htpy
@@ -417,22 +418,23 @@ def to_filename(v: Any) -> pathlib.Path | None:
     if not v:
         return None
 
-    path = pathlib.Path(str(v))
+    name = str(v).strip()
 
-    if len(path.parts) != 1:
-        raise ValueError("Expected a filename, not a path containing 
directories")
+    if not name:
+        raise ValueError("Filename cannot be empty")
 
-    if path.is_absolute():
-        # This branch should be unreachable
-        raise ValueError("Absolute paths are not allowed")
+    if "\0" in name:
+        raise ValueError("Filename cannot contain null bytes")
+
+    name = unicodedata.normalize("NFC", name)
 
-    if "." in path.parts:
-        raise ValueError("Self directory references (.) are not allowed")
+    if ("/" in name) or ("\\" in name):
+        raise ValueError("Filename cannot contain path separators")
 
-    if ".." in path.parts:
-        raise ValueError("Parent directory references (..) are not allowed")
+    if name in (".", ".."):
+        raise ValueError("Invalid filename")
 
-    return path
+    return pathlib.Path(name)
 
 
 def to_int(v: Any) -> int:
@@ -450,6 +452,36 @@ def to_optional_url(v: Any) -> pydantic.HttpUrl | None:
     return pydantic.TypeAdapter(pydantic.HttpUrl).validate_python(v)
 
 
+def to_relpath(v: Any) -> pathlib.Path | None:
+    """Validate a relative filesystem path."""
+    if not v:
+        return None
+
+    path_str = str(v).strip()
+    if not path_str:
+        raise ValueError("Path cannot be empty")
+
+    validated = _validate_relpath_string(path_str)
+    return pathlib.Path(validated)
+
+
+def to_relpath_list(v: Any) -> list[pathlib.Path]:
+    if isinstance(v, list):
+        result = []
+        for item in v:
+            validated = to_relpath(item)
+            if validated is None:
+                raise ValueError("Path list items cannot be empty")
+            result.append(validated)
+        return result
+    if isinstance(v, str):
+        validated = to_relpath(v)
+        if validated is None:
+            raise ValueError("Path cannot be empty")
+        return [validated]
+    raise ValueError(f"Expected a path or list of paths, got 
{type(v).__name__}")
+
+
 def to_str_list(v: Any) -> list[str]:
     # TODO: Might need to handle the empty case
     if isinstance(v, list):
@@ -460,23 +492,16 @@ def to_str_list(v: Any) -> list[str]:
 
 
 def to_url_path(v: Any) -> str | None:
+    """Validate a relative URL style path, e.g. for SVN paths."""
     if not v:
         return None
 
-    url_path = str(v)
-
-    if url_path.startswith("/"):
-        raise ValueError("Absolute paths are not allowed")
-
-    segments = url_path.split("/")
+    path_str = str(v).strip()
+    if not path_str:
+        raise ValueError("Path cannot be empty")
 
-    if "." in segments:
-        raise ValueError("Self directory references (.) are not allowed")
-
-    if ".." in segments:
-        raise ValueError("Parent directory references (..) are not allowed")
-
-    return url_path
+    validated = _validate_relpath_string(path_str)
+    return str(validated)
 
 
 # Validator types come before other functions
@@ -535,6 +560,18 @@ OptionalURL = Annotated[
     pydantic.Field(default=None),
 ]
 
+RelPath = Annotated[
+    pathlib.Path | None,
+    functional_validators.BeforeValidator(to_relpath),
+    pydantic.Field(default=None),
+]
+
+RelPathList = Annotated[
+    list[pathlib.Path],
+    functional_validators.BeforeValidator(to_relpath_list),
+    pydantic.Field(default_factory=list),
+]
+
 StrList = Annotated[
     list[str],
     functional_validators.BeforeValidator(to_str_list),
@@ -1027,3 +1064,35 @@ def _render_widget(  # noqa: C901
         elements.append(error_div)
 
     return htm.div[elements] if (len(elements) > 1) else elements[0]
+
+
+def _validate_relpath_string(path_str: str) -> pathlib.PurePosixPath:
+    if "\0" in path_str:
+        raise ValueError("Path cannot contain null bytes")
+
+    path_str = unicodedata.normalize("NFC", path_str)
+
+    if "\\" in path_str:
+        raise ValueError("Path cannot contain backslashes")
+
+    # PurePosixPath normalises empty components
+    # Therefore, we must do this check on the path string
+    if "//" in path_str:
+        raise ValueError("Path cannot contain //")
+
+    # Check for absolute paths using both POSIX and Windows semantics
+    # We don't support Windows paths, but we want to detect all bad inputs
+    # PurePosixPath doesn't recognise Windows drive letters as absolute
+    # PureWindowsPath treats leading "/" differently
+    posix_path = pathlib.PurePosixPath(path_str)
+    windows_path = pathlib.PureWindowsPath(path_str)
+    if posix_path.is_absolute() or windows_path.is_absolute():
+        raise ValueError("Absolute paths are not allowed")
+
+    for part in posix_path.parts:
+        if part == "..":
+            raise ValueError("Parent directory references (..) are not 
allowed")
+        if part == ".":
+            raise ValueError("Self directory references (.) are not allowed")
+
+    return posix_path
diff --git a/atr/get/docs.py b/atr/get/docs.py
index b700433..dfaab4a 100644
--- a/atr/get/docs.py
+++ b/atr/get/docs.py
@@ -25,6 +25,7 @@ import quart
 
 import atr.blueprints.get as get
 import atr.config as config
+import atr.form as form
 import atr.template as template
 import atr.web as web
 
@@ -55,7 +56,10 @@ async def index(session: web.Committer | None) -> str:
 
 @get.public("/docs/<path:page>")
 async def page(session: web.Committer | None, page: str) -> str:
-    return await _serve_docs_page(page)
+    validated_page = form.to_relpath(page)
+    if validated_page is None:
+        quart.abort(400)
+    return await _serve_docs_page(str(validated_page))
 
 
 async def _serve_docs_page(page: str) -> str:
diff --git a/atr/get/download.py b/atr/get/download.py
index f9c6fa9..814fc70 100644
--- a/atr/get/download.py
+++ b/atr/get/download.py
@@ -27,6 +27,7 @@ import zipstream
 import atr.blueprints.get as get
 import atr.config as config
 import atr.db as db
+import atr.form as form
 import atr.htm as htm
 import atr.mapping as mapping
 import atr.models.sql as sql
@@ -141,20 +142,23 @@ async def _download_or_list(project_name: str, 
version_name: str, file_path: str
 
     # await session.check_access(project_name)
 
-    # Check that path is relative
-    original_path = pathlib.Path(file_path)
-    if (file_path != ".") and (not 
original_path.is_relative_to(original_path.anchor)):
-        raise web.FlashError("Path must be relative")
+    # Validate the path, and allow "." for root directory
+    if file_path == ".":
+        validated_path = pathlib.Path(".")
+    else:
+        validated_path = form.to_relpath(file_path)
+        if validated_path is None:
+            raise base.ASFQuartException("Invalid file path", errorcode=400)
 
     # We allow downloading files from any phase
     async with db.session() as data:
         release = await data.release(project_name=project_name, 
version=version_name).demand(
             base.ASFQuartException("Release does not exist", errorcode=404)
         )
-    full_path = util.release_directory(release) / file_path
+    full_path = util.release_directory(release) / validated_path
 
     if await aiofiles.os.path.isdir(full_path):
-        return await _list(original_path, full_path, project_name, 
version_name, file_path)
+        return await _list(validated_path, full_path, project_name, 
version_name, str(validated_path))
 
     # Check that the path is a regular file
     if not await aiofiles.os.path.isfile(full_path):
@@ -166,7 +170,7 @@ async def _download_or_list(project_name: str, 
version_name: str, file_path: str
 
     # Send the file with original filename
     return await quart.send_file(
-        full_path, as_attachment=True, attachment_filename=original_path.name, 
mimetype="application/octet-stream"
+        full_path, as_attachment=True, 
attachment_filename=validated_path.name, mimetype="application/octet-stream"
     )
 
 
diff --git a/atr/get/draft.py b/atr/get/draft.py
index 06431db..763584a 100644
--- a/atr/get/draft.py
+++ b/atr/get/draft.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import datetime
-import pathlib
 
 import aiofiles.os
 import asfquart.base as base
@@ -37,8 +36,12 @@ async def tools(session: web.Committer, project_name: str, 
version_name: str, fi
     """Show the tools for a specific file."""
     await session.check_access(project_name)
 
+    validated_path = form.to_relpath(file_path)
+    if validated_path is None:
+        raise base.ASFQuartException("Invalid file path", errorcode=400)
+
     release = await session.release(project_name, version_name)
-    full_path = str(util.release_directory(release) / file_path)
+    full_path = str(util.release_directory(release) / validated_path)
 
     # Check that the file exists
     if not await aiofiles.os.path.exists(full_path):
@@ -47,14 +50,16 @@ async def tools(session: web.Committer, project_name: str, 
version_name: str, fi
     modified = int(await aiofiles.os.path.getmtime(full_path))
     file_size = await aiofiles.os.path.getsize(full_path)
 
+    validated_path_str = str(validated_path)
+
     file_data = {
-        "filename": pathlib.Path(file_path).name,
+        "filename": validated_path.name,
         "bytes_size": file_size,
         "uploaded": datetime.datetime.fromtimestamp(modified, tz=datetime.UTC),
     }
 
     hashgen_action = util.as_url(
-        post.draft.hashgen, project_name=project_name, 
version_name=version_name, file_path=file_path
+        post.draft.hashgen, project_name=project_name, 
version_name=version_name, file_path=validated_path_str
     )
     sha512_form = form.render(
         model_cls=shared.draft.HashGen,
@@ -66,7 +71,7 @@ async def tools(session: web.Committer, project_name: str, 
version_name: str, fi
     sbom_form = form.render(
         model_cls=form.Empty,
         action=util.as_url(
-            post.draft.sbomgen, project_name=project_name, 
version_name=version_name, file_path=file_path
+            post.draft.sbomgen, project_name=project_name, 
version_name=version_name, file_path=validated_path_str
         ),
         submit_label="Generate CycloneDX SBOM (.cdx.json)",
         submit_classes="btn-outline-secondary",
@@ -78,7 +83,7 @@ async def tools(session: web.Committer, project_name: str, 
version_name: str, fi
         asf_id=session.uid,
         project_name=project_name,
         version_name=version_name,
-        file_path=file_path,
+        file_path=validated_path_str,
         file_data=file_data,
         release=release,
         format_file_size=util.format_file_size,
diff --git a/atr/get/file.py b/atr/get/file.py
index 2018af9..225f892 100644
--- a/atr/get/file.py
+++ b/atr/get/file.py
@@ -18,6 +18,7 @@
 from typing import Literal
 
 import atr.blueprints.get as get
+import atr.form as form
 import atr.get.compose as compose
 import atr.get.finish as finish
 import atr.get.vote as vote
@@ -128,9 +129,13 @@ async def selected_path(session: web.Committer, 
project_name: str, version_name:
     """View the content of a specific file in a release (any phase)."""
     await session.check_access(project_name)
 
+    validated_path = form.to_relpath(file_path)
+    if validated_path is None:
+        raise web.FlashError("Invalid file path")
+
     release = await session.release(project_name, version_name, phase=None)
     _max_view_size = 512 * 1024
-    full_path = util.release_directory(release) / file_path
+    full_path = util.release_directory(release) / validated_path
     content_listing = await util.archive_listing(full_path)
     content, is_text, is_truncated, error_message = await 
util.read_file_for_viewer(full_path, _max_view_size)
 
@@ -141,7 +146,7 @@ async def selected_path(session: web.Committer, 
project_name: str, version_name:
     block.a(href=back_url, class_="atr-back-link")[f"← Back to {phase_name} 
files"]
 
     block.div(".p-3.mt-4.mb-4.bg-light.border.rounded")[
-        htm.h2(".mt-0")[f"Viewing file: {file_path}"],
+        htm.h2(".mt-0")[f"Viewing file: {validated_path}"],
         htm.p(".mb-0")[htm.strong["Release:"], " ", release.name],
     ]
 
@@ -166,7 +171,7 @@ async def selected_path(session: web.Committer, 
project_name: str, version_name:
         block.div(".alert.alert-secondary")["No content available for this 
file."]
 
     return await template.blank(
-        f"View 
{release.project.short_display_name}/{release.version}/{file_path}", 
content=block.collect()
+        f"View 
{release.project.short_display_name}/{release.version}/{validated_path}", 
content=block.collect()
     )
 
 
diff --git a/atr/get/published.py b/atr/get/published.py
index eb5621f..2213b5d 100644
--- a/atr/get/published.py
+++ b/atr/get/published.py
@@ -23,6 +23,7 @@ import aiofiles.os
 import quart
 
 import atr.blueprints.get as get
+import atr.form as form
 import atr.htm as htm
 import atr.util as util
 import atr.web as web
@@ -34,7 +35,10 @@ async def path(session: web.Committer, path: str) -> 
web.QuartResponse:
     # This route is for debugging
     # When developing locally, there is no proxy to view the downloads 
directory
     # Therefore this path acts as a way to check the contents of that directory
-    return await _path(session, path)
+    validated_path = form.to_relpath(path)
+    if validated_path is None:
+        return quart.abort(400)
+    return await _path(session, str(validated_path))
 
 
 @get.committer("/published/")
diff --git a/atr/get/ref.py b/atr/get/ref.py
index 807b050..527c6b7 100644
--- a/atr/get/ref.py
+++ b/atr/get/ref.py
@@ -22,6 +22,7 @@ import quart
 
 import atr.blueprints.get as get
 import atr.config as config
+import atr.form as form
 import atr.web as web
 
 # Perhaps GitHub will get around to implementing symbol permalinks:
@@ -35,13 +36,7 @@ async def resolve(session: web.Committer | None, ref_path: 
str) -> web.WerkzeugR
 
     if ":" in ref_path:
         file_path_str, symbol = ref_path.rsplit(":", 1)
-        file_path = project_root / file_path_str
-
-        try:
-            resolved_file = file_path.resolve()
-            resolved_file.relative_to(project_root)
-        except (FileNotFoundError, ValueError):
-            quart.abort(404)
+        resolved_file, validated_path_str = 
_validate_and_resolve_path(file_path_str, project_root)
 
         if (not resolved_file.exists()) or (not resolved_file.is_file()):
             quart.abort(404)
@@ -51,19 +46,12 @@ async def resolve(session: web.Committer | None, ref_path: 
str) -> web.WerkzeugR
         if line_number is None:
             quart.abort(404)
 
-        github_url = 
f"https://github.com/apache/tooling-trusted-releases/blob/main/{file_path_str}#L{line_number}";
+        github_url = 
f"https://github.com/apache/tooling-trusted-releases/blob/main/{validated_path_str}#L{line_number}";
         return quart.redirect(github_url, code=303)
 
     is_directory = ref_path.endswith("/")
     path_str = ref_path.rstrip("/")
-
-    file_path = project_root / path_str
-
-    try:
-        resolved_path = file_path.resolve()
-        resolved_path.relative_to(project_root)
-    except (FileNotFoundError, ValueError):
-        quart.abort(404)
+    resolved_path, validated_path_str = _validate_and_resolve_path(path_str, 
project_root)
 
     if not resolved_path.exists():
         quart.abort(404)
@@ -71,11 +59,11 @@ async def resolve(session: web.Committer | None, ref_path: 
str) -> web.WerkzeugR
     if is_directory:
         if not resolved_path.is_dir():
             quart.abort(404)
-        github_url = 
f"https://github.com/apache/tooling-trusted-releases/tree/main/{path_str}";
+        github_url = 
f"https://github.com/apache/tooling-trusted-releases/tree/main/{validated_path_str}";
     else:
         if not resolved_path.is_file():
             quart.abort(404)
-        github_url = 
f"https://github.com/apache/tooling-trusted-releases/blob/main/{path_str}";
+        github_url = 
f"https://github.com/apache/tooling-trusted-releases/blob/main/{validated_path_str}";
 
     return quart.redirect(github_url, code=303)
 
@@ -100,3 +88,20 @@ async def _resolve_symbol_to_line(file_path: pathlib.Path, 
symbol: str) -> int |
                 return node.lineno
 
     return None
+
+
+def _validate_and_resolve_path(path_str: str, project_root: pathlib.Path) -> 
tuple[pathlib.Path, str]:
+    validated_path = form.to_relpath(path_str)
+    if validated_path is None:
+        quart.abort(400)
+    validated_path_str = str(validated_path)
+
+    file_path = project_root / validated_path
+
+    try:
+        resolved_path = file_path.resolve()
+        resolved_path.relative_to(project_root)
+    except (FileNotFoundError, ValueError):
+        quart.abort(404)
+
+    return resolved_path, validated_path_str
diff --git a/atr/get/report.py b/atr/get/report.py
index 4d4a68e..6559b93 100644
--- a/atr/get/report.py
+++ b/atr/get/report.py
@@ -16,12 +16,12 @@
 # under the License.
 
 import datetime
-import pathlib
 
 import aiofiles.os
 import asfquart.base as base
 
 import atr.blueprints.get as get
+import atr.form as form
 import atr.models.sql as sql
 import atr.storage as storage
 import atr.template as template
@@ -34,6 +34,10 @@ async def selected_path(session: web.Committer, 
project_name: str, version_name:
     """Show the report for a specific file."""
     await session.check_access(project_name)
 
+    validated_path = form.to_relpath(rel_path)
+    if validated_path is None:
+        raise base.ASFQuartException("Invalid file path", errorcode=400)
+
     # If the draft is not found, we try to get the release candidate
     try:
         release = await session.release(project_name, version_name, 
with_committee=True)
@@ -46,7 +50,7 @@ async def selected_path(session: web.Committer, project_name: 
str, version_name:
         raise base.ASFQuartException("Release has no committee", errorcode=500)
 
     # TODO: When we do more than one thing in a dir, we should use the 
revision directory directly
-    abs_path = util.release_directory(release) / rel_path
+    abs_path = util.release_directory(release) / validated_path
     if release.latest_revision_number is None:
         raise base.ASFQuartException("Release has no revision", errorcode=500)
 
@@ -60,10 +64,10 @@ async def selected_path(session: web.Committer, 
project_name: str, version_name:
     # Get all check results for this file
     async with storage.read() as read:
         ragp = read.as_general_public()
-        check_results = await ragp.checks.by_release_path(release, 
pathlib.Path(rel_path))
+        check_results = await ragp.checks.by_release_path(release, 
validated_path)
 
     file_data = {
-        "filename": pathlib.Path(rel_path).name,
+        "filename": validated_path.name,
         "bytes_size": file_size,
         "uploaded": datetime.datetime.fromtimestamp(modified, tz=datetime.UTC),
     }
@@ -72,7 +76,7 @@ async def selected_path(session: web.Committer, project_name: 
str, version_name:
         "report-selected-path.html",
         project_name=project_name,
         version_name=version_name,
-        rel_path=rel_path,
+        rel_path=str(validated_path),
         package=file_data,
         release=release,
         primary_results=check_results.primary_results_list,
diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index d60ca1a..783f238 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -44,6 +44,11 @@ if TYPE_CHECKING:
 async def report(session: web.Committer, project: str, version: str, 
file_path: str) -> str:
     await session.check_access(project)
 
+    validated_path = form.to_relpath(file_path)
+    if validated_path is None:
+        raise base.ASFQuartException("Invalid file path", errorcode=400)
+    validated_path_str = str(validated_path)
+
     # If the draft is not found, we try to get the release candidate
     try:
         release = await session.release(project, version, with_committee=True)
@@ -60,7 +65,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
                 version_name=version,
                 revision_number=release.latest_revision_number,
                 task_type=sql.TaskType.SBOM_TOOL_SCORE,
-                primary_rel_path=file_path,
+                primary_rel_path=validated_path_str,
             )
             .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
             .all()
@@ -70,7 +75,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
                 project_name=project,
                 version_name=version,
                 task_type=sql.TaskType.SBOM_AUGMENT,
-                primary_rel_path=file_path,
+                primary_rel_path=validated_path_str,
             )
             .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
             .all()
@@ -81,7 +86,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
                 project_name=project,
                 version_name=version,
                 task_type=sql.TaskType.SBOM_OSV_SCAN,
-                primary_rel_path=file_path,
+                primary_rel_path=validated_path_str,
                 revision_number=release.latest_revision_number,
             )
             .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
@@ -123,7 +128,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
         vulnerabilities = []
 
     _vulnerability_scan_section(
-        block, project, version, file_path, task_result, vulnerabilities, 
osv_tasks, is_release_candidate
+        block, project, version, validated_path_str, task_result, 
vulnerabilities, osv_tasks, is_release_candidate
     )
 
     _outdated_tool_section(block, task_result)
diff --git a/atr/post/draft.py b/atr/post/draft.py
index 8ddefac..9060b73 100644
--- a/atr/post/draft.py
+++ b/atr/post/draft.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import datetime
-import pathlib
 
 import aiofiles.os
 import aioshutil
@@ -79,7 +78,10 @@ async def delete_file(
     """Delete a specific file from the release candidate, creating a new 
revision."""
     await session.check_access(project_name)
 
-    rel_path_to_delete = pathlib.Path(str(delete_file_form.file_path))
+    rel_path_to_delete = delete_file_form.file_path
+    if rel_path_to_delete is None:
+        await quart.flash("No file path specified", "error")
+        return await session.redirect(get.compose.selected, 
project_name=project_name, version_name=version_name)
 
     try:
         async with storage.write(session) as write:
@@ -133,7 +135,10 @@ async def hashgen(session: web.Committer, project_name: 
str, version_name: str,
     """Generate an sha512 hash file for a candidate draft file, creating a new 
revision."""
     await session.check_access(project_name)
 
-    rel_path = pathlib.Path(file_path)
+    rel_path = form.to_relpath(file_path)
+    if rel_path is None:
+        await quart.flash("Invalid file path", "error")
+        return await session.redirect(get.compose.selected, 
project_name=project_name, version_name=version_name)
 
     try:
         async with storage.write(session) as write:
@@ -159,7 +164,10 @@ async def sbomgen(session: web.Committer, project_name: 
str, version_name: str,
     """Generate a CycloneDX SBOM file for a candidate draft file, creating a 
new revision."""
     await session.check_access(project_name)
 
-    rel_path = pathlib.Path(file_path)
+    rel_path = form.to_relpath(file_path)
+    if rel_path is None:
+        await quart.flash("Invalid file path", "error")
+        return await session.redirect(get.compose.selected, 
project_name=project_name, version_name=version_name)
 
     # Check that the file is a .tar.gz archive before creating a revision
     if not (file_path.endswith(".tar.gz") or file_path.endswith(".tgz") or 
file_path.endswith(".zip")):
diff --git a/atr/post/finish.py b/atr/post/finish.py
index 7df9a76..997f9ca 100644
--- a/atr/post/finish.py
+++ b/atr/post/finish.py
@@ -15,8 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pathlib
-
 import quart
 
 import atr.blueprints.post as post
@@ -50,7 +48,9 @@ async def _delete_empty_directory(
     version_name: str,
     respond: shared.finish.Respond,
 ) -> tuple[web.QuartResponse, int] | web.WerkzeugResponse:
-    dir_to_delete_rel = pathlib.Path(delete_form.directory_to_delete)
+    dir_to_delete_rel = delete_form.directory_to_delete
+    if dir_to_delete_rel is None:
+        return await respond(400, "No directory specified.")
     try:
         async with storage.write(session) as write:
             wacp = await write.as_project_committee_member(project_name)
@@ -71,8 +71,10 @@ async def _move_file_to_revision(
     version_name: str,
     respond: shared.finish.Respond,
 ) -> tuple[web.QuartResponse, int] | web.WerkzeugResponse:
-    source_files_rel = [pathlib.Path(sf) for sf in move_form.source_files]
-    target_dir_rel = pathlib.Path(move_form.target_directory)
+    source_files_rel = move_form.source_files
+    target_dir_rel = move_form.target_directory
+    if target_dir_rel is None:
+        return await respond(400, "No target directory specified.")
     try:
         async with storage.write(session) as write:
             wacp = await write.as_project_committee_member(project_name)
diff --git a/atr/post/sbom.py b/atr/post/sbom.py
index 65f7c0e..09c779e 100644
--- a/atr/post/sbom.py
+++ b/atr/post/sbom.py
@@ -17,13 +17,14 @@
 
 from __future__ import annotations
 
-import pathlib
+from typing import TYPE_CHECKING
 
 import asfquart.base as base
 import quart
 
 import atr.blueprints.post as post
 import atr.db as db
+import atr.form as form
 import atr.get as get
 import atr.log as log
 import atr.shared as shared
@@ -31,6 +32,9 @@ import atr.storage as storage
 import atr.util as util
 import atr.web as web
 
+if TYPE_CHECKING:
+    import pathlib
+
 
 @post.committer("/sbom/report/<project>/<version>/<path:file_path>")
 @post.form(shared.sbom.SBOMForm)
@@ -39,22 +43,24 @@ async def report(
 ) -> web.WerkzeugResponse:
     await session.check_access(project)
 
+    validated_path = form.to_relpath(file_path)
+    if validated_path is None:
+        raise base.ASFQuartException("Invalid file path", errorcode=400)
+
     match sbom_form:
         case shared.sbom.AugmentSBOMForm():
-            return await _augment(session, project, version, file_path)
+            return await _augment(session, project, version, validated_path)
 
         case shared.sbom.ScanSBOMForm():
-            return await _scan(session, project, version, file_path)
+            return await _scan(session, project, version, validated_path)
 
 
 async def _augment(
-    session: web.Committer, project_name: str, version_name: str, file_path: 
str
+    session: web.Committer, project_name: str, version_name: str, rel_path: 
pathlib.Path
 ) -> web.WerkzeugResponse:
     """Augment a CycloneDX SBOM file."""
-    rel_path = pathlib.Path(file_path)
-
     # Check that the file is a .cdx.json archive before creating a revision
-    if not (file_path.endswith(".cdx.json")):
+    if not (rel_path.name.endswith(".cdx.json")):
         raise base.ASFQuartException("SBOM augmentation is only supported for 
.cdx.json files", errorcode=400)
 
     try:
@@ -93,11 +99,11 @@ async def _augment(
     )
 
 
-async def _scan(session: web.Committer, project_name: str, version_name: str, 
file_path: str) -> web.WerkzeugResponse:
+async def _scan(
+    session: web.Committer, project_name: str, version_name: str, rel_path: 
pathlib.Path
+) -> web.WerkzeugResponse:
     """Scan a CycloneDX SBOM file for vulnerabilities using OSV."""
-    rel_path = pathlib.Path(file_path)
-
-    if not (file_path.endswith(".cdx.json")):
+    if not (rel_path.name.endswith(".cdx.json")):
         raise base.ASFQuartException("OSV scanning is only supported for 
.cdx.json files", errorcode=400)
 
     try:
diff --git a/atr/post/upload.py b/atr/post/upload.py
index 3a0505c..a3565ca 100644
--- a/atr/post/upload.py
+++ b/atr/post/upload.py
@@ -28,6 +28,7 @@ import werkzeug.wrappers.response as response
 
 import atr.blueprints.post as post
 import atr.db as db
+import atr.form as form
 import atr.get as get
 import atr.log as log
 import atr.shared as shared
@@ -118,9 +119,12 @@ async def stage(
     if (not file) or (not file.filename):
         return _json_error("No file provided", 400)
 
-    filename = pathlib.Path(file.filename).name
-    if (not filename) or (filename in (".", "..")):
+    # Extract basename and validate
+    basename = pathlib.Path(file.filename).name
+    validated_filename = form.to_filename(basename)
+    if validated_filename is None:
         return _json_error("Invalid filename", 400)
+    filename = str(validated_filename)
 
     await aiofiles.os.makedirs(staging_dir, exist_ok=True)
 
diff --git a/atr/server.py b/atr/server.py
index 88962c2..2352660 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -354,6 +354,8 @@ def _app_setup_logging(app: base.QuartApp, config_mode: 
config.Mode, app_config:
 
 def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
     """Create and configure the application."""
+    if os.sep != "/":
+        raise RuntimeError('ATR requires a POSIX compatible filesystem where 
os.sep is "/"')
     config_mode = config.get_mode()
     _app_dirs_setup(app_config)
     log.performance_init()
diff --git a/atr/shared/draft.py b/atr/shared/draft.py
index e6bce01..a52a9bf 100644
--- a/atr/shared/draft.py
+++ b/atr/shared/draft.py
@@ -19,7 +19,7 @@ import atr.form as form
 
 
 class DeleteFileForm(form.Form):
-    file_path: str = form.label("File path", widget=form.Widget.HIDDEN)
+    file_path: form.RelPath = form.label("File path", 
widget=form.Widget.HIDDEN)
 
 
 class HashGen(form.Empty):
diff --git a/atr/shared/finish.py b/atr/shared/finish.py
index 70cf00f..4f5501a 100644
--- a/atr/shared/finish.py
+++ b/atr/shared/finish.py
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pathlib
 from collections.abc import Awaitable, Callable
 from typing import Annotated, Literal
 
@@ -33,23 +32,24 @@ type REMOVE_RC_TAGS = Literal["REMOVE_RC_TAGS"]
 
 class DeleteEmptyDirectoryForm(form.Form):
     variant: DELETE_DIR = form.value(DELETE_DIR)
-    directory_to_delete: str = form.label("Directory to delete", 
widget=form.Widget.SELECT)
+    directory_to_delete: form.RelPath = form.label("Directory to delete", 
widget=form.Widget.SELECT)
 
 
 class MoveFileForm(form.Form):
     variant: MOVE_FILE = form.value(MOVE_FILE)
-    source_files: form.StrList = form.label("Files to move")
-    target_directory: str = form.label("Target directory")
+    source_files: form.RelPathList = form.label("Files to move")
+    target_directory: form.RelPath = form.label("Target directory")
 
     @pydantic.model_validator(mode="after")
     def validate_move(self) -> "MoveFileForm":
         if not self.source_files:
             raise ValueError("Please select at least one file to move.")
 
-        source_paths = [pathlib.Path(sf) for sf in self.source_files]
-        target_dir = pathlib.Path(self.target_directory)
-        for source_path in source_paths:
-            if source_path.parent == target_dir:
+        if self.target_directory is None:
+            raise ValueError("Target directory is required.")
+
+        for source_path in self.source_files:
+            if source_path.parent == self.target_directory:
                 raise ValueError(f"Target directory cannot be the same as the 
source directory for {source_path.name}.")
         return self
 
diff --git a/atr/storage/writers/release.py b/atr/storage/writers/release.py
index 840946b..30a8669 100644
--- a/atr/storage/writers/release.py
+++ b/atr/storage/writers/release.py
@@ -23,7 +23,6 @@ import base64
 import contextlib
 import datetime
 import hashlib
-import pathlib
 from typing import TYPE_CHECKING, Final
 
 import aiofiles.os
@@ -35,6 +34,7 @@ import sqlmodel
 import atr.analysis as analysis
 import atr.config as config
 import atr.db as db
+import atr.form as form
 import atr.log as log
 import atr.models.api as api
 import atr.models.sql as sql
@@ -43,6 +43,7 @@ import atr.storage.types as types
 import atr.util as util
 
 if TYPE_CHECKING:
+    import pathlib
     from collections.abc import AsyncGenerator, Sequence
 
     import werkzeug.datastructures as datastructures
@@ -406,10 +407,12 @@ class CommitteeParticipant(FoundationCommitter):
 
     async def upload_file(self, args: api.ReleaseUploadArgs) -> sql.Revision:
         file_bytes = base64.b64decode(args.content, validate=True)
-        file_path = args.relpath.lstrip("/")
-        description = f"Upload via API: {file_path}"
+        validated_path = form.to_relpath(args.relpath)
+        if validated_path is None:
+            raise storage.AccessError("Invalid file path")
+        description = f"Upload via API: {validated_path}"
         async with self.create_and_manage_revision(args.project, args.version, 
description) as creating:
-            target_path = pathlib.Path(creating.interim_path) / file_path
+            target_path = creating.interim_path / validated_path
             await aiofiles.os.makedirs(target_path.parent, exist_ok=True)
             if target_path.exists():
                 raise storage.AccessError("File already exists")
@@ -442,12 +445,13 @@ class CommitteeParticipant(FoundationCommitter):
                 if not file_name:
                     if not file.filename:
                         raise storage.AccessError("No filename provided")
-                    # Use the original name
-                    relative_file_path = pathlib.Path(file.filename)
+                    # Validate the filename from multipart upload
+                    validated_path = form.to_relpath(file.filename)
+                    if validated_path is None:
+                        raise storage.AccessError("Invalid filename")
+                    relative_file_path = validated_path
                 else:
-                    # Use the provided name, relative to its anchor
-                    # In other words, ignore the leading "/"
-                    relative_file_path = 
file_name.relative_to(file_name.anchor)
+                    relative_file_path = file_name
 
                 # Construct path inside the new revision directory
                 target_path = creating.interim_path / relative_file_path


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to