This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch vulnerabilities_sbom
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 92b5e8b70b283e8cd506151fa41c2225b0f703e3
Author: Alastair McFarlane <[email protected]>
AuthorDate: Wed Dec 17 18:15:27 2025 +0000

    Saving first day's work
---
 atr/get/sbom.py        | 125 +++++++++++++++++++++++--------------
 atr/models/results.py  |   5 +-
 atr/sbom/cli.py        |  29 ++++++---
 atr/sbom/models/osv.py |  35 +++++++++--
 atr/sbom/osv.py        | 164 +++++++++++++++++++++++++++++++++++++++++++------
 atr/sbom/utilities.py  |  68 +++++++++++++++++++-
 atr/tasks/sbom.py      |  31 +++++++++-
 7 files changed, 377 insertions(+), 80 deletions(-)

diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index fb570a7..8de0bb1 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -20,7 +20,10 @@ from __future__ import annotations
 import json
 from typing import TYPE_CHECKING, Any
 
+import aiofiles
 import asfquart.base as base
+import cmarkgfm
+import markupsafe
 
 import atr.blueprints.get as get
 import atr.db as db
@@ -29,7 +32,9 @@ import atr.htm as htm
 import atr.models.results as results
 import atr.models.sql as sql
 import atr.sbom as sbom
+import atr.sbom.models.osv as osv
 import atr.shared as shared
+import atr.storage as storage
 import atr.template as template
 import atr.web as web
 
@@ -41,6 +46,7 @@ if TYPE_CHECKING:
 async def report(session: web.Committer, project: str, version: str, 
file_path: str) -> str:
     await session.check_access(project)
     await session.release(project, version)
+
     async with db.session() as data:
         via = sql.validate_instrumented_attribute
         # TODO: Abstract this code and the sbomtool.MissingAdapter validators
@@ -55,7 +61,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
             .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
             .all()
         )
-
+        # Run or running scans for the current revision
         osv_tasks = (
             await data.task(
                 project_name=project,
@@ -80,13 +86,14 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
         raise base.ASFQuartException("Invalid SBOM score result", 
errorcode=500)
     warnings = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in 
task_result.warnings]
     errors = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in 
task_result.errors]
+    vulnerabilities = 
[sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in 
task_result.vulnerabilities]
 
     block.p[
         """This is a report by the ATR SBOM tool, for debugging and
         informational purposes. Please use it only as an approximate
         guideline to the quality of your SBOM file."""
     ]
-    block.p["This report is for revision ", 
htm.code[task_result.revision_number], "."]
+    block.p["This report is for revision ", 
htm.code[task_result.revision_number], "."] # TODO: Mark if a subsequent score 
has failed
 
     # TODO: Show the status if the task to augment the SBOM is still running
     # TODO: Add a field to the SBOM to show that it's been augmented
