This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new 9adefac  Add a section to start an OSV scan in the SBOM report UI
9adefac is described below

commit 9adefac69ef4833a54541170a5cd2cae131746a6
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Oct 17 20:31:38 2025 +0100

    Add a section to start an OSV scan in the SBOM report UI
---
 atr/routes/sbom.py          | 230 ++++++++++++++++++++++++++++++++++++++++++--
 atr/storage/writers/sbom.py |  29 ++++++
 2 files changed, 253 insertions(+), 6 deletions(-)

diff --git a/atr/routes/sbom.py b/atr/routes/sbom.py
index 13c654c..4fe1c75 100644
--- a/atr/routes/sbom.py
+++ b/atr/routes/sbom.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 import json
 import pathlib
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import asfquart.base as base
 import htpy
@@ -39,6 +39,8 @@ import atr.template as template
 import atr.util as util
 
 if TYPE_CHECKING:
+    import collections.abc
+
     import werkzeug.wrappers.response as response
 
 
@@ -106,6 +108,17 @@ async def report(session: route.CommitterSession, project: str, version: str, fi
             .all()
         )
 
+        osv_tasks = (
+            await data.task(
+                project_name=project,
+                version_name=version,
+                task_type=sql.TaskType.SBOM_OSV_SCAN,
+                primary_rel_path=file_path,
+            )
+            .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
+            .all()
+        )
+
     block = htm.Block()
     block.h1["SBOM report"]
 
