This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 79f836a Address some path traversal vulnerabilities
79f836a is described below
commit 79f836a4a8d47e39b1b414caf6749b2e029be9d8
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Dec 23 17:17:00 2025 +0000
Address some path traversal vulnerabilities
---
atr/form.py | 117 ++++++++++++++++++++++++++++++++---------
atr/get/docs.py | 6 ++-
atr/get/download.py | 18 ++++---
atr/get/draft.py | 17 +++---
atr/get/file.py | 11 ++--
atr/get/published.py | 6 ++-
atr/get/ref.py | 41 ++++++++-------
atr/get/report.py | 14 +++--
atr/get/sbom.py | 13 +++--
atr/post/draft.py | 16 ++++--
atr/post/finish.py | 12 +++--
atr/post/sbom.py | 28 ++++++----
atr/post/upload.py | 8 ++-
atr/server.py | 2 +
atr/shared/draft.py | 2 +-
atr/shared/finish.py | 16 +++---
atr/storage/writers/release.py | 22 ++++----
17 files changed, 240 insertions(+), 109 deletions(-)
diff --git a/atr/form.py b/atr/form.py
index 97566a6..e4e8ae4 100644
--- a/atr/form.py
+++ b/atr/form.py
@@ -22,6 +22,7 @@ import json
import pathlib
import re
import types
+import unicodedata
from typing import TYPE_CHECKING, Annotated, Any, Final, Literal, TypeAliasType, get_args, get_origin
import htpy
@@ -417,22 +418,23 @@ def to_filename(v: Any) -> pathlib.Path | None:
if not v:
return None
- path = pathlib.Path(str(v))
+ name = str(v).strip()
- if len(path.parts) != 1:
- raise ValueError("Expected a filename, not a path containing directories")
+ if not name:
+ raise ValueError("Filename cannot be empty")
- if path.is_absolute():
- # This branch should be unreachable
- raise ValueError("Absolute paths are not allowed")
+ if "\0" in name:
+ raise ValueError("Filename cannot contain null bytes")
+
+ name = unicodedata.normalize("NFC", name)
- if "." in path.parts:
- raise ValueError("Self directory references (.) are not allowed")
+ if ("/" in name) or ("\\" in name):
+ raise ValueError("Filename cannot contain path separators")
- if ".." in path.parts:
- raise ValueError("Parent directory references (..) are not allowed")
+ if name in (".", ".."):
+ raise ValueError("Invalid filename")
- return path
+ return pathlib.Path(name)
def to_int(v: Any) -> int:
@@ -450,6 +452,36 @@ def to_optional_url(v: Any) -> pydantic.HttpUrl | None:
return pydantic.TypeAdapter(pydantic.HttpUrl).validate_python(v)
+def to_relpath(v: Any) -> pathlib.Path | None:
+ """Validate a relative filesystem path."""
+ if not v:
+ return None
+
+ path_str = str(v).strip()
+ if not path_str:
+ raise ValueError("Path cannot be empty")
+
+ validated = _validate_relpath_string(path_str)
+ return pathlib.Path(validated)
+
+
+def to_relpath_list(v: Any) -> list[pathlib.Path]:
+ if isinstance(v, list):
+ result = []
+ for item in v:
+ validated = to_relpath(item)
+ if validated is None:
+ raise ValueError("Path list items cannot be empty")
+ result.append(validated)
+ return result
+ if isinstance(v, str):
+ validated = to_relpath(v)
+ if validated is None:
+ raise ValueError("Path cannot be empty")
+ return [validated]
+ raise ValueError(f"Expected a path or list of paths, got {type(v).__name__}")
+
+
def to_str_list(v: Any) -> list[str]:
# TODO: Might need to handle the empty case
if isinstance(v, list):
@@ -460,23 +492,16 @@ def to_str_list(v: Any) -> list[str]:
def to_url_path(v: Any) -> str | None:
+ """Validate a relative URL style path, e.g. for SVN paths."""
if not v:
return None
- url_path = str(v)
-
- if url_path.startswith("/"):
- raise ValueError("Absolute paths are not allowed")
-
- segments = url_path.split("/")
+ path_str = str(v).strip()
+ if not path_str:
+ raise ValueError("Path cannot be empty")
- if "." in segments:
- raise ValueError("Self directory references (.) are not allowed")
-
- if ".." in segments:
- raise ValueError("Parent directory references (..) are not allowed")
-
- return url_path
+ validated = _validate_relpath_string(path_str)
+ return str(validated)
# Validator types come before other functions
@@ -535,6 +560,18 @@ OptionalURL = Annotated[
pydantic.Field(default=None),
]
+RelPath = Annotated[
+ pathlib.Path | None,
+ functional_validators.BeforeValidator(to_relpath),
+ pydantic.Field(default=None),
+]
+
+RelPathList = Annotated[
+ list[pathlib.Path],
+ functional_validators.BeforeValidator(to_relpath_list),
+ pydantic.Field(default_factory=list),
+]
+
StrList = Annotated[
list[str],
functional_validators.BeforeValidator(to_str_list),
@@ -1027,3 +1064,35 @@ def _render_widget( # noqa: C901
elements.append(error_div)
return htm.div[elements] if (len(elements) > 1) else elements[0]
+
+
+def _validate_relpath_string(path_str: str) -> pathlib.PurePosixPath:
+ if "\0" in path_str:
+ raise ValueError("Path cannot contain null bytes")
+
+ path_str = unicodedata.normalize("NFC", path_str)
+
+ if "\\" in path_str:
+ raise ValueError("Path cannot contain backslashes")
+
+ # PurePosixPath normalises empty components
+ # Therefore, we must do this check on the path string
+ if "//" in path_str:
+ raise ValueError("Path cannot contain //")
+
+ # Check for absolute paths using both POSIX and Windows semantics
+ # We don't support Windows paths, but we want to detect all bad inputs
+ # PurePosixPath doesn't recognise Windows drive letters as absolute
+ # PureWindowsPath treats leading "/" differently
+ posix_path = pathlib.PurePosixPath(path_str)
+ windows_path = pathlib.PureWindowsPath(path_str)
+ if posix_path.is_absolute() or windows_path.is_absolute():
+ raise ValueError("Absolute paths are not allowed")
+
+ for part in posix_path.parts:
+ if part == "..":
+ raise ValueError("Parent directory references (..) are not allowed")
+ if part == ".":
+ raise ValueError("Self directory references (.) are not allowed")
+
+ return posix_path
diff --git a/atr/get/docs.py b/atr/get/docs.py
index b700433..dfaab4a 100644
--- a/atr/get/docs.py
+++ b/atr/get/docs.py
@@ -25,6 +25,7 @@ import quart
import atr.blueprints.get as get
import atr.config as config
+import atr.form as form
import atr.template as template
import atr.web as web
@@ -55,7 +56,10 @@ async def index(session: web.Committer | None) -> str:
@get.public("/docs/<path:page>")
async def page(session: web.Committer | None, page: str) -> str:
- return await _serve_docs_page(page)
+ validated_page = form.to_relpath(page)
+ if validated_page is None:
+ quart.abort(400)
+ return await _serve_docs_page(str(validated_page))
async def _serve_docs_page(page: str) -> str:
diff --git a/atr/get/download.py b/atr/get/download.py
index f9c6fa9..814fc70 100644
--- a/atr/get/download.py
+++ b/atr/get/download.py
@@ -27,6 +27,7 @@ import zipstream
import atr.blueprints.get as get
import atr.config as config
import atr.db as db
+import atr.form as form
import atr.htm as htm
import atr.mapping as mapping
import atr.models.sql as sql
@@ -141,20 +142,23 @@ async def _download_or_list(project_name: str,
version_name: str, file_path: str
# await session.check_access(project_name)
- # Check that path is relative
- original_path = pathlib.Path(file_path)
- if (file_path != ".") and (not
original_path.is_relative_to(original_path.anchor)):
- raise web.FlashError("Path must be relative")
+ # Validate the path, and allow "." for root directory
+ if file_path == ".":
+ validated_path = pathlib.Path(".")
+ else:
+ validated_path = form.to_relpath(file_path)
+ if validated_path is None:
+ raise base.ASFQuartException("Invalid file path", errorcode=400)
# We allow downloading files from any phase
async with db.session() as data:
release = await data.release(project_name=project_name,
version=version_name).demand(
base.ASFQuartException("Release does not exist", errorcode=404)
)
- full_path = util.release_directory(release) / file_path
+ full_path = util.release_directory(release) / validated_path
if await aiofiles.os.path.isdir(full_path):
- return await _list(original_path, full_path, project_name,
version_name, file_path)
+ return await _list(validated_path, full_path, project_name,
version_name, str(validated_path))
# Check that the path is a regular file
if not await aiofiles.os.path.isfile(full_path):
@@ -166,7 +170,7 @@ async def _download_or_list(project_name: str,
version_name: str, file_path: str
# Send the file with original filename
return await quart.send_file(
- full_path, as_attachment=True, attachment_filename=original_path.name,
mimetype="application/octet-stream"
+ full_path, as_attachment=True,
attachment_filename=validated_path.name, mimetype="application/octet-stream"
)
diff --git a/atr/get/draft.py b/atr/get/draft.py
index 06431db..763584a 100644
--- a/atr/get/draft.py
+++ b/atr/get/draft.py
@@ -18,7 +18,6 @@
from __future__ import annotations
import datetime
-import pathlib
import aiofiles.os
import asfquart.base as base
@@ -37,8 +36,12 @@ async def tools(session: web.Committer, project_name: str,
version_name: str, fi
"""Show the tools for a specific file."""
await session.check_access(project_name)
+ validated_path = form.to_relpath(file_path)
+ if validated_path is None:
+ raise base.ASFQuartException("Invalid file path", errorcode=400)
+
release = await session.release(project_name, version_name)
- full_path = str(util.release_directory(release) / file_path)
+ full_path = str(util.release_directory(release) / validated_path)
# Check that the file exists
if not await aiofiles.os.path.exists(full_path):
@@ -47,14 +50,16 @@ async def tools(session: web.Committer, project_name: str,
version_name: str, fi
modified = int(await aiofiles.os.path.getmtime(full_path))
file_size = await aiofiles.os.path.getsize(full_path)
+ validated_path_str = str(validated_path)
+
file_data = {
- "filename": pathlib.Path(file_path).name,
+ "filename": validated_path.name,
"bytes_size": file_size,
"uploaded": datetime.datetime.fromtimestamp(modified, tz=datetime.UTC),
}
hashgen_action = util.as_url(
- post.draft.hashgen, project_name=project_name,
version_name=version_name, file_path=file_path
+ post.draft.hashgen, project_name=project_name,
version_name=version_name, file_path=validated_path_str
)
sha512_form = form.render(
model_cls=shared.draft.HashGen,
@@ -66,7 +71,7 @@ async def tools(session: web.Committer, project_name: str,
version_name: str, fi
sbom_form = form.render(
model_cls=form.Empty,
action=util.as_url(
- post.draft.sbomgen, project_name=project_name,
version_name=version_name, file_path=file_path
+ post.draft.sbomgen, project_name=project_name,
version_name=version_name, file_path=validated_path_str
),
submit_label="Generate CycloneDX SBOM (.cdx.json)",
submit_classes="btn-outline-secondary",
@@ -78,7 +83,7 @@ async def tools(session: web.Committer, project_name: str,
version_name: str, fi
asf_id=session.uid,
project_name=project_name,
version_name=version_name,
- file_path=file_path,
+ file_path=validated_path_str,
file_data=file_data,
release=release,
format_file_size=util.format_file_size,
diff --git a/atr/get/file.py b/atr/get/file.py
index 2018af9..225f892 100644
--- a/atr/get/file.py
+++ b/atr/get/file.py
@@ -18,6 +18,7 @@
from typing import Literal
import atr.blueprints.get as get
+import atr.form as form
import atr.get.compose as compose
import atr.get.finish as finish
import atr.get.vote as vote
@@ -128,9 +129,13 @@ async def selected_path(session: web.Committer,
project_name: str, version_name:
"""View the content of a specific file in a release (any phase)."""
await session.check_access(project_name)
+ validated_path = form.to_relpath(file_path)
+ if validated_path is None:
+ raise web.FlashError("Invalid file path")
+
release = await session.release(project_name, version_name, phase=None)
_max_view_size = 512 * 1024
- full_path = util.release_directory(release) / file_path
+ full_path = util.release_directory(release) / validated_path
content_listing = await util.archive_listing(full_path)
content, is_text, is_truncated, error_message = await
util.read_file_for_viewer(full_path, _max_view_size)
@@ -141,7 +146,7 @@ async def selected_path(session: web.Committer,
project_name: str, version_name:
block.a(href=back_url, class_="atr-back-link")[f"← Back to {phase_name} files"]
block.div(".p-3.mt-4.mb-4.bg-light.border.rounded")[
- htm.h2(".mt-0")[f"Viewing file: {file_path}"],
+ htm.h2(".mt-0")[f"Viewing file: {validated_path}"],
htm.p(".mb-0")[htm.strong["Release:"], " ", release.name],
]
@@ -166,7 +171,7 @@ async def selected_path(session: web.Committer,
project_name: str, version_name:
block.div(".alert.alert-secondary")["No content available for this file."]
return await template.blank(
- f"View {release.project.short_display_name}/{release.version}/{file_path}", content=block.collect()
+ f"View {release.project.short_display_name}/{release.version}/{validated_path}", content=block.collect()
)
diff --git a/atr/get/published.py b/atr/get/published.py
index eb5621f..2213b5d 100644
--- a/atr/get/published.py
+++ b/atr/get/published.py
@@ -23,6 +23,7 @@ import aiofiles.os
import quart
import atr.blueprints.get as get
+import atr.form as form
import atr.htm as htm
import atr.util as util
import atr.web as web
@@ -34,7 +35,10 @@ async def path(session: web.Committer, path: str) ->
web.QuartResponse:
# This route is for debugging
# When developing locally, there is no proxy to view the downloads directory
# Therefore this path acts as a way to check the contents of that directory
- return await _path(session, path)
+ validated_path = form.to_relpath(path)
+ if validated_path is None:
+ return quart.abort(400)
+ return await _path(session, str(validated_path))
@get.committer("/published/")
diff --git a/atr/get/ref.py b/atr/get/ref.py
index 807b050..527c6b7 100644
--- a/atr/get/ref.py
+++ b/atr/get/ref.py
@@ -22,6 +22,7 @@ import quart
import atr.blueprints.get as get
import atr.config as config
+import atr.form as form
import atr.web as web
# Perhaps GitHub will get around to implementing symbol permalinks:
@@ -35,13 +36,7 @@ async def resolve(session: web.Committer | None, ref_path:
str) -> web.WerkzeugR
if ":" in ref_path:
file_path_str, symbol = ref_path.rsplit(":", 1)
- file_path = project_root / file_path_str
-
- try:
- resolved_file = file_path.resolve()
- resolved_file.relative_to(project_root)
- except (FileNotFoundError, ValueError):
- quart.abort(404)
+ resolved_file, validated_path_str =
_validate_and_resolve_path(file_path_str, project_root)
if (not resolved_file.exists()) or (not resolved_file.is_file()):
quart.abort(404)
@@ -51,19 +46,12 @@ async def resolve(session: web.Committer | None, ref_path:
str) -> web.WerkzeugR
if line_number is None:
quart.abort(404)
- github_url =
f"https://github.com/apache/tooling-trusted-releases/blob/main/{file_path_str}#L{line_number}"
+ github_url =
f"https://github.com/apache/tooling-trusted-releases/blob/main/{validated_path_str}#L{line_number}"
return quart.redirect(github_url, code=303)
is_directory = ref_path.endswith("/")
path_str = ref_path.rstrip("/")
-
- file_path = project_root / path_str
-
- try:
- resolved_path = file_path.resolve()
- resolved_path.relative_to(project_root)
- except (FileNotFoundError, ValueError):
- quart.abort(404)
+ resolved_path, validated_path_str = _validate_and_resolve_path(path_str,
project_root)
if not resolved_path.exists():
quart.abort(404)
@@ -71,11 +59,11 @@ async def resolve(session: web.Committer | None, ref_path:
str) -> web.WerkzeugR
if is_directory:
if not resolved_path.is_dir():
quart.abort(404)
- github_url =
f"https://github.com/apache/tooling-trusted-releases/tree/main/{path_str}"
+ github_url =
f"https://github.com/apache/tooling-trusted-releases/tree/main/{validated_path_str}"
else:
if not resolved_path.is_file():
quart.abort(404)
- github_url =
f"https://github.com/apache/tooling-trusted-releases/blob/main/{path_str}"
+ github_url =
f"https://github.com/apache/tooling-trusted-releases/blob/main/{validated_path_str}"
return quart.redirect(github_url, code=303)
@@ -100,3 +88,20 @@ async def _resolve_symbol_to_line(file_path: pathlib.Path,
symbol: str) -> int |
return node.lineno
return None
+
+
+def _validate_and_resolve_path(path_str: str, project_root: pathlib.Path) ->
tuple[pathlib.Path, str]:
+ validated_path = form.to_relpath(path_str)
+ if validated_path is None:
+ quart.abort(400)
+ validated_path_str = str(validated_path)
+
+ file_path = project_root / validated_path
+
+ try:
+ resolved_path = file_path.resolve()
+ resolved_path.relative_to(project_root)
+ except (FileNotFoundError, ValueError):
+ quart.abort(404)
+
+ return resolved_path, validated_path_str
diff --git a/atr/get/report.py b/atr/get/report.py
index 4d4a68e..6559b93 100644
--- a/atr/get/report.py
+++ b/atr/get/report.py
@@ -16,12 +16,12 @@
# under the License.
import datetime
-import pathlib
import aiofiles.os
import asfquart.base as base
import atr.blueprints.get as get
+import atr.form as form
import atr.models.sql as sql
import atr.storage as storage
import atr.template as template
@@ -34,6 +34,10 @@ async def selected_path(session: web.Committer,
project_name: str, version_name:
"""Show the report for a specific file."""
await session.check_access(project_name)
+ validated_path = form.to_relpath(rel_path)
+ if validated_path is None:
+ raise base.ASFQuartException("Invalid file path", errorcode=400)
+
# If the draft is not found, we try to get the release candidate
try:
release = await session.release(project_name, version_name,
with_committee=True)
@@ -46,7 +50,7 @@ async def selected_path(session: web.Committer, project_name:
str, version_name:
raise base.ASFQuartException("Release has no committee", errorcode=500)
# TODO: When we do more than one thing in a dir, we should use the revision directory directly
- abs_path = util.release_directory(release) / rel_path
+ abs_path = util.release_directory(release) / validated_path
if release.latest_revision_number is None:
raise base.ASFQuartException("Release has no revision", errorcode=500)
@@ -60,10 +64,10 @@ async def selected_path(session: web.Committer,
project_name: str, version_name:
# Get all check results for this file
async with storage.read() as read:
ragp = read.as_general_public()
- check_results = await ragp.checks.by_release_path(release,
pathlib.Path(rel_path))
+ check_results = await ragp.checks.by_release_path(release,
validated_path)
file_data = {
- "filename": pathlib.Path(rel_path).name,
+ "filename": validated_path.name,
"bytes_size": file_size,
"uploaded": datetime.datetime.fromtimestamp(modified, tz=datetime.UTC),
}
@@ -72,7 +76,7 @@ async def selected_path(session: web.Committer, project_name:
str, version_name:
"report-selected-path.html",
project_name=project_name,
version_name=version_name,
- rel_path=rel_path,
+ rel_path=str(validated_path),
package=file_data,
release=release,
primary_results=check_results.primary_results_list,
diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index d60ca1a..783f238 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -44,6 +44,11 @@ if TYPE_CHECKING:
async def report(session: web.Committer, project: str, version: str,
file_path: str) -> str:
await session.check_access(project)
+ validated_path = form.to_relpath(file_path)
+ if validated_path is None:
+ raise base.ASFQuartException("Invalid file path", errorcode=400)
+ validated_path_str = str(validated_path)
+
# If the draft is not found, we try to get the release candidate
try:
release = await session.release(project, version, with_committee=True)
@@ -60,7 +65,7 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
version_name=version,
revision_number=release.latest_revision_number,
task_type=sql.TaskType.SBOM_TOOL_SCORE,
- primary_rel_path=file_path,
+ primary_rel_path=validated_path_str,
)
.order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
.all()
@@ -70,7 +75,7 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
project_name=project,
version_name=version,
task_type=sql.TaskType.SBOM_AUGMENT,
- primary_rel_path=file_path,
+ primary_rel_path=validated_path_str,
)
.order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
.all()
@@ -81,7 +86,7 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
project_name=project,
version_name=version,
task_type=sql.TaskType.SBOM_OSV_SCAN,
- primary_rel_path=file_path,
+ primary_rel_path=validated_path_str,
revision_number=release.latest_revision_number,
)
.order_by(sql.sqlmodel.desc(via(sql.Task.added)))
@@ -123,7 +128,7 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
vulnerabilities = []
_vulnerability_scan_section(
- block, project, version, file_path, task_result, vulnerabilities,
osv_tasks, is_release_candidate
+ block, project, version, validated_path_str, task_result,
vulnerabilities, osv_tasks, is_release_candidate
)
_outdated_tool_section(block, task_result)
diff --git a/atr/post/draft.py b/atr/post/draft.py
index 8ddefac..9060b73 100644
--- a/atr/post/draft.py
+++ b/atr/post/draft.py
@@ -18,7 +18,6 @@
from __future__ import annotations
import datetime
-import pathlib
import aiofiles.os
import aioshutil
@@ -79,7 +78,10 @@ async def delete_file(
"""Delete a specific file from the release candidate, creating a new
revision."""
await session.check_access(project_name)
- rel_path_to_delete = pathlib.Path(str(delete_file_form.file_path))
+ rel_path_to_delete = delete_file_form.file_path
+ if rel_path_to_delete is None:
+ await quart.flash("No file path specified", "error")
+ return await session.redirect(get.compose.selected,
project_name=project_name, version_name=version_name)
try:
async with storage.write(session) as write:
@@ -133,7 +135,10 @@ async def hashgen(session: web.Committer, project_name:
str, version_name: str,
"""Generate an sha512 hash file for a candidate draft file, creating a new
revision."""
await session.check_access(project_name)
- rel_path = pathlib.Path(file_path)
+ rel_path = form.to_relpath(file_path)
+ if rel_path is None:
+ await quart.flash("Invalid file path", "error")
+ return await session.redirect(get.compose.selected,
project_name=project_name, version_name=version_name)
try:
async with storage.write(session) as write:
@@ -159,7 +164,10 @@ async def sbomgen(session: web.Committer, project_name:
str, version_name: str,
"""Generate a CycloneDX SBOM file for a candidate draft file, creating a
new revision."""
await session.check_access(project_name)
- rel_path = pathlib.Path(file_path)
+ rel_path = form.to_relpath(file_path)
+ if rel_path is None:
+ await quart.flash("Invalid file path", "error")
+ return await session.redirect(get.compose.selected,
project_name=project_name, version_name=version_name)
# Check that the file is a .tar.gz archive before creating a revision
if not (file_path.endswith(".tar.gz") or file_path.endswith(".tgz") or
file_path.endswith(".zip")):
diff --git a/atr/post/finish.py b/atr/post/finish.py
index 7df9a76..997f9ca 100644
--- a/atr/post/finish.py
+++ b/atr/post/finish.py
@@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-import pathlib
-
import quart
import atr.blueprints.post as post
@@ -50,7 +48,9 @@ async def _delete_empty_directory(
version_name: str,
respond: shared.finish.Respond,
) -> tuple[web.QuartResponse, int] | web.WerkzeugResponse:
- dir_to_delete_rel = pathlib.Path(delete_form.directory_to_delete)
+ dir_to_delete_rel = delete_form.directory_to_delete
+ if dir_to_delete_rel is None:
+ return await respond(400, "No directory specified.")
try:
async with storage.write(session) as write:
wacp = await write.as_project_committee_member(project_name)
@@ -71,8 +71,10 @@ async def _move_file_to_revision(
version_name: str,
respond: shared.finish.Respond,
) -> tuple[web.QuartResponse, int] | web.WerkzeugResponse:
- source_files_rel = [pathlib.Path(sf) for sf in move_form.source_files]
- target_dir_rel = pathlib.Path(move_form.target_directory)
+ source_files_rel = move_form.source_files
+ target_dir_rel = move_form.target_directory
+ if target_dir_rel is None:
+ return await respond(400, "No target directory specified.")
try:
async with storage.write(session) as write:
wacp = await write.as_project_committee_member(project_name)
diff --git a/atr/post/sbom.py b/atr/post/sbom.py
index 65f7c0e..09c779e 100644
--- a/atr/post/sbom.py
+++ b/atr/post/sbom.py
@@ -17,13 +17,14 @@
from __future__ import annotations
-import pathlib
+from typing import TYPE_CHECKING
import asfquart.base as base
import quart
import atr.blueprints.post as post
import atr.db as db
+import atr.form as form
import atr.get as get
import atr.log as log
import atr.shared as shared
@@ -31,6 +32,9 @@ import atr.storage as storage
import atr.util as util
import atr.web as web
+if TYPE_CHECKING:
+ import pathlib
+
@post.committer("/sbom/report/<project>/<version>/<path:file_path>")
@post.form(shared.sbom.SBOMForm)
@@ -39,22 +43,24 @@ async def report(
) -> web.WerkzeugResponse:
await session.check_access(project)
+ validated_path = form.to_relpath(file_path)
+ if validated_path is None:
+ raise base.ASFQuartException("Invalid file path", errorcode=400)
+
match sbom_form:
case shared.sbom.AugmentSBOMForm():
- return await _augment(session, project, version, file_path)
+ return await _augment(session, project, version, validated_path)
case shared.sbom.ScanSBOMForm():
- return await _scan(session, project, version, file_path)
+ return await _scan(session, project, version, validated_path)
async def _augment(
- session: web.Committer, project_name: str, version_name: str, file_path:
str
+ session: web.Committer, project_name: str, version_name: str, rel_path:
pathlib.Path
) -> web.WerkzeugResponse:
"""Augment a CycloneDX SBOM file."""
- rel_path = pathlib.Path(file_path)
-
# Check that the file is a .cdx.json archive before creating a revision
- if not (file_path.endswith(".cdx.json")):
+ if not (rel_path.name.endswith(".cdx.json")):
raise base.ASFQuartException("SBOM augmentation is only supported for .cdx.json files", errorcode=400)
try:
@@ -93,11 +99,11 @@ async def _augment(
)
-async def _scan(session: web.Committer, project_name: str, version_name: str,
file_path: str) -> web.WerkzeugResponse:
+async def _scan(
+ session: web.Committer, project_name: str, version_name: str, rel_path:
pathlib.Path
+) -> web.WerkzeugResponse:
"""Scan a CycloneDX SBOM file for vulnerabilities using OSV."""
- rel_path = pathlib.Path(file_path)
-
- if not (file_path.endswith(".cdx.json")):
+ if not (rel_path.name.endswith(".cdx.json")):
raise base.ASFQuartException("OSV scanning is only supported for .cdx.json files", errorcode=400)
try:
diff --git a/atr/post/upload.py b/atr/post/upload.py
index 3a0505c..a3565ca 100644
--- a/atr/post/upload.py
+++ b/atr/post/upload.py
@@ -28,6 +28,7 @@ import werkzeug.wrappers.response as response
import atr.blueprints.post as post
import atr.db as db
+import atr.form as form
import atr.get as get
import atr.log as log
import atr.shared as shared
@@ -118,9 +119,12 @@ async def stage(
if (not file) or (not file.filename):
return _json_error("No file provided", 400)
- filename = pathlib.Path(file.filename).name
- if (not filename) or (filename in (".", "..")):
+ # Extract basename and validate
+ basename = pathlib.Path(file.filename).name
+ validated_filename = form.to_filename(basename)
+ if validated_filename is None:
return _json_error("Invalid filename", 400)
+ filename = str(validated_filename)
await aiofiles.os.makedirs(staging_dir, exist_ok=True)
diff --git a/atr/server.py b/atr/server.py
index 88962c2..2352660 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -354,6 +354,8 @@ def _app_setup_logging(app: base.QuartApp, config_mode:
config.Mode, app_config:
def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
"""Create and configure the application."""
+ if os.sep != "/":
+ raise RuntimeError('ATR requires a POSIX compatible filesystem where os.sep is "/"')
config_mode = config.get_mode()
_app_dirs_setup(app_config)
log.performance_init()
diff --git a/atr/shared/draft.py b/atr/shared/draft.py
index e6bce01..a52a9bf 100644
--- a/atr/shared/draft.py
+++ b/atr/shared/draft.py
@@ -19,7 +19,7 @@ import atr.form as form
class DeleteFileForm(form.Form):
- file_path: str = form.label("File path", widget=form.Widget.HIDDEN)
+ file_path: form.RelPath = form.label("File path",
widget=form.Widget.HIDDEN)
class HashGen(form.Empty):
diff --git a/atr/shared/finish.py b/atr/shared/finish.py
index 70cf00f..4f5501a 100644
--- a/atr/shared/finish.py
+++ b/atr/shared/finish.py
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-import pathlib
from collections.abc import Awaitable, Callable
from typing import Annotated, Literal
@@ -33,23 +32,24 @@ type REMOVE_RC_TAGS = Literal["REMOVE_RC_TAGS"]
class DeleteEmptyDirectoryForm(form.Form):
variant: DELETE_DIR = form.value(DELETE_DIR)
- directory_to_delete: str = form.label("Directory to delete",
widget=form.Widget.SELECT)
+ directory_to_delete: form.RelPath = form.label("Directory to delete",
widget=form.Widget.SELECT)
class MoveFileForm(form.Form):
variant: MOVE_FILE = form.value(MOVE_FILE)
- source_files: form.StrList = form.label("Files to move")
- target_directory: str = form.label("Target directory")
+ source_files: form.RelPathList = form.label("Files to move")
+ target_directory: form.RelPath = form.label("Target directory")
@pydantic.model_validator(mode="after")
def validate_move(self) -> "MoveFileForm":
if not self.source_files:
raise ValueError("Please select at least one file to move.")
- source_paths = [pathlib.Path(sf) for sf in self.source_files]
- target_dir = pathlib.Path(self.target_directory)
- for source_path in source_paths:
- if source_path.parent == target_dir:
+ if self.target_directory is None:
+ raise ValueError("Target directory is required.")
+
+ for source_path in self.source_files:
+ if source_path.parent == self.target_directory:
raise ValueError(f"Target directory cannot be the same as the source directory for {source_path.name}.")
return self
diff --git a/atr/storage/writers/release.py b/atr/storage/writers/release.py
index 840946b..30a8669 100644
--- a/atr/storage/writers/release.py
+++ b/atr/storage/writers/release.py
@@ -23,7 +23,6 @@ import base64
import contextlib
import datetime
import hashlib
-import pathlib
from typing import TYPE_CHECKING, Final
import aiofiles.os
@@ -35,6 +34,7 @@ import sqlmodel
import atr.analysis as analysis
import atr.config as config
import atr.db as db
+import atr.form as form
import atr.log as log
import atr.models.api as api
import atr.models.sql as sql
@@ -43,6 +43,7 @@ import atr.storage.types as types
import atr.util as util
if TYPE_CHECKING:
+ import pathlib
from collections.abc import AsyncGenerator, Sequence
import werkzeug.datastructures as datastructures
@@ -406,10 +407,12 @@ class CommitteeParticipant(FoundationCommitter):
async def upload_file(self, args: api.ReleaseUploadArgs) -> sql.Revision:
file_bytes = base64.b64decode(args.content, validate=True)
- file_path = args.relpath.lstrip("/")
- description = f"Upload via API: {file_path}"
+ validated_path = form.to_relpath(args.relpath)
+ if validated_path is None:
+ raise storage.AccessError("Invalid file path")
+ description = f"Upload via API: {validated_path}"
async with self.create_and_manage_revision(args.project, args.version,
description) as creating:
- target_path = pathlib.Path(creating.interim_path) / file_path
+ target_path = creating.interim_path / validated_path
await aiofiles.os.makedirs(target_path.parent, exist_ok=True)
if target_path.exists():
raise storage.AccessError("File already exists")
@@ -442,12 +445,13 @@ class CommitteeParticipant(FoundationCommitter):
if not file_name:
if not file.filename:
raise storage.AccessError("No filename provided")
- # Use the original name
- relative_file_path = pathlib.Path(file.filename)
+ # Validate the filename from multipart upload
+ validated_path = form.to_relpath(file.filename)
+ if validated_path is None:
+ raise storage.AccessError("Invalid filename")
+ relative_file_path = validated_path
else:
- # Use the provided name, relative to its anchor
- # In other words, ignore the leading "/"
- relative_file_path =
file_name.relative_to(file_name.anchor)
+ relative_file_path = file_name
# Construct path inside the new revision directory
target_path = creating.interim_path / relative_file_path
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]