@@ -111,7 +118,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
         block.p["No NTIA 2021 minimum data field conformance warnings or 
errors found."]
 
     block.h2["Vulnerability scan"]
-    _vulnerability_scan_section(block, project, version, file_path, 
task_result.revision_number, osv_tasks)
+    _vulnerability_scan_section(block, project, version, file_path, 
task_result.revision_number, vulnerabilities, osv_tasks)
 
     block.h2["Outdated tool"]
     outdated = None
@@ -142,13 +149,13 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
     return await template.blank("SBOM report", content=block.collect())
 
 
-def _extract_vulnerability_severity(vuln: dict[str, Any]) -> str:
+def _extract_vulnerability_severity(vuln: osv.VulnerabilityDetails) -> str:
     """Extract severity information from vulnerability data."""
-    db_specific = vuln.get("database_specific", {})
-    if "severity" in db_specific:
-        return db_specific["severity"]
+    data = vuln.database_specific or {}
+    if "severity" in data:
+        return data["severity"]
 
-    severity_data = vuln.get("severity", [])
+    severity_data = vuln.severity
     if severity_data and isinstance(severity_data, list):
         first_severity = severity_data[0]
         if isinstance(first_severity, dict) and ("type" in first_severity):
@@ -197,8 +204,7 @@ def _missing_tally(items: 
list[sbom.models.conformance.Missing]) -> list[tuple[s
         key=lambda kv: (kv[0], kv[1]),
     )
 
-
-def _vulnerability_component_details(block: htm.Block, component: 
results.OSVComponent) -> None:
+def _vulnerability_component_details_osv(block: htm.Block, component: 
results.OSVComponent) -> None:
     details_content = []
     summary_element = htm.summary[
         
htm.span(".badge.bg-danger.me-2.font-monospace")[str(len(component.vulnerabilities))],
@@ -207,17 +213,18 @@ def _vulnerability_component_details(block: htm.Block, 
component: results.OSVCom
     details_content.append(summary_element)
 
     for vuln in component.vulnerabilities:
-        vuln_id = vuln.get("id", "Unknown")
-        vuln_summary = vuln.get("summary", "No summary available")
-        vuln_refs = [r for r in vuln.get("references", []) if r.get("type", 
"") == "WEB"]
+        vuln_id = vuln.id or "Unknown"
+        vuln_summary = vuln.summary
+        vuln_refs = [r for r in vuln.references if r.get("type", "") == "WEB"]
         vuln_primary_ref = vuln_refs[0] if (len(vuln_refs) > 0) else {}
-        vuln_modified = vuln.get("modified", "Unknown")
+        vuln_modified = vuln.modified or "Unknown"
         vuln_severity = _extract_vulnerability_severity(vuln)
 
         vuln_header = [htm.a(href=vuln_primary_ref.get("url", ""), 
target="_blank")[htm.strong(".me-2")[vuln_id]]]
         if vuln_severity != "Unknown":
             
vuln_header.append(htm.span(".badge.bg-warning.text-dark")[vuln_severity])
 
+        details = 
markupsafe.Markup(cmarkgfm.github_flavored_markdown_to_html(vuln.details))
         vuln_div = 
htm.div(".ms-3.mb-3.border-start.border-warning.border-3.ps-3")[
             htm.div(".d-flex.align-items-center.mb-2")[*vuln_header],
             htm.p(".mb-1")[vuln_summary],
@@ -225,15 +232,14 @@ def _vulnerability_component_details(block: htm.Block, 
component: results.OSVCom
                 "Last modified: ",
                 vuln_modified,
             ],
-            htm.div(".mt-2.text-muted")[vuln.get("details", "No additional 
details available.")],
+            htm.div(".mt-2.text-muted")[details or "No additional details 
available."],
         ]
         details_content.append(vuln_div)
 
     block.append(htm.details(".mb-3.rounded")[*details_content])
 
-
-def _vulnerability_scan_button(block: htm.Block, project: str, version: str, 
file_path: str) -> None:
-    block.p["No vulnerability scan has been performed for this revision."]
+def _vulnerability_scan_button(block: htm.Block) -> None:
+    block.p["No new vulnerability scan has been performed for this revision."]
 
     form.render_block(
         block,
@@ -266,34 +272,64 @@ def _vulnerability_scan_find_in_progress_task(
     return None
 
 
-def _vulnerability_scan_results(block: htm.Block, task: sql.Task) -> None:
-    task_result = task.result
-    if not isinstance(task_result, results.SBOMOSVScan):
-        block.p["Invalid scan result format."]
-        return
+def _vulnerability_scan_results(block: htm.Block, vulns: 
list[osv.CdxVulnerabilityDetail], task: sql.Task | None) -> None:
+    if task is not None:
+        task_result = task.result
+        if not isinstance(task_result, results.SBOMOSVScan):
+            block.p["Invalid scan result format."]
+            return
 
-    components = task_result.components
-    ignored = task_result.ignored
-    ignored_count = len(ignored)
+        components = task_result.components
+        ignored = task_result.ignored
+        ignored_count = len(ignored)
+
+        if not components:
+            block.p["No vulnerabilities found."]
+            if ignored_count > 0:
+                component_word = "component was" if (ignored_count == 1) else 
"components were"
+                block.p[f"{ignored_count} {component_word} ignored due to 
missing PURL or version information:"]
+                block.p[f"{','.join(ignored)}"]
+            return
+
+        block.p[f"Found vulnerabilities in {len(components)} components:"]
+
+        for component in components:
+            _vulnerability_component_details_osv(block, component)
 
-    if not components:
-        block.p["No vulnerabilities found."]
         if ignored_count > 0:
             component_word = "component was" if (ignored_count == 1) else 
"components were"
             block.p[f"{ignored_count} {component_word} ignored due to missing 
PURL or version information:"]
             block.p[f"{','.join(ignored)}"]
-        return
-
-    block.p[f"Found vulnerabilities in {len(components)} components:"]
-
-    for component in components:
-        _vulnerability_component_details(block, component)
-
-    if ignored_count > 0:
-        component_word = "component was" if (ignored_count == 1) else 
"components were"
-        block.p[f"{ignored_count} {component_word} ignored due to missing PURL 
or version information:"]
-        block.p[f"{','.join(ignored)}"]
+    else:
+        if len(vulns) == 0:
+            block.p["No vulnerabilities found."]
+            return
+        components = {a.get("ref") for v in vulns for a in v.affects}
+        block.p[f"Found vulnerabilities in {len(components)} components:"]
+
+        for component in components:
+            _vulnerability_component_details_osv(
+                block,
+                results.OSVComponent(
+                    purl=component,
+                    vulnerabilities=[_cdx_to_osv(v) for v in vulns if 
component in [a.get("ref") for a in v.affects]]
+                )
+            )
 
+def _cdx_to_osv(cdx: osv.CdxVulnerabilityDetail) -> osv.VulnerabilityDetails:
+    severity, score = sbom.utilities.cdx_severity_to_osv(cdx.ratings)
+    return osv.VulnerabilityDetails(
+        id=cdx.id,
+        summary=cdx.description,
+        details=cdx.detail,
+        modified=cdx.updated,
+        published=cdx.published,
+        severity=score,
+        database_specific={
+            "severity": severity
+        },
+        references=[{ "type": "WEB", "url": a.get("url","")} for a in 
cdx.advisories]
+    )
 
 def _vulnerability_scan_section(
     block: htm.Block,
@@ -301,21 +337,20 @@ def _vulnerability_scan_section(
     version: str,
     file_path: str,
     revision_number: str,
+    vulnerabilities: list[osv.CdxVulnerabilityDetail],
     osv_tasks: collections.abc.Sequence[sql.Task],
 ) -> None:
     """Display the vulnerability scan section based on task status."""
     completed_task = _vulnerability_scan_find_completed_task(osv_tasks, 
revision_number)
 
-    if completed_task is not None:
-        _vulnerability_scan_results(block, completed_task)
-        return
-
     in_progress_task = _vulnerability_scan_find_in_progress_task(osv_tasks, 
revision_number)
 
     if in_progress_task is not None:
         _vulnerability_scan_status(block, in_progress_task, project, version, 
file_path)
     else:
-        _vulnerability_scan_button(block, project, version, file_path)
+        _vulnerability_scan_button(block)
+
+    _vulnerability_scan_results(block, vulnerabilities, completed_task)
 
 
 def _vulnerability_scan_status(
@@ -334,4 +369,4 @@ def _vulnerability_scan_status(
             htm.code[task.error],
             ". Additional details are unavailable from ATR.",
         ]
-        _vulnerability_scan_button(block, project, version, file_path)
+        _vulnerability_scan_button(block)
diff --git a/atr/models/results.py b/atr/models/results.py
index 9db1b55..54ad43d 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -18,6 +18,7 @@
 from typing import Annotated, Any, Literal
 
 import pydantic
+import atr.sbom.models.osv as osv
 
 from . import schema
 
@@ -48,7 +49,7 @@ class SBOMGenerateCycloneDX(schema.Strict):
 
 class OSVComponent(schema.Strict):
     purl: str = schema.description("Package URL")
-    vulnerabilities: list[dict[str, Any]] = 
schema.description("Vulnerabilities found")
+    vulnerabilities: list[osv.VulnerabilityDetails] = 
schema.description("Vulnerabilities found")
 
 
 class SBOMOSVScan(schema.Strict):
@@ -57,6 +58,7 @@ class SBOMOSVScan(schema.Strict):
     version_name: str = schema.description("Version name")
     revision_number: str = schema.description("Revision number")
     file_path: str = schema.description("Relative path to the scanned SBOM 
file")
+    new_file_path: str = schema.description("Relative path to the updated SBOM 
file")
     components: list[OSVComponent] = schema.description("Components with 
vulnerabilities")
     ignored: list[str] = schema.description("Components ignored")
 
@@ -120,6 +122,7 @@ class SBOMToolScore(schema.Strict):
     warnings: list[str] = schema.description("Warnings from the SBOM tool")
     errors: list[str] = schema.description("Errors from the SBOM tool")
     outdated: str | None = schema.description("Outdated tool from the SBOM 
tool")
+    vulnerabilities: list[str] | None = schema.description("Vulnerabilities 
stored in the SBOM")
     cli_errors: list[str] | None = schema.description("Errors from the 
CycloneDX CLI")
 
 
diff --git a/atr/sbom/cli.py b/atr/sbom/cli.py
index 4ef3bec..e658423 100644
--- a/atr/sbom/cli.py
+++ b/atr/sbom/cli.py
@@ -27,7 +27,7 @@ from .cyclonedx import validate_cli, validate_py
 from .licenses import check
 from .maven import plugin_outdated_version
 from .sbomqs import total_score
-from .utilities import bundle_to_patch, patch_to_data, path_to_bundle
+from .utilities import bundle_to_ntia_patch, bundle_to_vuln_patch, 
patch_to_data, path_to_bundle
 
 
 def command_license(bundle: models.bundle.Bundle) -> None:
@@ -54,7 +54,7 @@ def command_license(bundle: models.bundle.Bundle) -> None:
 
 
 def command_merge(bundle: models.bundle.Bundle) -> None:
-    patch_ops = asyncio.run(bundle_to_patch(bundle))
+    patch_ops = asyncio.run(bundle_to_ntia_patch(bundle))
     if patch_ops:
         patch_data = patch_to_data(patch_ops)
         merged = bundle.doc.patch(yyjson.Document(patch_data))
@@ -91,8 +91,18 @@ def command_outdated(bundle: models.bundle.Bundle) -> None:
         print("no outdated tool found")
 
 
-def command_patch(bundle: models.bundle.Bundle) -> None:
-    patch_ops = asyncio.run(bundle_to_patch(bundle))
+def command_patch_ntia(bundle: models.bundle.Bundle) -> None:
+    patch_ops = asyncio.run(bundle_to_ntia_patch(bundle))
+    if patch_ops:
+        patch_data = patch_to_data(patch_ops)
+        print(yyjson.Document(patch_data).dumps())
+    else:
+        print("no patch needed")
+
+
+def command_patch_vuln(bundle: models.bundle.Bundle) -> None:
+    results, ignored = asyncio.run(osv.scan_bundle(bundle))
+    patch_ops = asyncio.run(bundle_to_vuln_patch(bundle, results))
     if patch_ops:
         patch_data = patch_to_data(patch_ops)
         print(yyjson.Document(patch_data).dumps())
@@ -101,7 +111,7 @@ def command_patch(bundle: models.bundle.Bundle) -> None:
 
 
 def command_scores(bundle: models.bundle.Bundle) -> None:
-    patch_ops = asyncio.run(bundle_to_patch(bundle))
+    patch_ops = asyncio.run(bundle_to_ntia_patch(bundle))
     if patch_ops:
         patch_data = patch_to_data(patch_ops)
         merged = bundle.doc.patch(yyjson.Document(patch_data))
@@ -153,6 +163,9 @@ def command_where(bundle: models.bundle.Bundle) -> None:
 
 
 def main() -> None:  # noqa: C901
+    if len(sys.argv) < 3:
+        print("Usage: python -m atr.sbom <command> <sbom-path>")
+        sys.exit(1)
     path = pathlib.Path(sys.argv[2])
     bundle = path_to_bundle(path)
     match sys.argv[1]:
@@ -166,8 +179,10 @@ def main() -> None:  # noqa: C901
             command_osv(bundle)
         case "outdated":
             command_outdated(bundle)
-        case "patch":
-            command_patch(bundle)
+        case "patch-ntia":
+            command_patch_ntia(bundle)
+        case "patch-vuln":
+            command_patch_vuln(bundle)
         case "scores":
             command_scores(bundle)
         case "validate-cli":
diff --git a/atr/sbom/models/osv.py b/atr/sbom/models/osv.py
index 30cf8aa..23cf1f3 100644
--- a/atr/sbom/models/osv.py
+++ b/atr/sbom/models/osv.py
@@ -17,16 +17,43 @@
 
 from __future__ import annotations
 
-from typing import Any
+from typing import Any, Optional, ClassVar
+
+import pydantic
 
 from .base import Lax
 
 
 class QueryResult(Lax):
-    vulns: list[dict[str, Any]] | None = None
+    vulns: list[VulnerabilityDetails] | None = None
     next_page_token: str | None = None
 
 
 class ComponentVulnerabilities(Lax):
-    purl: str
-    vulnerabilities: list[dict[str, Any]]
+    ref: str
+    vulnerabilities: list[VulnerabilityDetails]
+
+class VulnerabilityDetails(Lax):
+    id: str
+    summary: str | None = None
+    details: str | None = None
+    references: list[dict[str, Any]] | None = None
+    severity: list[dict[str, Any]] | None = None
+    published: str | None = None
+    modified: str
+    database_specific: dict[str, Any] = pydantic.Field(default={})
+
+class CdxVulnerabilityDetail(Lax):
+    bom_ref: str | None = pydantic.Field(default=None, alias="bom-ref")
+    id: str | None = None
+    source: dict[str, str] | None = None
+    description: str | None = None
+    detail: str | None = None
+    advisories: list[dict[str,str]] | None = None
+    cwes: list[int] | None = None
+    published: str | None = None
+    updated: str | None = None
+    affects: list[dict[str,str]] | None = None
+    ratings: list[dict[str,str]] | None = None
+
+CdxVulnAdapter = pydantic.TypeAdapter(CdxVulnerabilityDetail)
diff --git a/atr/sbom/osv.py b/atr/sbom/osv.py
index fc84892..ee2d1a9 100644
--- a/atr/sbom/osv.py
+++ b/atr/sbom/osv.py
@@ -21,11 +21,61 @@ import os
 from typing import Any
 
 import aiohttp
+import yyjson
 
 from . import models
+from .utilities import get_pointer, osv_severity_to_cdx
 
 _DEBUG: bool = os.environ.get("DEBUG_SBOM_TOOL") == "1"
 _OSV_API_BASE: str = "https://api.osv.dev/v1"
+_SOURCE_DATABASE_NAMES = {
+    "ASB": "Android Security Bulletin",
+    "PUB": "Android Security Bulletin",
+    "ALSA": "AlmaLinux Security Advisory",
+    "ALBA": "AlmaLinux Security Advisory",
+    "ALEA": "AlmaLinux Security Advisory",
+    "ALPINE": "Alpine Security Advisory",
+    "BELL": "BellSoft Security Advisory",
+    "BIT": "Bitnami Vulnerability Database",
+    "CGA": "Chainguard Security Notices",
+    "CURL": "Curl CVEs",
+    "CVE": "National Vulnerability Database",
+    "DEBIAN": "Debian Security Tracker",
+    "DSA": "Debian Security Advisory",
+    "DLA": "Debian Security Advisory",
+    "DTSA": "Debian Security Advisory",
+    "ECHO": "Echo Security Advisory",
+    "EEF": "Erlang Ecosystem Foundation CNA Vulnerabilities",
+    "ELA": "Debian Extended LTS Security Advisory",
+    "GHSA": "GitHub Security Advisory",
+    "GO": "Go Vulnerability Database",
+    "GSD": "Global Security Database",
+    "HSEC": "Haskell Security Advisory",
+    "JLSEC": "Julia Security Advisory",
+    "KUBE": "Kubernetes Official CVE Feed",
+    "LBSEC": "LoopBack Advisory Database",
+    "LSN": "Livepatch Security Notices",
+    "MGASA": "Mageia Security Advisory",
+    "MAL": "Malicious Packages Repository",
+    "MINI": "Minimus Security Notices",
+    "OESA": "openEuler Security Advisory",
+    "OSV": "OSV Advisory",
+    "PHSA": "VMWare Photon Security Advisory",
+    "PSF": "Python Software Foundation Vulnerability Database",
+    "PYSEC": "PyPI Vulnerability Database",
+    "RHSA": "Red Hat Security Data",
+    "RHBA": "Red Hat Security Data",
+    "RHEA": "Red Hat Security Data",
+    "RLSA": "Rocky Linux Security Advisory",
+    "RXSA": "Rocky Linux Security Advisory",
+    "RSEC": "RConsortium Advisory Database",
+    "RUSTSEC": "RustSec Advisory Database",
+    "SUSE": "SUSE Security Landing Page",
+    "openSUSE": "SUSE Security Landing Page",
+    "UBUNTU": "Ubuntu CVE Reports",
+    "USN": "Ubuntu Security Notices",
+    "V8": "V8/Chromium Time-Based Policy",
+}
 
 
 async def scan_bundle(bundle: models.bundle.Bundle) -> 
tuple[list[models.osv.ComponentVulnerabilities], list[str]]:
@@ -42,10 +92,79 @@ async def scan_bundle(bundle: models.bundle.Bundle) -> 
tuple[list[models.osv.Com
             print(f"[DEBUG] Total components with vulnerabilities: 
{len(component_vulns_map)}")
         await _scan_bundle_populate_vulnerabilities(session, 
component_vulns_map)
     result: list[models.osv.ComponentVulnerabilities] = []
-    for purl, vulns in component_vulns_map.items():
-        result.append(models.osv.ComponentVulnerabilities(purl=purl, 
vulnerabilities=vulns))
+    for ref, vulns in component_vulns_map.items():
+        result.append(models.osv.ComponentVulnerabilities(ref=ref, 
vulnerabilities=vulns))
     return result, ignored
 
+def vulns_from_bundle(bundle: models.bundle.Bundle) -> 
list[models.osv.CdxVulnerabilityDetail]:
+    vulns = get_pointer(bundle.doc, "/vulnerabilities")
+    if vulns is None:
+        return []
+    print(vulns)
+    return [models.osv.CdxVulnerabilityDetail.model_validate(v) for v in vulns]
+
+async def vuln_patch(
+    session: aiohttp.ClientSession,
+    doc: yyjson.Document,
+    components: list[models.osv.ComponentVulnerabilities],
+) -> models.patch.Patch:
+    patch_ops: models.patch.Patch = []
+    _assemble_vulnerabilities(doc, patch_ops)
+    ix = 0
+    for c in components:
+        for vuln in c.vulnerabilities:
+            _assemble_component_vulnerability(doc, patch_ops, c.ref, vuln, ix)
+            ix += 1
+    return patch_ops
+
+def _assemble_vulnerabilities(doc: yyjson.Document, patch_ops: 
models.patch.Patch) -> None:
+    if get_pointer(doc, "/vulnerabilities") is not None:
+        patch_ops.append(
+            models.patch.RemoveOp(
+                op="remove",
+                path="/vulnerabilities"
+            )
+        )
+    patch_ops.append(
+        models.patch.AddOp(
+            op="add",
+            path="/vulnerabilities",
+            value=[],
+        )
+    )
+
+def _assemble_component_vulnerability(
+    doc: yyjson.Document,
+    patch_ops: models.patch.Patch,
+    ref: str,
+    vuln: models.osv.VulnerabilityDetails,
+    index: int
+)-> None:
+    vulnerability = {
+        "bom-ref": f"vuln:{ref}/{vuln.id}",
+        "id": vuln.id,
+        "source": _get_source(vuln),
+        "description": vuln.summary,
+        "detail": vuln.details,
+        "advisories": [
+            { "url": r["url"] }
+            for r in vuln.references
+            if (r.get("type", "") == "WEB" and "advisories" in r.get("url", 
"")) or r.get("type", "") == "ADVISORY"
+        ],
+        "cwes": [int(r.replace("CWE-", "")) for r in 
vuln.database_specific.get("cwe_ids", [])],
+        "published": vuln.published,
+        "updated": vuln.modified,
+        "affects": [{"ref": ref}],
+        "ratings": osv_severity_to_cdx(vuln.severity, 
vuln.database_specific.get("severity", "")),
+    }
+    patch_ops.append(
+        models.patch.AddOp(
+            op="add",
+            path=f"/vulnerabilities/{index!s}",
+            value=vulnerability,
+        )
+    )
+
 
 def _component_purl_with_version(component: models.bom.Component) -> str | 
None:
     if component.purl is None:
@@ -90,20 +209,27 @@ async def _fetch_vulnerabilities_for_batch(
 async def _fetch_vulnerability_details(
     session: aiohttp.ClientSession,
     vuln_id: str,
-) -> dict[str, Any]:
+) -> models.osv.VulnerabilityDetails:
     if _DEBUG:
         print(f"[DEBUG] Fetching details for {vuln_id}")
     async with session.get(f"{_OSV_API_BASE}/vulns/{vuln_id}") as response:
         response.raise_for_status()
         return await response.json()
 
+def _get_source(vuln: models.osv.VulnerabilityDetails) -> dict[str, str | 
None]:
+    db = vuln.id.split("-")[0]
+    web_refs = list(filter(lambda v: v.get("type", "") == "WEB", 
vuln.references)) if vuln.references else []
+    first_ref = web_refs[0] if len(web_refs) > 0 else None
+
+    name = _SOURCE_DATABASE_NAMES.get(db, "Unknown Database")
+    return {"name": name, "url": first_ref.get("url", "")}
 
 async def _paginate_query(
     session: aiohttp.ClientSession,
     query: dict[str, Any],
     page_token: str,
-) -> list[dict[str, Any]]:
-    all_vulns: list[dict[str, Any]] = []
+) -> list[models.osv.VulnerabilityDetails]:
+    all_vulns: list[models.osv.VulnerabilityDetails] = []
     current_query = query.copy()
     current_query["page_token"] = page_token
     page = 0
@@ -135,7 +261,7 @@ def _scan_bundle_build_queries(
             ignored.append(component.name)
             continue
         query = {"package": {"purl": purl_with_version}}
-        queries.append((purl_with_version, query))
+        queries.append((component.bom_ref, query))
     return queries, ignored
 
 
@@ -143,31 +269,31 @@ async def _scan_bundle_fetch_vulnerabilities(
     session: aiohttp.ClientSession,
     queries: list[tuple[str, dict[str, Any]]],
     batch_size: int,
-) -> dict[str, list[dict[str, Any]]]:
-    component_vulns_map: dict[str, list[dict[str, Any]]] = {}
+) -> dict[str, list[models.osv.VulnerabilityDetails]]:
+    component_vulns_map: dict[str, list[models.osv.VulnerabilityDetails]] = {}
     for batch_start in range(0, len(queries), batch_size):
         batch_end = min(batch_start + batch_size, len(queries))
         batch = queries[batch_start:batch_end]
         if _DEBUG:
             batch_num = batch_start // batch_size + 1
             print(f"[DEBUG] Processing batch {batch_num} ({batch_start + 
1}-{batch_end}/{len(queries)})")
-        batch_queries = [query for _purl, query in batch]
+        batch_queries = [query for _ref, query in batch]
         batch_results = await _fetch_vulnerabilities_for_batch(session, 
batch_queries)
         if _DEBUG and (len(batch_results) != len(batch)):
             print(f"[DEBUG] count mismatch (expected {len(batch)}, got 
{len(batch_results)})")
-        for i, (purl, query) in enumerate(batch):
+        for i, (ref, query) in enumerate(batch):
             if i >= len(batch_results):
                 break
             query_result = batch_results[i]
             if query_result.vulns:
-                existing_vulns = component_vulns_map.setdefault(purl, [])
+                existing_vulns = component_vulns_map.setdefault(ref, [])
                 existing_vulns.extend(query_result.vulns)
                 if _DEBUG:
-                    print(f"[DEBUG] {purl}: {len(query_result.vulns)} 
vulnerabilities")
+                    print(f"[DEBUG] {ref}: {len(query_result.vulns)} 
vulnerabilities")
             if query_result.next_page_token:
                 if _DEBUG:
-                    print(f"[DEBUG] {purl}: has pagination, fetching remaining 
pages")
-                existing_vulns = component_vulns_map.setdefault(purl, [])
+                    print(f"[DEBUG] {ref}: has pagination, fetching remaining 
pages")
+                existing_vulns = component_vulns_map.setdefault(ref, [])
                 paginated = await _paginate_query(session, query, 
query_result.next_page_token)
                 existing_vulns.extend(paginated)
     return component_vulns_map
@@ -175,19 +301,19 @@ async def _scan_bundle_fetch_vulnerabilities(
 
 async def _scan_bundle_populate_vulnerabilities(
     session: aiohttp.ClientSession,
-    component_vulns_map: dict[str, list[dict[str, Any]]],
+    component_vulns_map: dict[str, list[models.osv.VulnerabilityDetails]],
 ) -> None:
-    details_cache: dict[str, dict[str, Any]] = {}
+    details_cache: dict[str, models.osv.VulnerabilityDetails] = {}
     for vulns in component_vulns_map.values():
         for vuln in vulns:
-            vuln_id = vuln.get("id")
+            vuln_id = vuln.id
             if not vuln_id:
                 continue
             details = details_cache.get(vuln_id)
             if details is None:
                 details = await _fetch_vulnerability_details(session, vuln_id)
                 details_cache[vuln_id] = details
-            vuln.clear()
-            vuln.update(details)
+            vuln.__dict__.clear()
+            vuln.__dict__.update(details)
     if _DEBUG:
         print(f"[DEBUG] Fetched details for {len(details_cache)} unique 
vulnerabilities")
diff --git a/atr/sbom/utilities.py b/atr/sbom/utilities.py
index 496deb2..6d98695 100644
--- a/atr/sbom/utilities.py
+++ b/atr/sbom/utilities.py
@@ -19,16 +19,39 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any
 
+
 if TYPE_CHECKING:
     import pathlib
 
+    import atr.sbom.models.osv as osv
+
 import aiohttp
 import yyjson
 
 from . import models
 
+_SCORING_METHODS_OSV = {
+    "CVSS_V2": "CVSSv2",
+    "CVSS_V3": "CVSSv3",
+    "CVSS_V4": "CVSSv4"
+}
+_SCORING_METHODS_CDX = {
+    "CVSSv2": "CVSS_V2",
+    "CVSSv3": "CVSS_V3",
+    "CVSSv4": "CVSS_V4",
+    "other": "Other"
+}
+_CDX_SEVERITIES = [
+    "critical",
+    "high",
+    "medium",
+    "low",
+    "info",
+    "none",
+    "unknown"
+]
 
-async def bundle_to_patch(bundle_value: models.bundle.Bundle) -> 
models.patch.Patch:
+async def bundle_to_ntia_patch(bundle_value: models.bundle.Bundle) -> 
models.patch.Patch:
     from .conformance import ntia_2021_issues, ntia_2021_patch
 
     _warnings, errors = ntia_2021_issues(bundle_value.bom)
@@ -36,6 +59,18 @@ async def bundle_to_patch(bundle_value: 
models.bundle.Bundle) -> models.patch.Pa
         patch_ops = await ntia_2021_patch(session, bundle_value.doc, errors)
     return patch_ops
 
+async def bundle_to_vuln_patch(
+    bundle_value: models.bundle.Bundle,
+    vulnerabilities:  list[osv.ComponentVulnerabilities]
+) -> models.patch.Patch:
+    from .osv import vuln_patch
+
+    # TODO: May not need session (copied from ntia patch)
+    async with aiohttp.ClientSession() as session:
+        patch_ops = await vuln_patch(session, bundle_value.doc, 
vulnerabilities)
+    return patch_ops
+
+
 
 def get_pointer(doc: yyjson.Document, path: str) -> Any | None:
     try:
@@ -55,3 +90,34 @@ def path_to_bundle(path: pathlib.Path) -> 
models.bundle.Bundle:
     text = path.read_text(encoding="utf-8")
     bom = models.bom.Bom.model_validate_json(text)
     return models.bundle.Bundle(doc=yyjson.Document(text), bom=bom, path=path, 
text=text)
+
+def osv_severity_to_cdx(severity: list[dict[str, Any]] | None, textual: str) 
-> list[dict[str, str]] | None:
+    if severity is not None:
+        return [
+            {
+                "severity": textual.lower() if textual.lower() in 
_CDX_SEVERITIES else "unknown", # TODO: constrain to valid enum
+                "method": _SCORING_METHODS_OSV.get(s.get("type"), "other"),
+                **_extract_cdx_score(s.get("score"))
+            }
+            for s in severity
+        ]
+    return None
+
+def cdx_severity_to_osv(severity: list[dict[str, str]]) -> tuple[str | None, 
list[dict[str, str]]]:
+    severities = [
+        {
+            "score": str(s.get("score", s.get("vector", ""))),
+            "type": _SCORING_METHODS_CDX.get(s.get("method")),
+        } for s in severity
+    ]
+    textual = severity[0].get("severity")
+    return textual, severities
+
+def _extract_cdx_score(score: str):
+    if "CVSS" in score:
+        return { "vector": score }
+    else:
+        try:
+            return { "score": float(score) }
+        except ValueError:
+            return { "vector": score }
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index e837a87..ad6c9db 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -85,7 +85,7 @@ async def augment(args: FileArgs) -> results.Results | None:
         raise SBOMScoringError("SBOM file does not exist", {"file_path": 
args.file_path})
     # Read from the old revision
     bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path))
-    patch_ops = await sbom.utilities.bundle_to_patch(bundle)
+    patch_ops = await sbom.utilities.bundle_to_ntia_patch(bundle)
     new_full_path: str | None = None
     if patch_ops:
         patch_data = sbom.utilities.patch_to_data(patch_ops)
@@ -140,13 +140,36 @@ async def osv_scan(args: FileArgs) -> results.Results | 
None:
         raise SBOMScanningError("SBOM file does not exist", {"file_path": 
args.file_path})
     bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path))
     vulnerabilities, ignored = await sbom.osv.scan_bundle(bundle)
-    components = [results.OSVComponent(purl=v.purl, 
vulnerabilities=v.vulnerabilities) for v in vulnerabilities]
+    patch_ops = await sbom.utilities.bundle_to_vuln_patch(bundle, 
vulnerabilities)
+    components = [results.OSVComponent(purl=v.ref, 
vulnerabilities=v.vulnerabilities) for v in vulnerabilities]
+
+    new_full_path: str | None = None
+    if patch_ops:
+        patch_data = sbom.utilities.patch_to_data(patch_ops)
+        merged = bundle.doc.patch(yyjson.Document(patch_data))
+        description = "SBOM vulnerability scan through web interface"
+        async with storage.write(args.asf_uid) as write:
+            wacp = await 
write.as_project_committee_participant(args.project_name)
+            async with wacp.revision.create_and_manage(
+                args.project_name, args.version_name, args.asf_uid or 
"unknown", description=description
+            ) as creating:
+                new_full_path = os.path.join(str(creating.interim_path), 
args.file_path)
+                # Write to the new revision
+                log.info(f"Writing updated SBOM to {new_full_path}")
+                await aiofiles.os.remove(new_full_path)
+                async with aiofiles.open(new_full_path, "w", encoding="utf-8") 
as f:
+                    await f.write(merged.dumps())
+
+            if creating.new is None:
+                raise RuntimeError("Internal error: New revision not found")
+
     return results.SBOMOSVScan(
         kind="sbom_osv_scan",
         project_name=args.project_name,
         version_name=args.version_name,
         revision_number=args.revision_number,
-        file_path=args.file_path,
+        file_path=full_path,
+        new_file_path=new_full_path,
         components=components,
         ignored=ignored,
     )
@@ -198,6 +221,7 @@ async def score_tool(args: FileArgs) -> results.Results | 
None:
     bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path))
     warnings, errors = sbom.conformance.ntia_2021_issues(bundle.bom)
     outdated = sbom.maven.plugin_outdated_version(bundle.bom)
+    vulnerabilities = sbom.osv.vulns_from_bundle(bundle)
     cli_errors = sbom.cyclonedx.validate_cli(bundle)
     return results.SBOMToolScore(
         kind="sbom_tool_score",
@@ -208,6 +232,7 @@ async def score_tool(args: FileArgs) -> results.Results | 
None:
         warnings=[w.model_dump_json() for w in warnings],
         errors=[e.model_dump_json() for e in errors],
         outdated=outdated.model_dump_json() if outdated else None,
+        vulnerabilities=[v.model_dump_json() for v in vulnerabilities],
         cli_errors=cli_errors,
     )
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to