This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch vulnerabilities_sbom in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 3884c68f16278e3271e4398cba483f9ed1237b1d Author: Alastair McFarlane <[email protected]> AuthorDate: Thu Dec 18 16:00:52 2025 +0000 Store vulnerabilities in SBOM and read back from the report. Store ATR task info in SBOM as a reference. --- atr/get/checks.py | 8 + atr/get/sbom.py | 270 +++++++++++++++++++-------- atr/models/results.py | 13 +- atr/sbom/cli.py | 37 ++-- atr/sbom/models/bom.py | 2 +- atr/sbom/models/osv.py | 36 +++- atr/sbom/osv.py | 170 +++++++++++++++-- atr/sbom/utilities.py | 111 ++++++++++- atr/tasks/sbom.py | 35 +++- atr/templates/check-selected-path-table.html | 2 +- docker-compose.yml | 2 + pyproject.toml | 1 + uv.lock | 11 ++ 13 files changed, 574 insertions(+), 124 deletions(-) diff --git a/atr/get/checks.py b/atr/get/checks.py index 47c2879..0e0815f 100644 --- a/atr/get/checks.py +++ b/atr/get/checks.py @@ -27,6 +27,7 @@ import atr.db as db import atr.get.download as download import atr.get.ignores as ignores import atr.get.report as report +import atr.get.sbom as sbom import atr.get.vote as vote import atr.htm as htm import atr.models.sql as sql @@ -349,6 +350,7 @@ def _render_file_row( version_name=release.version, file_path=path_str, ) + sbom_url = util.as_url(sbom.report, project=release.project.name, version=release.version, file_path=path_str) if not has_checks_before: path_display = htpy.code(".text-muted")[path_str] @@ -393,6 +395,11 @@ def _render_file_row( err_cell = htpy.span(".text-muted", style=num_style)["0"] report_btn = htpy.a(".btn.btn-sm.btn-outline-success", href=report_url)["Show details"] + # <a href="{{ as_url(get.sbom.report, project=project_name, version=version_name, file_path=path) }}" + # class="btn btn-sm btn-outline-secondary">Show SBOM</a> + sbom_btn = None + if path.suffixes[-2:] == [".cdx", ".json"]: + sbom_btn = htpy.a(".btn.btn-sm.btn-outline-secondary", href=sbom_url)["SBOM report"] download_btn = htpy.a(".btn.btn-sm.btn-outline-secondary", href=download_url)["Download"] tbody.tr[ @@ -403,6 +410,7 @@ 
def _render_file_row( htpy.td(".text-end.text-nowrap.py-2.pe-3")[ htpy.div(".d-flex.justify-content-end.align-items-center.gap-2")[ report_btn, + sbom_btn, download_btn, ], ], diff --git a/atr/get/sbom.py b/atr/get/sbom.py index fb570a7..4b6222c 100644 --- a/atr/get/sbom.py +++ b/atr/get/sbom.py @@ -21,6 +21,8 @@ import json from typing import TYPE_CHECKING, Any import asfquart.base as base +import cmarkgfm +import markupsafe import atr.blueprints.get as get import atr.db as db @@ -29,6 +31,7 @@ import atr.htm as htm import atr.models.results as results import atr.models.sql as sql import atr.sbom as sbom +import atr.sbom.models.osv as osv import atr.shared as shared import atr.template as template import atr.web as web @@ -40,7 +43,14 @@ if TYPE_CHECKING: @get.committer("/sbom/report/<project>/<version>/<path:file_path>") async def report(session: web.Committer, project: str, version: str, file_path: str) -> str: await session.check_access(project) - await session.release(project, version) + + # If the draft is not found, we try to get the release candidate + try: + release = await session.release(project, version, with_committee=True) + except base.ASFQuartException: + release = await session.release(project, version, phase=sql.ReleasePhase.RELEASE_CANDIDATE, with_committee=True) + is_release_candidate = release.phase == sql.ReleasePhase.RELEASE_CANDIDATE + async with db.session() as data: via = sql.validate_instrumented_attribute # TODO: Abstract this code and the sbomtool.MissingAdapter validators @@ -48,6 +58,7 @@ async def report(session: web.Committer, project: str, version: str, file_path: await data.task( project_name=project, version_name=version, + revision_number=release.latest_revision_number, task_type=sql.TaskType.SBOM_TOOL_SCORE, status=sql.TaskStatus.COMPLETED, primary_rel_path=file_path, @@ -55,13 +66,14 @@ async def report(session: web.Committer, project: str, version: str, file_path: .order_by(sql.sqlmodel.desc(via(sql.Task.completed))) .all() ) 
- + # Run or running scans for the current revision osv_tasks = ( await data.task( project_name=project, version_name=version, task_type=sql.TaskType.SBOM_OSV_SCAN, primary_rel_path=file_path, + revision_number=release.latest_revision_number, ) .order_by(sql.sqlmodel.desc(via(sql.Task.added))) .all() @@ -80,38 +92,24 @@ async def report(session: web.Committer, project: str, version: str, file_path: raise base.ASFQuartException("Invalid SBOM score result", errorcode=500) warnings = [sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in task_result.warnings] errors = [sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in task_result.errors] + if task_result.vulnerabilities is not None: + vulnerabilities = [ + sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in task_result.vulnerabilities + ] + else: + vulnerabilities = [] - block.p[ - """This is a report by the ATR SBOM tool, for debugging and - informational purposes. Please use it only as an approximate - guideline to the quality of your SBOM file.""" - ] - block.p["This report is for revision ", htm.code[task_result.revision_number], "."] - - # TODO: Show the status if the task to augment the SBOM is still running - # TODO: Add a field to the SBOM to show that it's been augmented - # And then don't allow it to be augmented again - form.render_block( - block, - model_cls=shared.sbom.AugmentSBOMForm, - submit_label="Augment SBOM", - empty=True, - ) - - if warnings: - block.h2["Warnings"] - _missing_table(block, warnings) + _report_header(block, is_release_candidate, release, task_result) - if errors: - block.h2["Errors"] - _missing_table(block, errors) + if not is_release_candidate: + _augment_section(block, release, task_result) - if not (warnings or errors): - block.h2["Conformance report"] - block.p["No NTIA 2021 minimum data field conformance warnings or errors found."] + _conformance_section(block, errors, warnings) - block.h2["Vulnerability 
scan"] - _vulnerability_scan_section(block, project, version, file_path, task_result.revision_number, osv_tasks) + block.h2["Vulnerabilities"] + _vulnerability_scan_section( + block, project, version, file_path, task_result, vulnerabilities, osv_tasks, is_release_candidate + ) block.h2["Outdated tool"] outdated = None @@ -133,22 +131,84 @@ async def report(session: web.Committer, project: str, version: str, file_path: else: block.p["No outdated tool found."] + _cyclonedx_cli_errors(block, task_result) + + return await template.blank("SBOM report", content=block.collect()) + + +def _conformance_section(block: htm.Block, errors: list[Any], warnings: list[Any]): + if warnings: + block.h2["Warnings"] + _missing_table(block, warnings) + + if errors: + block.h2["Errors"] + _missing_table(block, errors) + + if not (warnings or errors): + block.h2["Conformance report"] + block.p["No NTIA 2021 minimum data field conformance warnings or errors found."] + + +def _report_header( + block: htm.Block, is_release_candidate: bool, release: sql.Release, task_result: results.SBOMToolScore +) -> None: + block.p[ + """This is a report by the ATR SBOM tool, for debugging and + informational purposes. Please use it only as an approximate + guideline to the quality of your SBOM file.""" + ] + if not is_release_candidate: + block.p[ + "This report is for revision ", htm.code[task_result.revision_number], "." 
+ ] # TODO: Mark if a subsequent score has failed + elif release.phase == sql.ReleasePhase.RELEASE_CANDIDATE: + block.p["This report is for the latest release candidate."] + + +def _augment_section(block: htm.Block, release: sql.Release, task_result: results.SBOMToolScore): + # TODO: Show the status if the task to augment the SBOM is still running + # And then don't allow it to be augmented again + augments = [] + if task_result.atr_props is not None: + augments = [t.get("value", "") for t in task_result.atr_props if t.get("name", "") == "asf:atr:augment"] + if len(augments) == 0: + block.p["We can attempt to augment this SBOM with additional data."] + form.render_block( + block, + model_cls=shared.sbom.AugmentSBOMForm, + submit_label="Augment SBOM", + empty=True, + ) + else: + if release.latest_revision_number in augments: + block.p["This SBOM was augmented by ATR."] + else: + block.p["This SBOM was augmented by ATR at revision ", htm.code[augments[-1]], "."] + block.p["We can perform augmentation again to check for additional new data."] + form.render_block( + block, + model_cls=shared.sbom.AugmentSBOMForm, + submit_label="Re-augment SBOM", + empty=True, + ) + + +def _cyclonedx_cli_errors(block: htm.Block, task_result: results.SBOMToolScore): block.h2["CycloneDX CLI validation errors"] if task_result.cli_errors: block.pre["\n".join(task_result.cli_errors)] else: block.p["No CycloneDX CLI validation errors found."] - return await template.blank("SBOM report", content=block.collect()) - -def _extract_vulnerability_severity(vuln: dict[str, Any]) -> str: +def _extract_vulnerability_severity(vuln: osv.VulnerabilityDetails) -> str: """Extract severity information from vulnerability data.""" - db_specific = vuln.get("database_specific", {}) - if "severity" in db_specific: - return db_specific["severity"] + data = vuln.database_specific or {} + if "severity" in data: + return data["severity"] - severity_data = vuln.get("severity", []) + severity_data = vuln.severity if 
severity_data and isinstance(severity_data, list): first_severity = severity_data[0] if isinstance(first_severity, dict) and ("type" in first_severity): @@ -198,7 +258,7 @@ def _missing_tally(items: list[sbom.models.conformance.Missing]) -> list[tuple[s ) -def _vulnerability_component_details(block: htm.Block, component: results.OSVComponent) -> None: +def _vulnerability_component_details_osv(block: htm.Block, component: results.OSVComponent) -> None: details_content = [] summary_element = htm.summary[ htm.span(".badge.bg-danger.me-2.font-monospace")[str(len(component.vulnerabilities))], @@ -207,17 +267,20 @@ def _vulnerability_component_details(block: htm.Block, component: results.OSVCom details_content.append(summary_element) for vuln in component.vulnerabilities: - vuln_id = vuln.get("id", "Unknown") - vuln_summary = vuln.get("summary", "No summary available") - vuln_refs = [r for r in vuln.get("references", []) if r.get("type", "") == "WEB"] + vuln_id = vuln.id or "Unknown" + vuln_summary = vuln.summary + vuln_refs = [] + if vuln.references is not None: + vuln_refs = [r for r in vuln.references if r.get("type", "") == "WEB"] vuln_primary_ref = vuln_refs[0] if (len(vuln_refs) > 0) else {} - vuln_modified = vuln.get("modified", "Unknown") + vuln_modified = vuln.modified or "Unknown" vuln_severity = _extract_vulnerability_severity(vuln) vuln_header = [htm.a(href=vuln_primary_ref.get("url", ""), target="_blank")[htm.strong(".me-2")[vuln_id]]] if vuln_severity != "Unknown": vuln_header.append(htm.span(".badge.bg-warning.text-dark")[vuln_severity]) + details = markupsafe.Markup(cmarkgfm.github_flavored_markdown_to_html(vuln.details)) vuln_div = htm.div(".ms-3.mb-3.border-start.border-warning.border-3.ps-3")[ htm.div(".d-flex.align-items-center.mb-2")[*vuln_header], htm.p(".mb-1")[vuln_summary], @@ -225,15 +288,15 @@ def _vulnerability_component_details(block: htm.Block, component: results.OSVCom "Last modified: ", vuln_modified, ], - 
htm.div(".mt-2.text-muted")[vuln.get("details", "No additional details available.")], + htm.div(".mt-2.text-muted")[details or "No additional details available."], ] details_content.append(vuln_div) block.append(htm.details(".mb-3.rounded")[*details_content]) -def _vulnerability_scan_button(block: htm.Block, project: str, version: str, file_path: str) -> None: - block.p["No vulnerability scan has been performed for this revision."] +def _vulnerability_scan_button(block: htm.Block) -> None: + block.p["No new vulnerability scan has been performed for this revision."] form.render_block( block, @@ -266,33 +329,78 @@ def _vulnerability_scan_find_in_progress_task( return None -def _vulnerability_scan_results(block: htm.Block, task: sql.Task) -> None: - task_result = task.result - if not isinstance(task_result, results.SBOMOSVScan): - block.p["Invalid scan result format."] - return +def _vulnerability_scan_results( + block: htm.Block, vulns: list[osv.CdxVulnerabilityDetail], scans: list[str], task: sql.Task | None +) -> None: + if task is not None: + task_result = task.result + if not isinstance(task_result, results.SBOMOSVScan): + block.p["Invalid scan result format."] + return + + components = task_result.components + ignored = task_result.ignored + ignored_count = len(ignored) + + if not components: + block.p["No vulnerabilities found."] + if ignored_count > 0: + component_word = "component was" if (ignored_count == 1) else "components were" + block.p[f"{ignored_count} {component_word} ignored due to missing PURL or version information:"] + block.p[f"{','.join(ignored)}"] + return + + block.p[f"Scan found vulnerabilities in {len(components)} components:"] - components = task_result.components - ignored = task_result.ignored - ignored_count = len(ignored) + for component in components: + _vulnerability_component_details_osv(block, component) - if not components: - block.p["No vulnerabilities found."] if ignored_count > 0: component_word = "component was" if 
(ignored_count == 1) else "components were" block.p[f"{ignored_count} {component_word} ignored due to missing PURL or version information:"] block.p[f"{','.join(ignored)}"] - return - - block.p[f"Found vulnerabilities in {len(components)} components:"] + else: + if len(vulns) == 0: + block.p["No vulnerabilities listed in this SBOM."] + return + components = {a.get("ref", "") for v in vulns if v.affects is not None for a in v.affects} + + if len(scans) > 0: + block.p["This SBOM was scanned for vulnerabilities at revision ", htm.code[scans[-1]], "."] + + block.p[f"Vulnerabilities found in {len(components)} components:"] + + for component in components: + _vulnerability_component_details_osv( + block, + results.OSVComponent( + purl=component, + vulnerabilities=[ + _cdx_to_osv(v) + for v in vulns + if v.affects is not None and component in [a.get("ref") for a in v.affects] + ], + ), + ) - for component in components: - _vulnerability_component_details(block, component) - if ignored_count > 0: - component_word = "component was" if (ignored_count == 1) else "components were" - block.p[f"{ignored_count} {component_word} ignored due to missing PURL or version information:"] - block.p[f"{','.join(ignored)}"] +def _cdx_to_osv(cdx: osv.CdxVulnerabilityDetail) -> osv.VulnerabilityDetails: + score = [] + severity = "" + if cdx.ratings is not None: + severity, score = sbom.utilities.cdx_severity_to_osv(cdx.ratings) + return osv.VulnerabilityDetails( + id=cdx.id, + summary=cdx.description, + details=cdx.detail, + modified=cdx.updated or "", + published=cdx.published, + severity=score, + database_specific={"severity": severity}, + references=[{"type": "WEB", "url": a.get("url", "")} for a in cdx.advisories] + if cdx.advisories is not None + else [], + ) def _vulnerability_scan_section( @@ -300,31 +408,29 @@ def _vulnerability_scan_section( project: str, version: str, file_path: str, - revision_number: str, + task_result: results.SBOMToolScore, + vulnerabilities: 
list[osv.CdxVulnerabilityDetail], osv_tasks: collections.abc.Sequence[sql.Task], + is_release_candidate: bool, ) -> None: """Display the vulnerability scan section based on task status.""" - completed_task = _vulnerability_scan_find_completed_task(osv_tasks, revision_number) + completed_task = _vulnerability_scan_find_completed_task(osv_tasks, task_result.revision_number) - if completed_task is not None: - _vulnerability_scan_results(block, completed_task) - return + in_progress_task = _vulnerability_scan_find_in_progress_task(osv_tasks, task_result.revision_number) - in_progress_task = _vulnerability_scan_find_in_progress_task(osv_tasks, revision_number) + scans = [] + if task_result.atr_props is not None: + scans = [t.get("value", "") for t in task_result.atr_props if t.get("name", "") == "asf:atr:osv-scan"] + _vulnerability_scan_results(block, vulnerabilities, scans, completed_task) - if in_progress_task is not None: - _vulnerability_scan_status(block, in_progress_task, project, version, file_path) - else: - _vulnerability_scan_button(block, project, version, file_path) + if not is_release_candidate: + if in_progress_task is not None: + _vulnerability_scan_status(block, in_progress_task, project, version, file_path) + else: + _vulnerability_scan_button(block) -def _vulnerability_scan_status( - block: htm.Block, - task: sql.Task, - project: str, - version: str, - file_path: str, -) -> None: +def _vulnerability_scan_status(block: htm.Block, task: sql.Task, project: str, version: str, file_path: str) -> None: status_text = task.status.value.replace("_", " ").capitalize() block.p[f"Vulnerability scan is currently {status_text.lower()}."] block.p["Task ID: ", htm.code[str(task.id)]] @@ -334,4 +440,4 @@ def _vulnerability_scan_status( htm.code[task.error], ". 
Additional details are unavailable from ATR.", ] - _vulnerability_scan_button(block, project, version, file_path) + _vulnerability_scan_button(block) diff --git a/atr/models/results.py b/atr/models/results.py index 9db1b55..166a4ed 100644 --- a/atr/models/results.py +++ b/atr/models/results.py @@ -15,10 +15,12 @@ # specific language governing permissions and limitations # under the License. -from typing import Annotated, Any, Literal +from typing import Annotated, Literal import pydantic +import atr.sbom.models.osv as osv + from . import schema @@ -48,7 +50,7 @@ class SBOMGenerateCycloneDX(schema.Strict): class OSVComponent(schema.Strict): purl: str = schema.description("Package URL") - vulnerabilities: list[dict[str, Any]] = schema.description("Vulnerabilities found") + vulnerabilities: list[osv.VulnerabilityDetails] = schema.description("Vulnerabilities found") class SBOMOSVScan(schema.Strict): @@ -57,6 +59,7 @@ class SBOMOSVScan(schema.Strict): version_name: str = schema.description("Version name") revision_number: str = schema.description("Revision number") file_path: str = schema.description("Relative path to the scanned SBOM file") + new_file_path: str = schema.Field(default="", strict=False, description="Relative path to the updated SBOM file") components: list[OSVComponent] = schema.description("Components with vulnerabilities") ignored: list[str] = schema.description("Components ignored") @@ -120,6 +123,12 @@ class SBOMToolScore(schema.Strict): warnings: list[str] = schema.description("Warnings from the SBOM tool") errors: list[str] = schema.description("Errors from the SBOM tool") outdated: str | None = schema.description("Outdated tool from the SBOM tool") + vulnerabilities: list[str] | None = schema.Field( + default=None, strict=False, description="Vulnerabilities found in the SBOM" + ) + atr_props: list[dict[str, str]] | None = schema.Field( + default=None, strict=False, description="ATR properties found in the SBOM" + ) cli_errors: list[str] | None = 
schema.description("Errors from the CycloneDX CLI") diff --git a/atr/sbom/cli.py b/atr/sbom/cli.py index 4ef3bec..edc93fd 100644 --- a/atr/sbom/cli.py +++ b/atr/sbom/cli.py @@ -27,7 +27,7 @@ from .cyclonedx import validate_cli, validate_py from .licenses import check from .maven import plugin_outdated_version from .sbomqs import total_score -from .utilities import bundle_to_patch, patch_to_data, path_to_bundle +from .utilities import bundle_to_ntia_patch, bundle_to_vuln_patch, patch_to_data, path_to_bundle def command_license(bundle: models.bundle.Bundle) -> None: @@ -54,7 +54,7 @@ def command_license(bundle: models.bundle.Bundle) -> None: def command_merge(bundle: models.bundle.Bundle) -> None: - patch_ops = asyncio.run(bundle_to_patch(bundle)) + patch_ops = asyncio.run(bundle_to_ntia_patch(bundle)) if patch_ops: patch_data = patch_to_data(patch_ops) merged = bundle.doc.patch(yyjson.Document(patch_data)) @@ -75,11 +75,11 @@ def command_osv(bundle: models.bundle.Bundle) -> None: if ignored_count > 0: print(f"Warning: {ignored_count} components ignored (missing purl or version)") for component_result in results: - print(component_result.purl) + print(component_result.ref) for vuln in component_result.vulnerabilities: - vuln_id = vuln.get("id", "unknown") - modified = vuln.get("modified", "") - summary = vuln.get("summary", "(no summary)") + vuln_id = vuln.id + modified = vuln.modified + summary = vuln.summary print(f" {vuln_id} {modified} {summary}") @@ -91,8 +91,18 @@ def command_outdated(bundle: models.bundle.Bundle) -> None: print("no outdated tool found") -def command_patch(bundle: models.bundle.Bundle) -> None: - patch_ops = asyncio.run(bundle_to_patch(bundle)) +def command_patch_ntia(bundle: models.bundle.Bundle) -> None: + patch_ops = asyncio.run(bundle_to_ntia_patch(bundle)) + if patch_ops: + patch_data = patch_to_data(patch_ops) + print(yyjson.Document(patch_data).dumps()) + else: + print("no patch needed") + + +def command_patch_vuln(bundle: 
models.bundle.Bundle) -> None: + results, _ = asyncio.run(osv.scan_bundle(bundle)) + patch_ops = asyncio.run(bundle_to_vuln_patch(bundle, results)) if patch_ops: patch_data = patch_to_data(patch_ops) print(yyjson.Document(patch_data).dumps()) @@ -101,7 +111,7 @@ def command_patch(bundle: models.bundle.Bundle) -> None: def command_scores(bundle: models.bundle.Bundle) -> None: - patch_ops = asyncio.run(bundle_to_patch(bundle)) + patch_ops = asyncio.run(bundle_to_ntia_patch(bundle)) if patch_ops: patch_data = patch_to_data(patch_ops) merged = bundle.doc.patch(yyjson.Document(patch_data)) @@ -153,6 +163,9 @@ def command_where(bundle: models.bundle.Bundle) -> None: def main() -> None: # noqa: C901 + if len(sys.argv) < 3: + print("Usage: python -m atr.sbom <command> <sbom-path>") + sys.exit(1) path = pathlib.Path(sys.argv[2]) bundle = path_to_bundle(path) match sys.argv[1]: @@ -166,8 +179,10 @@ def main() -> None: # noqa: C901 command_osv(bundle) case "outdated": command_outdated(bundle) - case "patch": - command_patch(bundle) + case "patch-ntia": + command_patch_ntia(bundle) + case "patch-vuln": + command_patch_vuln(bundle) case "scores": command_scores(bundle) case "validate-cli": diff --git a/atr/sbom/models/bom.py b/atr/sbom/models/bom.py index 60c872f..3bc65f9 100644 --- a/atr/sbom/models/bom.py +++ b/atr/sbom/models/bom.py @@ -28,7 +28,7 @@ class Swid(Lax): class Supplier(Lax): name: str | None = None - url: str | None = None + url: list[str] | None = None class License(Lax): diff --git a/atr/sbom/models/osv.py b/atr/sbom/models/osv.py index 30cf8aa..ab5c6e3 100644 --- a/atr/sbom/models/osv.py +++ b/atr/sbom/models/osv.py @@ -19,14 +19,44 @@ from __future__ import annotations from typing import Any +import pydantic + from .base import Lax class QueryResult(Lax): - vulns: list[dict[str, Any]] | None = None + vulns: list[VulnerabilityDetails] | None = None next_page_token: str | None = None class ComponentVulnerabilities(Lax): - purl: str - vulnerabilities: 
list[dict[str, Any]] + ref: str + vulnerabilities: list[VulnerabilityDetails] + + +class VulnerabilityDetails(Lax): + id: str + summary: str | None = None + details: str | None = None + references: list[dict[str, Any]] | None = None + severity: list[dict[str, Any]] | None = None + published: str | None = None + modified: str + database_specific: dict[str, Any] = pydantic.Field(default={}) + + +class CdxVulnerabilityDetail(Lax): + bom_ref: str | None = pydantic.Field(default=None, alias="bom-ref") + id: str + source: dict[str, str] | None = None + description: str | None = None + detail: str | None = None + advisories: list[dict[str, str]] | None = None + cwes: list[int] | None = None + published: str | None = None + updated: str | None = None + affects: list[dict[str, str]] | None = None + ratings: list[dict[str, str | float]] | None = None + + +CdxVulnAdapter = pydantic.TypeAdapter(CdxVulnerabilityDetail) diff --git a/atr/sbom/osv.py b/atr/sbom/osv.py index fc84892..7008ef9 100644 --- a/atr/sbom/osv.py +++ b/atr/sbom/osv.py @@ -18,14 +18,66 @@ from __future__ import annotations import os -from typing import Any +from typing import TYPE_CHECKING, Any import aiohttp from . 
import models +from .utilities import get_pointer, osv_severity_to_cdx + +if TYPE_CHECKING: + import yyjson _DEBUG: bool = os.environ.get("DEBUG_SBOM_TOOL") == "1" _OSV_API_BASE: str = "https://api.osv.dev/v1" +_SOURCE_DATABASE_NAMES = { + "ASB": "Android Security Bulletin", + "PUB": "Android Security Bulletin", + "ALSA": "AlmaLinux Security Advisory", + "ALBA": "AlmaLinux Security Advisory", + "ALEA": "AlmaLinux Security Advisory", + "ALPINE": "Alpine Security Advisory", + "BELL": "BellSoft Security Advisory", + "BIT": "Bitnami Vulnerability Database", + "CGA": "Chainguard Security Notices", + "CURL": "Curl CVEs", + "CVE": "National Vulnerability Database", + "DEBIAN": "Debian Security Tracker", + "DSA": "Debian Security Advisory", + "DLA": "Debian Security Advisory", + "DTSA": "Debian Security Advisory", + "ECHO": "Echo Security Advisory", + "EEF": "Erlang Ecosystem Foundation CNA Vulnerabilities", + "ELA": "Debian Extended LTS Security Advisory", + "GHSA": "GitHub Security Advisory", + "GO": "Go Vulnerability Database", + "GSD": "Global Security Database", + "HSEC": "Haskell Security Advisory", + "JLSEC": "Julia Security Advisory", + "KUBE": "Kubernetes Official CVE Feed", + "LBSEC": "LoopBack Advisory Database", + "LSN": "Livepatch Security Notices", + "MGASA": "Mageia Security Advisory", + "MAL": "Malicious Packages Repository", + "MINI": "Minimus Security Notices", + "OESA": "openEuler Security Advisory", + "OSV": "OSV Advisory", + "PHSA": "VMWare Photon Security Advisory", + "PSF": "Python Software Foundation Vulnerability Database", + "PYSEC": "PyPI Vulnerability Database", + "RHSA": "Red Hat Security Data", + "RHBA": "Red Hat Security Data", + "RHEA": "Red Hat Security Data", + "RLSA": "Rocky Linux Security Advisory", + "RXSA": "Rocky Linux Security Advisory", + "RSEC": "RConsortium Advisory Database", + "RUSTSEC": "RustSec Advisory Database", + "SUSE": "SUSE Security Landing Page", + "openSUSE": "SUSE Security Landing Page", + "UBUNTU": "Ubuntu CVE 
Reports", + "USN": "Ubuntu Security Notices", + "V8": "V8/Chromium Time-Based Policy", +} async def scan_bundle(bundle: models.bundle.Bundle) -> tuple[list[models.osv.ComponentVulnerabilities], list[str]]: @@ -42,11 +94,76 @@ async def scan_bundle(bundle: models.bundle.Bundle) -> tuple[list[models.osv.Com print(f"[DEBUG] Total components with vulnerabilities: {len(component_vulns_map)}") await _scan_bundle_populate_vulnerabilities(session, component_vulns_map) result: list[models.osv.ComponentVulnerabilities] = [] - for purl, vulns in component_vulns_map.items(): - result.append(models.osv.ComponentVulnerabilities(purl=purl, vulnerabilities=vulns)) + for ref, vulns in component_vulns_map.items(): + result.append(models.osv.ComponentVulnerabilities(ref=ref, vulnerabilities=vulns)) return result, ignored +def vulns_from_bundle(bundle: models.bundle.Bundle) -> list[models.osv.CdxVulnerabilityDetail]: + vulns = get_pointer(bundle.doc, "/vulnerabilities") + if vulns is None: + return [] + print(vulns) + return [models.osv.CdxVulnerabilityDetail.model_validate(v) for v in vulns] + + +async def vuln_patch( + session: aiohttp.ClientSession, + doc: yyjson.Document, + components: list[models.osv.ComponentVulnerabilities], +) -> models.patch.Patch: + patch_ops: models.patch.Patch = [] + _assemble_vulnerabilities(doc, patch_ops) + ix = 0 + for c in components: + for vuln in c.vulnerabilities: + _assemble_component_vulnerability(doc, patch_ops, c.ref, vuln, ix) + ix += 1 + return patch_ops + + +def _assemble_vulnerabilities(doc: yyjson.Document, patch_ops: models.patch.Patch) -> None: + if get_pointer(doc, "/vulnerabilities") is not None: + patch_ops.append(models.patch.RemoveOp(op="remove", path="/vulnerabilities")) + patch_ops.append( + models.patch.AddOp( + op="add", + path="/vulnerabilities", + value=[], + ) + ) + + +def _assemble_component_vulnerability( + doc: yyjson.Document, patch_ops: models.patch.Patch, ref: str, vuln: models.osv.VulnerabilityDetails, index: int +) -> 
None: + vulnerability = { + "bom-ref": f"vuln:{ref}/{vuln.id}", + "id": vuln.id, + "source": _get_source(vuln), + "description": vuln.summary, + "detail": vuln.details, + "cwes": [int(r.replace("CWE-", "")) for r in vuln.database_specific.get("cwe_ids", [])], + "published": vuln.published, + "updated": vuln.modified, + "affects": [{"ref": ref}], + "ratings": osv_severity_to_cdx(vuln.severity, vuln.database_specific.get("severity", "")), + } + if vuln.references is not None: + vulnerability["advisories"] = [ + {"url": r["url"]} + for r in vuln.references + if (r.get("type", "") == "WEB" and "advisories" in r.get("url", "")) or r.get("type", "") == "ADVISORY" + ] + patch_ops.append( + models.patch.AddOp( + op="add", + path=f"/vulnerabilities/{index!s}", + value=vulnerability, + ) + ) + + def _component_purl_with_version(component: models.bom.Component) -> str | None: if component.purl is None: return None @@ -90,7 +207,7 @@ async def _fetch_vulnerabilities_for_batch( async def _fetch_vulnerability_details( session: aiohttp.ClientSession, vuln_id: str, -) -> dict[str, Any]: +) -> models.osv.VulnerabilityDetails: if _DEBUG: print(f"[DEBUG] Fetching details for {vuln_id}") async with session.get(f"{_OSV_API_BASE}/vulns/{vuln_id}") as response: @@ -98,12 +215,24 @@ async def _fetch_vulnerability_details( return await response.json() +def _get_source(vuln: models.osv.VulnerabilityDetails) -> dict[str, str]: + db = vuln.id.split("-")[0] + web_refs = list(filter(lambda v: v.get("type", "") == "WEB", vuln.references)) if vuln.references else [] + first_ref = web_refs[0] if len(web_refs) > 0 else None + + name = _SOURCE_DATABASE_NAMES.get(db, "Unknown Database") + source = {"name": name} + if first_ref is not None: + source["url"] = first_ref.get("url", "") + return source + + async def _paginate_query( session: aiohttp.ClientSession, query: dict[str, Any], page_token: str, -) -> list[dict[str, Any]]: - all_vulns: list[dict[str, Any]] = [] +) -> 
list[models.osv.VulnerabilityDetails]: + all_vulns: list[models.osv.VulnerabilityDetails] = [] current_query = query.copy() current_query["page_token"] = page_token page = 0 @@ -135,7 +264,8 @@ def _scan_bundle_build_queries( ignored.append(component.name) continue query = {"package": {"purl": purl_with_version}} - queries.append((purl_with_version, query)) + if component.bom_ref is not None: + queries.append((component.bom_ref, query)) return queries, ignored @@ -143,31 +273,31 @@ async def _scan_bundle_fetch_vulnerabilities( session: aiohttp.ClientSession, queries: list[tuple[str, dict[str, Any]]], batch_size: int, -) -> dict[str, list[dict[str, Any]]]: - component_vulns_map: dict[str, list[dict[str, Any]]] = {} +) -> dict[str, list[models.osv.VulnerabilityDetails]]: + component_vulns_map: dict[str, list[models.osv.VulnerabilityDetails]] = {} for batch_start in range(0, len(queries), batch_size): batch_end = min(batch_start + batch_size, len(queries)) batch = queries[batch_start:batch_end] if _DEBUG: batch_num = batch_start // batch_size + 1 print(f"[DEBUG] Processing batch {batch_num} ({batch_start + 1}-{batch_end}/{len(queries)})") - batch_queries = [query for _purl, query in batch] + batch_queries = [query for _ref, query in batch] batch_results = await _fetch_vulnerabilities_for_batch(session, batch_queries) if _DEBUG and (len(batch_results) != len(batch)): print(f"[DEBUG] count mismatch (expected {len(batch)}, got {len(batch_results)})") - for i, (purl, query) in enumerate(batch): + for i, (ref, query) in enumerate(batch): if i >= len(batch_results): break query_result = batch_results[i] if query_result.vulns: - existing_vulns = component_vulns_map.setdefault(purl, []) + existing_vulns = component_vulns_map.setdefault(ref, []) existing_vulns.extend(query_result.vulns) if _DEBUG: - print(f"[DEBUG] {purl}: {len(query_result.vulns)} vulnerabilities") + print(f"[DEBUG] {ref}: {len(query_result.vulns)} vulnerabilities") if query_result.next_page_token: if _DEBUG: 
- print(f"[DEBUG] {purl}: has pagination, fetching remaining pages") - existing_vulns = component_vulns_map.setdefault(purl, []) + print(f"[DEBUG] {ref}: has pagination, fetching remaining pages") + existing_vulns = component_vulns_map.setdefault(ref, []) paginated = await _paginate_query(session, query, query_result.next_page_token) existing_vulns.extend(paginated) return component_vulns_map @@ -175,19 +305,19 @@ async def _scan_bundle_fetch_vulnerabilities( async def _scan_bundle_populate_vulnerabilities( session: aiohttp.ClientSession, - component_vulns_map: dict[str, list[dict[str, Any]]], + component_vulns_map: dict[str, list[models.osv.VulnerabilityDetails]], ) -> None: - details_cache: dict[str, dict[str, Any]] = {} + details_cache: dict[str, models.osv.VulnerabilityDetails] = {} for vulns in component_vulns_map.values(): for vuln in vulns: - vuln_id = vuln.get("id") + vuln_id = vuln.id if not vuln_id: continue details = details_cache.get(vuln_id) if details is None: details = await _fetch_vulnerability_details(session, vuln_id) details_cache[vuln_id] = details - vuln.clear() - vuln.update(details) + vuln.__dict__.clear() + vuln.__dict__.update(details) if _DEBUG: print(f"[DEBUG] Fetched details for {len(details_cache)} unique vulnerabilities") diff --git a/atr/sbom/utilities.py b/atr/sbom/utilities.py index 496deb2..36a2cf5 100644 --- a/atr/sbom/utilities.py +++ b/atr/sbom/utilities.py @@ -17,18 +17,27 @@ from __future__ import annotations +import re from typing import TYPE_CHECKING, Any +import cvss + if TYPE_CHECKING: import pathlib + import atr.sbom.models.osv as osv + import aiohttp import yyjson from . 
import models +_SCORING_METHODS_OSV = {"CVSS_V2": "CVSSv2", "CVSS_V3": "CVSSv3", "CVSS_V4": "CVSSv4"} +_SCORING_METHODS_CDX = {"CVSSv2": "CVSS_V2", "CVSSv3": "CVSS_V3", "CVSSv4": "CVSS_V4", "other": "Other"} +_CDX_SEVERITIES = ["critical", "high", "medium", "low", "info", "none", "unknown"] + -async def bundle_to_patch(bundle_value: models.bundle.Bundle) -> models.patch.Patch: +async def bundle_to_ntia_patch(bundle_value: models.bundle.Bundle) -> models.patch.Patch: from .conformance import ntia_2021_issues, ntia_2021_patch _warnings, errors = ntia_2021_issues(bundle_value.bom) @@ -37,6 +46,17 @@ async def bundle_to_patch(bundle_value: models.bundle.Bundle) -> models.patch.Pa return patch_ops +async def bundle_to_vuln_patch( + bundle_value: models.bundle.Bundle, vulnerabilities: list[osv.ComponentVulnerabilities] +) -> models.patch.Patch: + from .osv import vuln_patch + + # TODO: May not need session (copied from ntia patch) + async with aiohttp.ClientSession() as session: + patch_ops = await vuln_patch(session, bundle_value.doc, vulnerabilities) + return patch_ops + + def get_pointer(doc: yyjson.Document, path: str) -> Any | None: try: return doc.get_pointer(path) @@ -47,6 +67,13 @@ def get_pointer(doc: yyjson.Document, path: str) -> Any | None: raise +def get_atr_props_from_bundle(bundle_value: models.bundle.Bundle) -> list[dict[str, str]]: + properties: list[dict[str, str]] | None = get_pointer(bundle_value.doc, "/properties") + if properties is None: + return [] + return [p for p in properties if "asf:atr:" in p.get("name", "")] + + def patch_to_data(patch_ops: models.patch.Patch) -> list[dict[str, Any]]: return [op.model_dump(by_alias=True, exclude_none=True) for op in patch_ops] @@ -55,3 +82,85 @@ def path_to_bundle(path: pathlib.Path) -> models.bundle.Bundle: text = path.read_text(encoding="utf-8") bom = models.bom.Bom.model_validate_json(text) return models.bundle.Bundle(doc=yyjson.Document(text), bom=bom, path=path, text=text) + + +def record_task(task: 
str, revision: str, doc: yyjson.Document, patch_ops: models.patch.Patch) -> models.patch.Patch: + properties: list[dict[str, str]] | None = get_pointer(doc, "/properties") + operation = {"name": f"asf:atr:{task}", "value": revision} + if properties is None: + patch_ops.append(models.patch.AddOp(op="add", path="/properties", value=[operation])) + else: + properties.append(operation) + patch_ops.append(models.patch.ReplaceOp(op="replace", path="/properties", value=properties)) + return patch_ops + + +def osv_severity_to_cdx(severity: list[dict[str, Any]] | None, textual: str) -> list[dict[str, str | float]] | None: + if severity is not None: + return [ + { + "severity": _map_severity(textual), + "method": _SCORING_METHODS_OSV.get(s.get("type", ""), "other"), + **_extract_cdx_score(_SCORING_METHODS_OSV.get(s.get("type", ""), "other"), s.get("score", "")), + } + for s in severity + ] + return None + + +def cdx_severity_to_osv(severity: list[dict[str, str | float]]) -> tuple[str | None, list[dict[str, str]]]: + severities = [ + { + "score": str(s.get("score", str(s.get("vector", "")))), + "type": _SCORING_METHODS_CDX.get(str(s.get("method", "other"))), + } + for s in severity + ] + textual = severity[0].get("severity") + return str(textual), severities + + +def _extract_cdx_score(type: str, score_str: str) -> dict[str, str | float]: + if "CVSS" in score_str or "CVSS" in type: + components = re.match(r"CVSS:(?P<version>\d+\.?\d*)/.+", score_str) + parsed = None + vector = score_str + if components is None: + # CVSS2 doesn't include the version in the string, but we know this is a CVSS vector + parsed = cvss.CVSS2(vector) + else: + version = components.group("version") + if "3" in version or "V3" in type: + parsed = cvss.CVSS3(vector) + elif "4" in version or "V4" in type: + parsed = cvss.CVSS4(vector) + if parsed is not None: + # Pull a different score depending on which sections are filled out + scores = parsed.scores() + severities = parsed.severities() + score: float 
| None = next((s for s in reversed(scores) if s is not None), None) + severity = next((s for s in reversed(severities) if s is not None), "unknown") + result: dict[str, str | float] = {"vector": vector, "severity": _map_severity(severity)} + if score is not None: + result["score"] = score + return result + # Some vector that failed to parse + return {"vector": score_str} + else: + try: + # Maybe the score is just a numeric score + return {"score": float(score_str)} + except ValueError: + # If not, it must just be a string (eg. Ubuntu scoring system) + return {"severity": score_str} + + +def _map_severity(severity: str) -> str: + sev = severity.lower() + if sev in _CDX_SEVERITIES: + return sev + else: + # Map known github values + if sev == "moderate": + return "medium" + return "unknown" diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py index e837a87..d07dd0d 100644 --- a/atr/tasks/sbom.py +++ b/atr/tasks/sbom.py @@ -85,9 +85,10 @@ async def augment(args: FileArgs) -> results.Results | None: raise SBOMScoringError("SBOM file does not exist", {"file_path": args.file_path}) # Read from the old revision bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path)) - patch_ops = await sbom.utilities.bundle_to_patch(bundle) + patch_ops = await sbom.utilities.bundle_to_ntia_patch(bundle) new_full_path: str | None = None if patch_ops: + sbom.utilities.record_task("augment", args.revision_number, bundle.doc, patch_ops) patch_data = sbom.utilities.patch_to_data(patch_ops) merged = bundle.doc.patch(yyjson.Document(patch_data)) description = "SBOM augmentation through web interface" @@ -140,13 +141,37 @@ async def osv_scan(args: FileArgs) -> results.Results | None: raise SBOMScanningError("SBOM file does not exist", {"file_path": args.file_path}) bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path)) vulnerabilities, ignored = await sbom.osv.scan_bundle(bundle) - components = [results.OSVComponent(purl=v.purl, vulnerabilities=v.vulnerabilities) for v in 
vulnerabilities] + patch_ops = await sbom.utilities.bundle_to_vuln_patch(bundle, vulnerabilities) + components = [results.OSVComponent(purl=v.ref, vulnerabilities=v.vulnerabilities) for v in vulnerabilities] + + new_full_path: str | None = None + if patch_ops: + sbom.utilities.record_task("osv-scan", args.revision_number, bundle.doc, patch_ops) + patch_data = sbom.utilities.patch_to_data(patch_ops) + merged = bundle.doc.patch(yyjson.Document(patch_data)) + description = "SBOM vulnerability scan through web interface" + async with storage.write(args.asf_uid) as write: + wacp = await write.as_project_committee_participant(args.project_name) + async with wacp.revision.create_and_manage( + args.project_name, args.version_name, args.asf_uid or "unknown", description=description + ) as creating: + new_full_path = os.path.join(str(creating.interim_path), args.file_path) + # Write to the new revision + log.info(f"Writing updated SBOM to {new_full_path}") + await aiofiles.os.remove(new_full_path) + async with aiofiles.open(new_full_path, "w", encoding="utf-8") as f: + await f.write(merged.dumps()) + + if creating.new is None: + raise RuntimeError("Internal error: New revision not found") + return results.SBOMOSVScan( kind="sbom_osv_scan", project_name=args.project_name, version_name=args.version_name, revision_number=args.revision_number, - file_path=args.file_path, + file_path=full_path, + new_file_path=new_full_path or full_path, components=components, ignored=ignored, ) @@ -196,8 +221,10 @@ async def score_tool(args: FileArgs) -> results.Results | None: if not (full_path.endswith(".cdx.json") and os.path.isfile(full_path)): raise SBOMScoringError("SBOM file does not exist", {"file_path": args.file_path}) bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path)) + properties = sbom.utilities.get_atr_props_from_bundle(bundle) warnings, errors = sbom.conformance.ntia_2021_issues(bundle.bom) outdated = sbom.maven.plugin_outdated_version(bundle.bom) + vulnerabilities = 
sbom.osv.vulns_from_bundle(bundle) cli_errors = sbom.cyclonedx.validate_cli(bundle) return results.SBOMToolScore( kind="sbom_tool_score", @@ -208,6 +235,8 @@ async def score_tool(args: FileArgs) -> results.Results | None: warnings=[w.model_dump_json() for w in warnings], errors=[e.model_dump_json() for e in errors], outdated=outdated.model_dump_json() if outdated else None, + vulnerabilities=[v.model_dump_json() for v in vulnerabilities], + atr_props=properties, cli_errors=cli_errors, ) diff --git a/atr/templates/check-selected-path-table.html b/atr/templates/check-selected-path-table.html index 1c8a81d..0c8b101 100644 --- a/atr/templates/check-selected-path-table.html +++ b/atr/templates/check-selected-path-table.html @@ -67,7 +67,7 @@ {% endif %} {% if path.suffixes[-2:] == [".cdx", ".json"] %} <a href="{{ as_url(get.sbom.report, project=project_name, version=version_name, file_path=path) }}" - class="btn btn-sm btn-outline-secondary">Show SBOM</a> + class="btn btn-sm btn-outline-secondary">SBOM report</a> {% endif %} {% if has_errors %} <a href="{{ as_url(get.report.selected_path, project_name=project_name, version_name=version_name, rel_path=path) }}" diff --git a/docker-compose.yml b/docker-compose.yml index 6baa6f5..64e7da5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,8 @@ services: - BIND=0.0.0.0:8080 - SECRET_KEY=insecure-test-key - SSH_HOST=0.0.0.0 + - LDAP_BIND_DN=$LDAP_BIND_DN + - LDAP_BIND_PASSWORD=$LDAP_BIND_PASSWORD networks: - atr-network volumes: diff --git a/pyproject.toml b/pyproject.toml index 686190b..6c41378 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "blockbuster>=1.5.23,<2.0.0", "cmarkgfm>=2024.11.20", "cryptography~=44.0", + "cvss~=3.6", "cyclonedx-python-lib[json-validation]>=11.0.0", # "dkimpy @ git+https://github.com/sbp/dkimpy.git@main", "dnspython>=2.7.0,<3.0.0", diff --git a/uv.lock b/uv.lock index 8b8c044..c2822c7 100644 --- a/uv.lock +++ b/uv.lock @@ -393,6 +393,15 @@ 
wheels = [ { url = "https://files.pythonhosted.org/packages/63/51/ef6c5628e46092f0a54c7cee69acc827adc6b6aab57b55d344fefbdf28f1/cssbeautifier-1.15.4-py3-none-any.whl", hash = "sha256:78c84d5e5378df7d08622bbd0477a1abdbd209680e95480bf22f12d5701efc98", size = 123667, upload-time = "2025-02-27T17:53:43.594Z" }, ] +[[package]] +name = "cvss" +version = "3.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a5/f6/1f7d315de23f39bbc32b94bb6b33a1b6124856037bfaa3f8bdb1a49584fa/cvss-3.6.tar.gz", hash = "sha256:f21d18224efcd3c01b44ff1b37dec2e3208d29a6d0ce6c87a599c73c21ee1a99", size = 30047, upload-time = "2025-08-04T10:50:13.753Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/63/6d/fe2c65b94a28ae0481dc254e8cd664b82390069003bea945076d8a445f2b/cvss-3.6-py2.py3-none-any.whl", hash = "sha256:e342c6ad9c7eb69d2aebbbc2768a03cabd57eb947c806e145de5b936219833ea", size = 31154, upload-time = "2025-08-04T10:50:12.328Z" }, +] + [[package]] name = "cyclonedx-python-lib" version = "11.5.0" @@ -1778,6 +1787,7 @@ dependencies = [ { name = "blockbuster" }, { name = "cmarkgfm" }, { name = "cryptography" }, + { name = "cvss" }, { name = "cyclonedx-python-lib", extra = ["json-validation"] }, { name = "dnspython" }, { name = "dunamai" }, @@ -1834,6 +1844,7 @@ requires-dist = [ { name = "blockbuster", specifier = ">=1.5.23,<2.0.0" }, { name = "cmarkgfm", specifier = ">=2024.11.20" }, { name = "cryptography", specifier = "~=44.0" }, + { name = "cvss", specifier = "~=3.6" }, { name = "cyclonedx-python-lib", extras = ["json-validation"], specifier = ">=11.0.0" }, { name = "dnspython", specifier = ">=2.7.0,<3.0.0" }, { name = "dunamai", specifier = ">=1.23.0" }, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