@@ -121,10 +134,9 @@ async def report(session: route.CommitterSession, project: str, version: str, fi
     errors = [sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in task_result.errors]
 
     block.p[
-        """This is a report by the sbomtool, for debugging and
+        """This is a report by the ATR SBOM tool, for debugging and
         informational purposes. Please use it only as an approximate
-        guideline to the quality of your SBOM file. It checks for NTIA 2021
-        minimum data field conformance."""
+        guideline to the quality of your SBOM file."""
     ]
     block.p["This report is for revision ", htpy.code[task_result.revision_number], "."]
 
@@ -154,13 +166,16 @@ async def report(session: route.CommitterSession, project: str, version: str, fi
         _missing_table(block, errors)
 
     if not (warnings or errors):
-        block.h2["Results"]
+        block.h2["Conformance report"]
         block.p["No NTIA 2021 minimum data field conformance warnings or errors found."]
 
+    block.h2["Vulnerability scan"]
+    _vulnerability_scan_section(block, project, version, file_path, task_result.revision_number, osv_tasks, empty_form)
+
+    block.h2["Outdated tool"]
     outdated = None
     if task_result.outdated:
         outdated = sbom.models.maven.OutdatedAdapter.validate_python(json.loads(task_result.outdated))
-    block.h2["Outdated tool"]
     if outdated:
         if outdated.kind == "tool":
             block.p[
@@ -186,6 +201,65 @@ async def report(session: route.CommitterSession, project: str, version: str, fi
     return await template.blank("SBOM report", content=block.collect())
 
 
+@route.committer("/sbom/scan/<project_name>/<version_name>/<path:file_path>", methods=["POST"])
+async def scan(
+    session: route.CommitterSession, project_name: str, version_name: str, file_path: str
+) -> response.Response:
+    """Scan a CycloneDX SBOM file for vulnerabilities using OSV."""
+    await session.check_access(project_name)
+
+    await util.validate_empty_form()
+    rel_path = pathlib.Path(file_path)
+
+    if not (file_path.endswith(".cdx.json")):
+        raise base.ASFQuartException("OSV scanning is only supported for .cdx.json files", errorcode=400)
+
+    try:
+        async with db.session() as data:
+            release = await data.release(project_name=project_name, version=version_name).demand(
+                RuntimeError("Release does not exist for OSV scan")
+            )
+            revision_number = release.latest_revision_number
+            if revision_number is None:
+                raise RuntimeError("No revision number found for OSV scan")
+            log.info(f"Starting OSV scan for {project_name} {version_name} {revision_number} {rel_path}")
+        async with storage.write_as_project_committee_member(project_name) as wacm:
+            sbom_task = await wacm.sbom.osv_scan_cyclonedx(project_name, version_name, revision_number, rel_path)
+
+    except Exception as e:
+        log.exception("Error starting OSV scan:")
+        await quart.flash(f"Error starting OSV scan: {e!s}", "error")
+        return await session.redirect(
+            report,
+            project=project_name,
+            version=version_name,
+            file_path=str(rel_path),
+        )
+
+    return await session.redirect(
+        report,
+        success=f"OSV vulnerability scan queued for {rel_path.name} (task ID: {util.unwrap(sbom_task.id)})",
+        project=project_name,
+        version=version_name,
+        file_path=str(rel_path),
+    )
+
+
+def _extract_vulnerability_severity(vuln: dict[str, Any]) -> str:
+    """Extract severity information from vulnerability data."""
+    db_specific = vuln.get("database_specific", {})
+    if "severity" in db_specific:
+        return db_specific["severity"]
+
+    severity_data = vuln.get("severity", [])
+    if severity_data and isinstance(severity_data, list):
+        first_severity = severity_data[0]
+        if isinstance(first_severity, dict) and ("type" in first_severity):
+            return first_severity["type"]
+
+    return "Unknown"
+
+
 def _missing_table(block: htm.Block, items: list[sbom.models.conformance.Missing]) -> None:
     warning_rows = [
         htpy.tr[
@@ -210,3 +284,147 @@ def _missing_tally(items: list[sbom.models.conformance.Missing]) -> list[tuple[s
         [(kind, prop, count) for (kind, prop), count in counts.items()],
         key=lambda kv: (kv[0], kv[1]),
     )
+
+
+def _vulnerability_component_details(block: htm.Block, component: results.OSVComponent) -> None:
+    details_content = []
+    summary_element = htpy.summary[
+        htpy.span(".badge.bg-danger.me-2.font-monospace")[str(len(component.vulnerabilities))],
+        htpy.strong[component.purl],
+    ]
+    details_content.append(summary_element)
+
+    for vuln in component.vulnerabilities:
+        vuln_id = vuln.get("id", "Unknown")
+        vuln_summary = vuln.get("summary", "No summary available")
+        vuln_modified = vuln.get("modified", "Unknown")
+        vuln_severity = _extract_vulnerability_severity(vuln)
+
+        vuln_header = [htpy.strong(".me-2")[vuln_id]]
+        if vuln_severity != "Unknown":
+            vuln_header.append(htpy.span(".badge.bg-warning.text-dark")[vuln_severity])
+
+        vuln_div = htpy.div(".ms-3.mb-3.border-start.border-warning.border-3.ps-3")[
+            htpy.div(".d-flex.align-items-center.mb-2")[*vuln_header],
+            htpy.p(".mb-1")[vuln_summary],
+            htpy.div(".text-muted.small")[
+                "Last modified: ",
+                vuln_modified,
+            ],
+            htpy.div(".mt-2.text-muted")[vuln.get("details", "No additional details available.")],
+        ]
+        details_content.append(vuln_div)
+
+    block.append(htpy.details(".mb-3.rounded")[*details_content])
+
+
+def _vulnerability_scan_button(
+    block: htm.Block, project: str, version: str, file_path: str, empty_form: forms.Empty
+) -> None:
+    block.p["No vulnerability scan has been performed for this revision."]
+
+    action = util.as_url(
+        scan,
+        project_name=project,
+        version_name=version,
+        file_path=file_path,
+    )
+    block.append(
+        htpy.form("", action=action, method="post")[
+            markupsafe.Markup(str(empty_form.hidden_tag())),
+            htpy.button(".btn.btn-primary", type="submit")["Scan file"],
+        ]
+    )
+
+
+def _vulnerability_scan_find_completed_task(
+    osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
+) -> sql.Task | None:
+    """Find the most recent completed OSV scan task for the given revision."""
+    for task in osv_tasks:
+        if task.status == sql.TaskStatus.COMPLETED and (task.result is not None):
+            task_result = task.result
+            if isinstance(task_result, results.SBOMOSVScan) and task_result.revision_number == revision_number:
+                return task
+    return None
+
+
+def _vulnerability_scan_find_in_progress_task(
+    osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
+) -> sql.Task | None:
+    """Find the most recent in-progress OSV scan task for the given revision."""
+    for task in osv_tasks:
+        if task.revision_number == revision_number:
+            if task.status in (sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE, sql.TaskStatus.FAILED):
+                return task
+    return None
+
+
+def _vulnerability_scan_results(block: htm.Block, task: sql.Task) -> None:
+    task_result = task.result
+    if not isinstance(task_result, results.SBOMOSVScan):
+        block.p["Invalid scan result format."]
+        return
+
+    components = task_result.components
+    ignored_count = task_result.ignored_count
+
+    if not components:
+        block.p["No vulnerabilities found."]
+        if ignored_count > 0:
+            component_word = "component" if (ignored_count == 1) else "components"
+            block.p[f"{ignored_count} {component_word} were ignored due to missing PURL or version information."]
+        return
+
+    block.p[f"Found vulnerabilities in {len(components)} components:"]
+
+    for component in components:
+        _vulnerability_component_details(block, component)
+
+    if ignored_count > 0:
+        component_word = "component" if (ignored_count == 1) else "components"
+        block.p[f"{ignored_count} {component_word} were ignored due to missing PURL or version information."]
+
+
+def _vulnerability_scan_section(
+    block: htm.Block,
+    project: str,
+    version: str,
+    file_path: str,
+    revision_number: str,
+    osv_tasks: collections.abc.Sequence[sql.Task],
+    empty_form: forms.Empty,
+) -> None:
+    """Display the vulnerability scan section based on task status."""
+    completed_task = _vulnerability_scan_find_completed_task(osv_tasks, revision_number)
+
+    if completed_task is not None:
+        _vulnerability_scan_results(block, completed_task)
+        return
+
+    in_progress_task = _vulnerability_scan_find_in_progress_task(osv_tasks, revision_number)
+
+    if in_progress_task is not None:
+        _vulnerability_scan_status(block, in_progress_task, project, version, file_path, empty_form)
+    else:
+        _vulnerability_scan_button(block, project, version, file_path, empty_form)
+
+
+def _vulnerability_scan_status(
+    block: htm.Block,
+    task: sql.Task,
+    project: str,
+    version: str,
+    file_path: str,
+    empty_form: forms.Empty,
+) -> None:
+    status_text = task.status.value.replace("_", " ").capitalize()
+    block.p[f"Vulnerability scan is currently {status_text.lower()}."]
+    block.p["Task ID: ", htpy.code[str(task.id)]]
+    if (task.status == sql.TaskStatus.FAILED) and (task.error is not None):
+        block.p[
+            "Task reported an error: ",
+            htpy.code[task.error],
+            ". Additional details are unavailable from ATR.",
+        ]
+        _vulnerability_scan_button(block, project, version, file_path, empty_form)
diff --git a/atr/storage/writers/sbom.py b/atr/storage/writers/sbom.py
index 951e05c..12d35d2 100644
--- a/atr/storage/writers/sbom.py
+++ b/atr/storage/writers/sbom.py
@@ -143,6 +143,35 @@ class CommitteeParticipant(FoundationCommitter):
             # Wait 100ms before checking again
             await asyncio.sleep(0.1)
 
+    async def osv_scan_cyclonedx(
+        self,
+        project_name: str,
+        version_name: str,
+        revision_number: str,
+        rel_path: pathlib.Path,
+    ) -> sql.Task:
+        sbom_task = sql.Task(
+            task_type=sql.TaskType.SBOM_OSV_SCAN,
+            task_args=sbom.FileArgs(
+                project_name=project_name,
+                version_name=version_name,
+                revision_number=revision_number,
+                file_path=str(rel_path),
+                asf_uid=util.unwrap(self.__asf_uid),
+            ).model_dump(),
+            asf_uid=util.unwrap(self.__asf_uid),
+            added=datetime.datetime.now(datetime.UTC),
+            status=sql.TaskStatus.QUEUED,
+            project_name=project_name,
+            version_name=version_name,
+            revision_number=revision_number,
+            primary_rel_path=str(rel_path),
+        )
+        self.__data.add(sbom_task)
+        await self.__data.commit()
+        await self.__data.refresh(sbom_task)
+        return sbom_task
+
 
 class CommitteeMember(CommitteeParticipant):
     def __init__(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to