This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 9adefac Add a section to start an OSV scan in the SBOM report UI
9adefac is described below
commit 9adefac69ef4833a54541170a5cd2cae131746a6
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Oct 17 20:31:38 2025 +0100
Add a section to start an OSV scan in the SBOM report UI
---
atr/routes/sbom.py | 230 ++++++++++++++++++++++++++++++++++++++++++--
atr/storage/writers/sbom.py | 29 ++++++
2 files changed, 253 insertions(+), 6 deletions(-)
diff --git a/atr/routes/sbom.py b/atr/routes/sbom.py
index 13c654c..4fe1c75 100644
--- a/atr/routes/sbom.py
+++ b/atr/routes/sbom.py
@@ -19,7 +19,7 @@ from __future__ import annotations
import json
import pathlib
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
import asfquart.base as base
import htpy
@@ -39,6 +39,8 @@ import atr.template as template
import atr.util as util
if TYPE_CHECKING:
+ import collections.abc
+
import werkzeug.wrappers.response as response
@@ -106,6 +108,17 @@ async def report(session: route.CommitterSession, project:
str, version: str, fi
.all()
)
+ osv_tasks = (
+ await data.task(
+ project_name=project,
+ version_name=version,
+ task_type=sql.TaskType.SBOM_OSV_SCAN,
+ primary_rel_path=file_path,
+ )
+ .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
+ .all()
+ )
+
block = htm.Block()
block.h1["SBOM report"]
@@ -121,10 +134,9 @@ async def report(session: route.CommitterSession, project:
str, version: str, fi
errors =
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in
task_result.errors]
block.p[
- """This is a report by the sbomtool, for debugging and
+ """This is a report by the ATR SBOM tool, for debugging and
informational purposes. Please use it only as an approximate
- guideline to the quality of your SBOM file. It checks for NTIA 2021
- minimum data field conformance."""
+ guideline to the quality of your SBOM file."""
]
block.p["This report is for revision ",
htpy.code[task_result.revision_number], "."]
@@ -154,13 +166,16 @@ async def report(session: route.CommitterSession,
project: str, version: str, fi
_missing_table(block, errors)
if not (warnings or errors):
- block.h2["Results"]
+ block.h2["Conformance report"]
block.p["No NTIA 2021 minimum data field conformance warnings or
errors found."]
+ block.h2["Vulnerability scan"]
+ _vulnerability_scan_section(block, project, version, file_path,
task_result.revision_number, osv_tasks, empty_form)
+
+ block.h2["Outdated tool"]
outdated = None
if task_result.outdated:
outdated =
sbom.models.maven.OutdatedAdapter.validate_python(json.loads(task_result.outdated))
- block.h2["Outdated tool"]
if outdated:
if outdated.kind == "tool":
block.p[
@@ -186,6 +201,65 @@ async def report(session: route.CommitterSession, project:
str, version: str, fi
return await template.blank("SBOM report", content=block.collect())
[email protected]("/sbom/scan/<project_name>/<version_name>/<path:file_path>",
methods=["POST"])
+async def scan(
+ session: route.CommitterSession, project_name: str, version_name: str,
file_path: str
+) -> response.Response:
+ """Scan a CycloneDX SBOM file for vulnerabilities using OSV."""
+ await session.check_access(project_name)
+
+ await util.validate_empty_form()
+ rel_path = pathlib.Path(file_path)
+
+ if not (file_path.endswith(".cdx.json")):
+ raise base.ASFQuartException("OSV scanning is only supported for
.cdx.json files", errorcode=400)
+
+ try:
+ async with db.session() as data:
+ release = await data.release(project_name=project_name,
version=version_name).demand(
+ RuntimeError("Release does not exist for OSV scan")
+ )
+ revision_number = release.latest_revision_number
+ if revision_number is None:
+ raise RuntimeError("No revision number found for OSV scan")
+ log.info(f"Starting OSV scan for {project_name} {version_name}
{revision_number} {rel_path}")
+ async with storage.write_as_project_committee_member(project_name) as
wacm:
+ sbom_task = await wacm.sbom.osv_scan_cyclonedx(project_name,
version_name, revision_number, rel_path)
+
+ except Exception as e:
+ log.exception("Error starting OSV scan:")
+ await quart.flash(f"Error starting OSV scan: {e!s}", "error")
+ return await session.redirect(
+ report,
+ project=project_name,
+ version=version_name,
+ file_path=str(rel_path),
+ )
+
+ return await session.redirect(
+ report,
+ success=f"OSV vulnerability scan queued for {rel_path.name} (task ID:
{util.unwrap(sbom_task.id)})",
+ project=project_name,
+ version=version_name,
+ file_path=str(rel_path),
+ )
+
+
+def _extract_vulnerability_severity(vuln: dict[str, Any]) -> str:
+ """Extract severity information from vulnerability data."""
+ db_specific = vuln.get("database_specific", {})
+ if "severity" in db_specific:
+ return db_specific["severity"]
+
+ severity_data = vuln.get("severity", [])
+ if severity_data and isinstance(severity_data, list):
+ first_severity = severity_data[0]
+ if isinstance(first_severity, dict) and ("type" in first_severity):
+ return first_severity["type"]
+
+ return "Unknown"
+
+
def _missing_table(block: htm.Block, items:
list[sbom.models.conformance.Missing]) -> None:
warning_rows = [
htpy.tr[
@@ -210,3 +284,147 @@ def _missing_tally(items:
list[sbom.models.conformance.Missing]) -> list[tuple[s
[(kind, prop, count) for (kind, prop), count in counts.items()],
key=lambda kv: (kv[0], kv[1]),
)
+
+
+def _vulnerability_component_details(block: htm.Block, component:
results.OSVComponent) -> None:
+ details_content = []
+ summary_element = htpy.summary[
+
htpy.span(".badge.bg-danger.me-2.font-monospace")[str(len(component.vulnerabilities))],
+ htpy.strong[component.purl],
+ ]
+ details_content.append(summary_element)
+
+ for vuln in component.vulnerabilities:
+ vuln_id = vuln.get("id", "Unknown")
+ vuln_summary = vuln.get("summary", "No summary available")
+ vuln_modified = vuln.get("modified", "Unknown")
+ vuln_severity = _extract_vulnerability_severity(vuln)
+
+ vuln_header = [htpy.strong(".me-2")[vuln_id]]
+ if vuln_severity != "Unknown":
+
vuln_header.append(htpy.span(".badge.bg-warning.text-dark")[vuln_severity])
+
+ vuln_div =
htpy.div(".ms-3.mb-3.border-start.border-warning.border-3.ps-3")[
+ htpy.div(".d-flex.align-items-center.mb-2")[*vuln_header],
+ htpy.p(".mb-1")[vuln_summary],
+ htpy.div(".text-muted.small")[
+ "Last modified: ",
+ vuln_modified,
+ ],
+ htpy.div(".mt-2.text-muted")[vuln.get("details", "No additional
details available.")],
+ ]
+ details_content.append(vuln_div)
+
+ block.append(htpy.details(".mb-3.rounded")[*details_content])
+
+
+def _vulnerability_scan_button(
+ block: htm.Block, project: str, version: str, file_path: str, empty_form:
forms.Empty
+) -> None:
+ block.p["No vulnerability scan has been performed for this revision."]
+
+ action = util.as_url(
+ scan,
+ project_name=project,
+ version_name=version,
+ file_path=file_path,
+ )
+ block.append(
+ htpy.form("", action=action, method="post")[
+ markupsafe.Markup(str(empty_form.hidden_tag())),
+ htpy.button(".btn.btn-primary", type="submit")["Scan file"],
+ ]
+ )
+
+
+def _vulnerability_scan_find_completed_task(
+ osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
+) -> sql.Task | None:
+ """Find the most recent completed OSV scan task for the given revision."""
+ for task in osv_tasks:
+ if task.status == sql.TaskStatus.COMPLETED and (task.result is not
None):
+ task_result = task.result
+ if isinstance(task_result, results.SBOMOSVScan) and
task_result.revision_number == revision_number:
+ return task
+ return None
+
+
+def _vulnerability_scan_find_in_progress_task(
+ osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
+) -> sql.Task | None:
+ """Find the most recent in-progress OSV scan task for the given
revision."""
+ for task in osv_tasks:
+ if task.revision_number == revision_number:
+ if task.status in (sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE,
sql.TaskStatus.FAILED):
+ return task
+ return None
+
+
+def _vulnerability_scan_results(block: htm.Block, task: sql.Task) -> None:
+ task_result = task.result
+ if not isinstance(task_result, results.SBOMOSVScan):
+ block.p["Invalid scan result format."]
+ return
+
+ components = task_result.components
+ ignored_count = task_result.ignored_count
+
+ if not components:
+ block.p["No vulnerabilities found."]
+ if ignored_count > 0:
+ component_word = "component" if (ignored_count == 1) else
"components"
+ block.p[f"{ignored_count} {component_word} were ignored due to
missing PURL or version information."]
+ return
+
+ block.p[f"Found vulnerabilities in {len(components)} components:"]
+
+ for component in components:
+ _vulnerability_component_details(block, component)
+
+ if ignored_count > 0:
+ component_word = "component" if (ignored_count == 1) else "components"
+ block.p[f"{ignored_count} {component_word} were ignored due to missing
PURL or version information."]
+
+
+def _vulnerability_scan_section(
+ block: htm.Block,
+ project: str,
+ version: str,
+ file_path: str,
+ revision_number: str,
+ osv_tasks: collections.abc.Sequence[sql.Task],
+ empty_form: forms.Empty,
+) -> None:
+ """Display the vulnerability scan section based on task status."""
+ completed_task = _vulnerability_scan_find_completed_task(osv_tasks,
revision_number)
+
+ if completed_task is not None:
+ _vulnerability_scan_results(block, completed_task)
+ return
+
+ in_progress_task = _vulnerability_scan_find_in_progress_task(osv_tasks,
revision_number)
+
+ if in_progress_task is not None:
+ _vulnerability_scan_status(block, in_progress_task, project, version,
file_path, empty_form)
+ else:
+ _vulnerability_scan_button(block, project, version, file_path,
empty_form)
+
+
+def _vulnerability_scan_status(
+ block: htm.Block,
+ task: sql.Task,
+ project: str,
+ version: str,
+ file_path: str,
+ empty_form: forms.Empty,
+) -> None:
+ status_text = task.status.value.replace("_", " ").capitalize()
+ block.p[f"Vulnerability scan is currently {status_text.lower()}."]
+ block.p["Task ID: ", htpy.code[str(task.id)]]
+ if (task.status == sql.TaskStatus.FAILED) and (task.error is not None):
+ block.p[
+ "Task reported an error: ",
+ htpy.code[task.error],
+ ". Additional details are unavailable from ATR.",
+ ]
+ _vulnerability_scan_button(block, project, version, file_path,
empty_form)
diff --git a/atr/storage/writers/sbom.py b/atr/storage/writers/sbom.py
index 951e05c..12d35d2 100644
--- a/atr/storage/writers/sbom.py
+++ b/atr/storage/writers/sbom.py
@@ -143,6 +143,35 @@ class CommitteeParticipant(FoundationCommitter):
# Wait 100ms before checking again
await asyncio.sleep(0.1)
+ async def osv_scan_cyclonedx(
+ self,
+ project_name: str,
+ version_name: str,
+ revision_number: str,
+ rel_path: pathlib.Path,
+ ) -> sql.Task:
+ sbom_task = sql.Task(
+ task_type=sql.TaskType.SBOM_OSV_SCAN,
+ task_args=sbom.FileArgs(
+ project_name=project_name,
+ version_name=version_name,
+ revision_number=revision_number,
+ file_path=str(rel_path),
+ asf_uid=util.unwrap(self.__asf_uid),
+ ).model_dump(),
+ asf_uid=util.unwrap(self.__asf_uid),
+ added=datetime.datetime.now(datetime.UTC),
+ status=sql.TaskStatus.QUEUED,
+ project_name=project_name,
+ version_name=version_name,
+ revision_number=revision_number,
+ primary_rel_path=str(rel_path),
+ )
+ self.__data.add(sbom_task)
+ await self.__data.commit()
+ await self.__data.refresh(sbom_task)
+ return sbom_task
+
class CommitteeMember(CommitteeParticipant):
def __init__(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]