This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch previous_sbom_results in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 3f2a25979d0b74c974dd745bd53ba2d7b24f272d Author: Alastair McFarlane <[email protected]> AuthorDate: Tue Dec 23 17:46:03 2025 +0000 Pull previous SBOM results into the report and highlight new/changed vulnerabilities and licenses. --- atr/get/sbom.py | 358 +++++++++++++++++++++----------- atr/htm.py | 5 + atr/models/results.py | 9 + atr/sbom/cli.py | 2 +- atr/sbom/licenses.py | 18 +- atr/sbom/models/base.py | 2 +- atr/static/css/bootstrap.custom.css | 4 + atr/static/css/bootstrap.custom.css.map | 2 +- atr/tasks/__init__.py | 13 ++ atr/tasks/sbom.py | 35 +++- atr/util.py | 47 +++++ bootstrap/source/custom.scss | 4 + 12 files changed, 371 insertions(+), 128 deletions(-) diff --git a/atr/get/sbom.py b/atr/get/sbom.py index d60ca1a..6782a88 100644 --- a/atr/get/sbom.py +++ b/atr/get/sbom.py @@ -44,6 +44,9 @@ if TYPE_CHECKING: async def report(session: web.Committer, project: str, version: str, file_path: str) -> str: await session.check_access(project) + block = htm.Block() + block.h1["SBOM report"] + # If the draft is not found, we try to get the release candidate try: release = await session.release(project, version, with_committee=True) @@ -51,9 +54,48 @@ async def report(session: web.Committer, project: str, version: str, file_path: release = await session.release(project, version, phase=sql.ReleasePhase.RELEASE_CANDIDATE, with_committee=True) is_release_candidate = release.phase == sql.ReleasePhase.RELEASE_CANDIDATE + task, augment_tasks, osv_tasks = await _fetch_tasks(file_path, project, release, version) + + task_status = await _report_task_results(block, task) + if task_status: + return task_status + + if task is None or (not isinstance(task.result, results.SBOMToolScore)): + raise base.ASFQuartException("Invalid SBOM score result", errorcode=500) + + task_result = task.result + _report_header(block, is_release_candidate, release, task_result) + + if not is_release_candidate: + latest_augment = None + last_augmented_bom = None + if len(augment_tasks) > 0: + latest_augment = augment_tasks[0] + augment_results: list[Any] = [t.result for t in augment_tasks] + augmented_bom_versions = [ + r.bom_version for r in augment_results if (r is not None) and (r.bom_version is not None) + ] + if len(augmented_bom_versions) > 0: + last_augmented_bom = max(augmented_bom_versions) + _augment_section(block, release, task_result, latest_augment, last_augmented_bom) + + _conformance_section(block, task_result) + _license_section(block, task_result) + + _vulnerability_scan_section(block, project, version, file_path, task_result, osv_tasks, is_release_candidate) + + _outdated_tool_section(block, task_result) + + _cyclonedx_cli_errors(block, task_result) + + return await template.blank("SBOM report", content=block.collect()) + + +async def _fetch_tasks( + file_path: str, project: str, release: sql.Release, version: str +) -> tuple[sql.Task | None, collections.abc.Sequence[sql.Task], collections.abc.Sequence[sql.Task]]: async with db.session() as data: via = sql.validate_instrumented_attribute - # TODO: Abstract this code and the sbomtool.MissingAdapter validators tasks = ( await data.task( project_name=project, @@ -87,50 +129,7 @@ async def report(session: web.Committer, project: str, version: str, file_path: .order_by(sql.sqlmodel.desc(via(sql.Task.added))) .all() ) - - block = htm.Block() - block.h1["SBOM report"] - - await _report_task_results(block, list(tasks)) - - task_result = tasks[0].result - if not isinstance(task_result, results.SBOMToolScore): - raise base.ASFQuartException("Invalid SBOM score result", errorcode=500) - - _report_header(block, is_release_candidate, release, task_result) - - if not is_release_candidate: - latest_augment = None - last_augmented_bom = None - if len(augment_tasks) > 0: - latest_augment = augment_tasks[0] - augment_results: list[Any] = [t.result for t in augment_tasks] - augmented_bom_versions = [ - r.bom_version for r in augment_results if (r is not None) and (r.bom_version is not None) - ] - if len(augmented_bom_versions) > 0: - last_augmented_bom = max(augmented_bom_versions) - _augment_section(block, release, task_result, latest_augment, last_augmented_bom) - - _conformance_section(block, task_result) - _license_section(block, task_result) - - if task_result.vulnerabilities is not None: - vulnerabilities = [ - sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in task_result.vulnerabilities - ] - else: - vulnerabilities = [] - - _vulnerability_scan_section( - block, project, version, file_path, task_result, vulnerabilities, osv_tasks, is_release_candidate - ) - - _outdated_tool_section(block, task_result) - - _cyclonedx_cli_errors(block, task_result) - - return await template.blank("SBOM report", content=block.collect()) + return tasks[0] if len(tasks) > 0 else None, augment_tasks, osv_tasks def _outdated_tool_section(block: htm.Block, task_result: results.SBOMToolScore): @@ -178,18 +177,18 @@ def _outdated_tool_section(block: htm.Block, task_result: results.SBOMToolScore) def _conformance_section(block: htm.Block, task_result: results.SBOMToolScore) -> None: + block.h2["Conformance report"] warnings = [sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in task_result.warnings] errors = [sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in task_result.errors] if warnings: - block.h2["Warnings"] + block.h3[htm.icon("bi-exclamation-triangle-fill", ".me-2.text-warning"), "Warnings"] _missing_table(block, warnings) if errors: - block.h2["Errors"] + block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"] _missing_table(block, errors) if not (warnings or errors): - block.h2["Conformance report"] block.p["No NTIA 2021 minimum data field conformance warnings or errors found."] @@ -197,22 +196,29 @@ def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> No block.h2["Licenses"] warnings = [] errors = [] + prev_licenses = [] + if task_result.prev_licenses is not None: + prev_licenses = _load_license_issues(task_result.prev_licenses) if task_result.license_warnings is not None: - warnings = [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w in task_result.license_warnings] + warnings = _load_license_issues(task_result.license_warnings) if task_result.license_errors is not None: - errors = [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in task_result.license_errors] + errors = _load_license_issues(task_result.license_errors) if warnings: - block.h3["Warnings"] - _license_table(block, warnings) + block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"), "Warnings"] + _license_table(block, warnings, prev_licenses) if errors: - block.h3["Errors"] - _license_table(block, errors) + block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"] + _license_table(block, errors, prev_licenses) if not (warnings or errors): block.p["No license warnings or errors found."] +def _load_license_issues(issues: list[str]) -> list[sbom.models.licenses.Issue]: + return [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in issues] + + def _report_header( block: htm.Block, is_release_candidate: bool, release: sql.Release, task_result: results.SBOMToolScore ) -> None: @@ -229,14 +235,14 @@ def _report_header( block.p[f"This report is for the latest {release.version} release candidate."] -async def _report_task_results(block: htm.Block, tasks: list[sql.Task]): - if not tasks: +async def _report_task_results(block: htm.Block, task: sql.Task | None): + if task is None: block.p["No SBOM score found."] return await template.blank("SBOM report", content=block.collect()) - task_status = tasks[0].status - task_error = tasks[0].error - if task_status == sql.TaskStatus.QUEUED: + task_status = task.status + task_error = task.error + if task_status == sql.TaskStatus.QUEUED or task_status == sql.TaskStatus.ACTIVE: block.p["SBOM score is being computed."] return await template.blank("SBOM report", content=block.collect()) @@ -314,7 +320,11 @@ def _extract_vulnerability_severity(vuln: osv.VulnerabilityDetails) -> str: return "Unknown" -def _license_table(block: htm.Block, items: list[sbom.models.licenses.Issue]) -> None: +def _license_table( + block: htm.Block, + items: list[sbom.models.licenses.Issue], + prev: list[sbom.models.licenses.Issue], +) -> None: warning_rows = [ htm.tr[ htm.td[ @@ -322,9 +332,9 @@ def _license_table(block: htm.Block, items: list[sbom.models.licenses.Issue]) -> if (len(components) == 0) else htm.details[htm.summary[f"Category {category!s}"], htm.div[_detail_table(components)]] ], - htm.td[str(count)], + htm.td[f"{count!s} ({new} new)"], ] - for category, count, components in _license_tally(items) + for category, count, new, components in _license_tally(items, prev) ] block.table(".table.table-sm.table-bordered.table-striped")[ htm.thead[htm.tr[htm.th["License Category"], htm.th["Count"]]], @@ -353,7 +363,7 @@ def _missing_table(block: htm.Block, items: list[sbom.models.conformance.Missing def _detail_table(components: list[str | None]): return htm.table(".table.table-sm.table-bordered.table-striped")[ - htm.tbody[[htm.tr[htm.td[comp.capitalize()]] for comp in components if comp is not None]], + htm.tbody[[htm.tr[htm.td[comp]] for comp in components if comp is not None]], ] @@ -373,33 +383,66 @@ def _missing_tally(items: list[sbom.models.conformance.Missing]) -> list[tuple[s ) +# TODO: Update this to return either a block or something we can use later in a block for styling reasons def _license_tally( items: list[sbom.models.licenses.Issue], -) -> list[tuple[sbom.models.licenses.Category, int, list[str | None]]]: + old_issues: list[sbom.models.licenses.Issue], +) -> list[tuple[sbom.models.licenses.Category, int, int, list[str | None]]]: counts: dict[sbom.models.licenses.Category, int] = {} components: dict[sbom.models.licenses.Category, list[str | None]] = {} + new_count = 0 + old_map = {lic.component_name: (lic.license_expression, lic.category) for lic in old_issues} for item in items: key = item.category counts[key] = counts.get(key, 0) + 1 + name = str(item).capitalize() + if item.component_name not in old_map: + new_count = new_count + 1 + name = f"{name} (new)" + elif item.license_expression != old_map[item.component_name][0]: + new_count = new_count + 1 + name = f"{name} (previously {old_map[item.component_name][0]} - Category { + str(old_map[item.component_name][1]).upper() + })" if key not in components: - components[key] = [str(item)] + components[key] = [name] else: - components[key].append(str(item)) + components[key].append(name) return sorted( - [(category, count, components.get(category, [])) for category, count in counts.items()], + [(category, count, new_count, components.get(category, [])) for category, count in counts.items()], key=lambda kv: kv[0].value, ) -def _vulnerability_component_details_osv(block: htm.Block, component: results.OSVComponent) -> None: - details_content = [] - summary_element = htm.summary[ - htm.span(".badge.bg-danger.me-2.font-monospace")[str(len(component.vulnerabilities))], - htm.strong[component.purl], - ] - details_content.append(summary_element) - +def _severity_to_style(severity: str) -> str: + match severity.lower(): + case "critical": + return ".bg-danger.text-light" + case "high": + return ".bg-danger.text-light" + case "medium": + return ".bg-warning.text-dark" + case "moderate": + return ".bg-warning.text-dark" + case "low": + return ".bg-warning.text-dark" + case "info": + return ".bg-info.text-light" + return ".bg-info.text-light" + + +def _vulnerability_component_details_osv( + block: htm.Block, + component: results.OSVComponent, + previous_vulns: dict[str, str] | None, # id: severity +) -> int: + severities = ["critical", "high", "medium", "moderate", "low", "info", "none", "unknown"] + new = 0 + worst = 99 + + vuln_details = [] for vuln in component.vulnerabilities: + is_new = False vuln_id = vuln.id or "Unknown" vuln_summary = vuln.summary vuln_refs = [] @@ -407,11 +450,29 @@ def _vulnerability_component_details_osv(block: htm.Block, component: results.OS vuln_refs = [r for r in vuln.references if r.get("type", "") == "WEB"] vuln_primary_ref = vuln_refs[0] if (len(vuln_refs) > 0) else {} vuln_modified = vuln.modified or "Unknown" + vuln_severity = _extract_vulnerability_severity(vuln) + worst = _update_worst_severity(severities, vuln_severity, worst) + + if previous_vulns is not None: + if (vuln_id not in previous_vulns) or previous_vulns[vuln_id] != vuln_severity: + is_new = True + new = new + 1 vuln_header = [htm.a(href=vuln_primary_ref.get("url", ""), target="_blank")[htm.strong(".me-2")[vuln_id]]] - if vuln_severity != "Unknown": - vuln_header.append(htm.span(".badge.bg-warning.text-dark")[vuln_severity]) + style = f".badge.me-2{_severity_to_style(vuln_severity)}" + vuln_header.append(htm.span(style)[vuln_severity]) + + if (previous_vulns is not None) and is_new: + if vuln_id in previous_vulns: # If it's there, the sev must have changed + vuln_header.append(htm.icon("arrow-left", ".me-2")) + vuln_header.append( + htm.span(f".badge{_severity_to_style(previous_vulns[vuln_id])}.text-strike")[ + previous_vulns[vuln_id] + ] + ) + else: + vuln_header.append(htm.span(".badge.bg-info.text-light")["new"]) details = markupsafe.Markup(cmarkgfm.github_flavored_markdown_to_html(vuln.details)) vuln_div = htm.div(".ms-3.mb-3.border-start.border-warning.border-3.ps-3")[ @@ -423,13 +484,31 @@ def _vulnerability_component_details_osv(block: htm.Block, component: results.OS ], htm.div(".mt-2.text-muted")[details or "No additional details available."], ] - details_content.append(vuln_div) - + vuln_details.append(vuln_div) + + badge_style = "" + if worst < len(severities): + badge_style = _severity_to_style(severities[worst]) + summary_elements = [htm.span(f".badge{badge_style}.me-2.font-monospace")[str(len(component.vulnerabilities))]] + if new > 0: + summary_elements.append(htm.span(".badge.me-2.bg-info")[f"{new!s} new"]) + summary_elements.append(htm.strong[component.purl]) + details_content = [htm.summary[*summary_elements], *vuln_details] block.append(htm.details(".mb-3.rounded")[*details_content]) + return new + + +def _update_worst_severity(severities: list[str], vuln_severity: str, worst: int) -> int: + try: + sev_index = severities.index(vuln_severity) + except ValueError: + sev_index = 99 + worst = min(worst, sev_index) + return worst def _vulnerability_scan_button(block: htm.Block) -> None: - block.p["No new vulnerability scan has been performed for this revision."] + block.p["You can perform a new vulnerability scan."] form.render_block( block, @@ -463,58 +542,86 @@ def _vulnerability_scan_find_in_progress_task( def _vulnerability_scan_results( - block: htm.Block, vulns: list[osv.CdxVulnerabilityDetail], scans: list[str], task: sql.Task | None + block: htm.Block, + vulns: list[osv.CdxVulnerabilityDetail], + scans: list[str], + task: sql.Task | None, + prev: list[osv.CdxVulnerabilityDetail] | None, ) -> None: + previous_vulns = None + if prev is not None: + previous_osv = [_cdx_to_osv(v) for v in prev] + previous_vulns = {v.id: _extract_vulnerability_severity(v) for v in previous_osv} if task is not None: - task_result = task.result - if not isinstance(task_result, results.SBOMOSVScan): - block.p["Invalid scan result format."] - return + _vulnerability_results_from_scan(task, block, previous_vulns) + else: + _vulnerability_results_from_bom(vulns, block, scans, previous_vulns) - components = task_result.components - ignored = task_result.ignored - ignored_count = len(ignored) - if not components: - block.p["No vulnerabilities found."] - if ignored_count > 0: - component_word = "component was" if (ignored_count == 1) else "components were" - block.p[f"{ignored_count} {component_word} ignored due to missing PURL or version information:"] - block.p[f"{','.join(ignored)}"] - return +def _vulnerability_results_from_bom( + vulns: list[osv.CdxVulnerabilityDetail], block: htm.Block, scans: list[str], previous_vulns: dict[str, str] | None +) -> None: + total_new = 0 + new_block = htm.Block() + if len(vulns) == 0: + block.p["No vulnerabilities listed in this SBOM."] + return + components = {a.get("ref", "") for v in vulns if v.affects is not None for a in v.affects} + + if len(scans) > 0: + block.p["This SBOM was scanned for vulnerabilities at revision ", htm.code[scans[-1]], "."] + + for component in components: + new = _vulnerability_component_details_osv( + new_block, + results.OSVComponent( + purl=component, + vulnerabilities=[ + _cdx_to_osv(v) + for v in vulns + if (v.affects is not None) and (component in [a.get("ref") for a in v.affects]) + ], + ), + previous_vulns, + ) + total_new = total_new + new + + new_str = f" ({total_new!s} new since last release)" if (total_new > 0) else "" + block.p[f"Vulnerabilities{new_str} found in {len(components)} components:"] + block.append(new_block) - block.p[f"Scan found vulnerabilities in {len(components)} components:"] - for component in components: - _vulnerability_component_details_osv(block, component) +def _vulnerability_results_from_scan(task: sql.Task, block: htm.Block, previous_vulns: dict[str, str] | None) -> None: + total_new = 0 + new_block = htm.Block() + task_result = task.result + if not isinstance(task_result, results.SBOMOSVScan): + block.p["Invalid scan result format."] + return + components = task_result.components + ignored = task_result.ignored + ignored_count = len(ignored) + + if not components: + block.p["No vulnerabilities found."] if ignored_count > 0: component_word = "component was" if (ignored_count == 1) else "components were" block.p[f"{ignored_count} {component_word} ignored due to missing PURL or version information:"] block.p[f"{','.join(ignored)}"] - else: - if len(vulns) == 0: - block.p["No vulnerabilities listed in this SBOM."] - return - components = {a.get("ref", "") for v in vulns if v.affects is not None for a in v.affects} + return - if len(scans) > 0: - block.p["This SBOM was scanned for vulnerabilities at revision ", htm.code[scans[-1]], "."] + for component in components: + new = _vulnerability_component_details_osv(new_block, component, previous_vulns) + total_new = total_new + new - block.p[f"Vulnerabilities found in {len(components)} components:"] + new_str = f" ({total_new!s} new since last release)" if (total_new > 0) else "" + block.p[f"Scan found vulnerabilities{new_str} in {len(components)} components:"] - for component in components: - _vulnerability_component_details_osv( - block, - results.OSVComponent( - purl=component, - vulnerabilities=[ - _cdx_to_osv(v) - for v in vulns - if (v.affects is not None) and (component in [a.get("ref") for a in v.affects]) - ], - ), - ) + if ignored_count > 0: + component_word = "component was" if (ignored_count == 1) else "components were" + block.p[f"{ignored_count} {component_word} ignored due to missing PURL or version information:"] + block.p[f"{','.join(ignored)}"] def _cdx_to_osv(cdx: osv.CdxVulnerabilityDetail) -> osv.VulnerabilityDetails: @@ -542,7 +649,6 @@ def _vulnerability_scan_section( version: str, file_path: str, task_result: results.SBOMToolScore, - vulnerabilities: list[osv.CdxVulnerabilityDetail], osv_tasks: collections.abc.Sequence[sql.Task], is_release_candidate: bool, ) -> None: @@ -554,9 +660,21 @@ def _vulnerability_scan_section( block.h2["Vulnerabilities"] scans = [] + if task_result.vulnerabilities is not None: + vulnerabilities = [ + sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in task_result.vulnerabilities + ] + else: + vulnerabilities = [] + if task_result.prev_vulnerabilities is not None: + prev_vulnerabilities = [ + sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in task_result.prev_vulnerabilities + ] + else: + prev_vulnerabilities = None if task_result.atr_props is not None: scans = [t.get("value", "") for t in task_result.atr_props if t.get("name", "") == "asf:atr:osv-scan"] - _vulnerability_scan_results(block, vulnerabilities, scans, completed_task) + _vulnerability_scan_results(block, vulnerabilities, scans, completed_task, prev_vulnerabilities) if not is_release_candidate: if in_progress_task is not None: diff --git a/atr/htm.py b/atr/htm.py index 5b64ea3..c0d925c 100644 --- a/atr/htm.py +++ b/atr/htm.py @@ -44,6 +44,7 @@ h1 = htpy.h1 h2 = htpy.h2 h3 = htpy.h3 html = htpy.html +i = htpy.i li = htpy.li p = htpy.p pre = htpy.pre @@ -279,6 +280,10 @@ class Block: return BlockElementCallable(self, ul) +def icon(name: str, classes="") -> Element: + return i(f".bi.bi-{name}{classes}") + + def ul_links(*items: tuple[str, str]) -> Element: li_items = [li[a(href=item[0])[item[1]]] for item in items] return ul[*li_items] diff --git a/atr/models/results.py b/atr/models/results.py index b8e8c13..5d3ffbc 100644 --- a/atr/models/results.py +++ b/atr/models/results.py @@ -128,6 +128,9 @@ class SBOMToolScore(schema.Strict): version_name: str = schema.description("Version name") revision_number: str = schema.description("Revision number") bom_version: int | None = schema.Field(default=None, strict=False, description="BOM Version scanned") + prev_bom_version: int | None = schema.Field( + default=None, strict=False, description="BOM Version from previous release" + ) file_path: str = schema.description("Relative path to the scored SBOM file") warnings: list[str] = schema.description("Warnings from the SBOM tool") errors: list[str] = schema.description("Errors from the SBOM tool") @@ -141,6 +144,12 @@ class SBOMToolScore(schema.Strict): vulnerabilities: list[str] | None = schema.Field( default=None, strict=False, description="Vulnerabilities found in the SBOM" ) + prev_licenses: list[str] | None = schema.Field( + default=None, strict=False, description="Licenses from previous release" + ) + prev_vulnerabilities: list[str] | None = schema.Field( + default=None, strict=False, description="Vulnerabilities from previous release" + ) atr_props: list[dict[str, str]] | None = schema.Field( default=None, strict=False, description="ATR properties found in the SBOM" ) diff --git a/atr/sbom/cli.py b/atr/sbom/cli.py index 512a93c..a9cbd80 100644 --- a/atr/sbom/cli.py +++ b/atr/sbom/cli.py @@ -31,7 +31,7 @@ from .utilities import bundle_to_ntia_patch, bundle_to_vuln_patch, patch_to_data def command_license(bundle: models.bundle.Bundle) -> None: - warnings, errors = check(bundle.bom) + _, warnings, errors = check(bundle.bom) if warnings: print("WARNINGS (Category B):") for warning in warnings: diff --git a/atr/sbom/licenses.py b/atr/sbom/licenses.py index bfed06a..470571c 100644 --- a/atr/sbom/licenses.py +++ b/atr/sbom/licenses.py @@ -23,9 +23,11 @@ from .spdx import license_expression_atoms def check( bom_value: models.bom.Bom, -) -> tuple[list[models.licenses.Issue], list[models.licenses.Issue]]: + include_all: bool = False, +) -> tuple[list[models.licenses.Issue], list[models.licenses.Issue], list[models.licenses.Issue]]: warnings: list[models.licenses.Issue] = [] errors: list[models.licenses.Issue] = [] + good: list[models.licenses.Issue] = [] components = bom_value.components or [] if bom_value.metadata and bom_value.metadata.component: @@ -99,5 +101,17 @@ def check( component_type=type, ) ) + elif include_all: + good.append( + models.licenses.Issue( + component_name=name, + component_version=version, + license_expression=license_expr, + category=models.licenses.Category.A, + any_unknown=False, + scope=scope, + component_type=type, + ) + ) - return warnings, errors + return good, warnings, errors diff --git a/atr/sbom/models/base.py b/atr/sbom/models/base.py index d392d17..2e34024 100644 --- a/atr/sbom/models/base.py +++ b/atr/sbom/models/base.py @@ -21,7 +21,7 @@ import pydantic class Lax(pydantic.BaseModel): - model_config = pydantic.ConfigDict(extra="allow", strict=False) + model_config = pydantic.ConfigDict(extra="allow", strict=False, validate_assignment=True, validate_by_name=True) class Strict(pydantic.BaseModel): diff --git a/atr/static/css/bootstrap.custom.css b/atr/static/css/bootstrap.custom.css index 02d10a2..fd413b2 100644 --- a/atr/static/css/bootstrap.custom.css +++ b/atr/static/css/bootstrap.custom.css @@ -11849,4 +11849,8 @@ small, .small, .text-muted { cursor: pointer; } +.text-strike { + text-decoration: line-through; +} + /*# sourceMappingURL=bootstrap.custom.css.map */ diff --git a/atr/static/css/bootstrap.custom.css.map b/atr/static/css/bootstrap.custom.css.map index 37776be..a764b52 100644 --- a/atr/static/css/bootstrap.custom.css.map +++ b/atr/static/css/bootstrap.custom.css.map @@ -1 +1 @@ -{"version":3,"sourceRoot":"","sources":["../node_modules/bootstrap/scss/_root.scss","../node_modules/bootstrap/scss/vendor/_rfs.scss","../node_modules/bootstrap/scss/mixins/_color-mode.scss","../scss/reboot-shim.scss","../node_modules/bootstrap/scss/_accordion.scss","../node_modules/bootstrap/scss/mixins/_border-radius.scss","../node_modules/bootstrap/scss/mixins/_transition.scss","../node_modules/bootstrap/scss/_alert.scss","../node_modules/bootstrap/scss/_variables.scss","../node_modul [...] \ No newline at end of file +{"version":3,"sourceRoot":"","sources":["../node_modules/bootstrap/scss/_root.scss","../node_modules/bootstrap/scss/vendor/_rfs.scss","../node_modules/bootstrap/scss/mixins/_color-mode.scss","../scss/reboot-shim.scss","../node_modules/bootstrap/scss/_accordion.scss","../node_modules/bootstrap/scss/mixins/_border-radius.scss","../node_modules/bootstrap/scss/mixins/_transition.scss","../node_modules/bootstrap/scss/_alert.scss","../node_modules/bootstrap/scss/_variables.scss","../node_modul [...] \ No newline at end of file diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index 2c4b985..963f765 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -69,6 +69,18 @@ async def draft_checks( release = await data.release(name=sql.release_name(project_name, release_version), _committee=True).demand( RuntimeError("Release not found") ) + other_releases = ( + await data.release(project_name=project_name, phase=sql.ReleasePhase.RELEASE) + .order_by(sql.Release.released) + .all() + ) + release_versions = sorted( + [v for v in other_releases], key=lambda v: util.version_sort_key(v.version), reverse=True + ) + release_version_sortable = util.version_sort_key(release_version) + previous_version = next( + (v for v in release_versions if util.version_sort_key(v.version) < release_version_sortable), None + ) for path in relative_paths: path_str = str(path) task_function: Callable[[str, sql.Release, str, str], Awaitable[list[sql.Task]]] | None = None @@ -94,6 +106,7 @@ async def draft_checks( "project_name": project_name, "version_name": release_version, "revision_number": revision_number, + "previous_release_version": previous_version.version if previous_version else None, "file_path": path_str, "asf_uid": asf_uid, }, diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py index 46ee194..0b2ea04 100644 --- a/atr/tasks/sbom.py +++ b/atr/tasks/sbom.py @@ -74,6 +74,10 @@ class FileArgs(schema.Strict): asf_uid: str | None = None +class ScoreArgs(FileArgs): + previous_release_version: str | None = schema.description("Previous release version") + + @checks.with_model(FileArgs) async def augment(args: FileArgs) -> results.Results | None: base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number @@ -209,9 +213,12 @@ async def score_qs(args: FileArgs) -> results.Results | None: ) [email protected]_model(FileArgs) -async def score_tool(args: FileArgs) -> results.Results | None: [email protected]_model(ScoreArgs) +async def score_tool(args: ScoreArgs) -> results.Results | None: base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number + previous_base_dir = None + if args.previous_release_version is not None: + previous_base_dir = util.get_finished_dir() / args.project_name / args.previous_release_version if not os.path.isdir(base_dir): raise SBOMScoringError("Revision directory does not exist", {"base_dir": str(base_dir)}) full_path = os.path.join(base_dir, args.file_path) @@ -223,15 +230,35 @@ async def score_tool(args: FileArgs) -> results.Results | None: # TODO: Could update the ATR version with a constant showing last change to the augment/scan # tools so we know if it's outdated outdated = sbom.tool.plugin_outdated_version(bundle.bom) - license_warnings, license_errors = sbom.licenses.check(bundle.bom) + _, license_warnings, license_errors = sbom.licenses.check(bundle.bom) vulnerabilities = sbom.osv.vulns_from_bundle(bundle) cli_errors = sbom.cyclonedx.validate_cli(bundle) + + prev_version = None + prev_licenses = None + prev_vulnerabilities = None + if previous_base_dir is not None: + previous_full_path = os.path.join(previous_base_dir, args.file_path) + try: + previous_bundle = sbom.utilities.path_to_bundle(pathlib.Path(previous_full_path)) + except FileNotFoundError: + # Previous release didn't include this file + previous_bundle = None + if previous_bundle is not None: + prev_version, _ = sbom.utilities.get_props_from_bundle(previous_bundle) + prev_good, prev_license_warnings, prev_license_errors = sbom.licenses.check( + previous_bundle.bom, include_all=True + ) + prev_licenses = [*prev_good, *prev_license_warnings, *prev_license_errors] + prev_vulnerabilities = sbom.osv.vulns_from_bundle(previous_bundle) + return results.SBOMToolScore( kind="sbom_tool_score", project_name=args.project_name, version_name=args.version_name, revision_number=args.revision_number, bom_version=version, + prev_bom_version=prev_version, file_path=args.file_path, warnings=[w.model_dump_json() for w in warnings], errors=[e.model_dump_json() for e in errors], @@ -239,6 +266,8 @@ async def score_tool(args: FileArgs) -> results.Results | None: license_warnings=[w.model_dump_json() for w in license_warnings] if license_warnings else None, license_errors=[e.model_dump_json() for e in license_errors] if license_errors else None, vulnerabilities=[v.model_dump_json() for v in vulnerabilities], + prev_licenses=[w.model_dump_json() for w in prev_licenses] if prev_licenses else None, + prev_vulnerabilities=[v.model_dump_json() for v in prev_vulnerabilities] if prev_vulnerabilities else None, atr_props=properties, cli_errors=cli_errors, ) diff --git a/atr/util.py b/atr/util.py index 42ebea8..d1c9f86 100644 --- a/atr/util.py +++ b/atr/util.py @@ -1011,6 +1011,53 @@ def version_name_error(version_name: str) -> str | None: return None +def version_sort_key(version: str) -> bytes: + """ + Convert a version string into a sortable byte sequence. + Prefixes each digit sequence with its length as u16 little-endian. + Strips leading zeros and appends a byte for the count of leading zeros. + """ + result = [] + i = 0 + length = len(version) + while i < length: + if version[i].isdigit(): + # Find the end of this digit sequence + j = i + while j < length and version[j].isdigit(): + j += 1 + + digit_sequence = version[i:j] + + # Count leading zeros + leading_zeros = 0 + for char in digit_sequence: + if char == "0": + leading_zeros += 1 + else: + break + + # Strip leading zeros (but keep at least one digit if all zeros) + stripped = digit_sequence.lstrip("0") or "0" + + # Count the stripped digits and encode as u16 little-endian + digit_count = len(stripped) + length_bytes = digit_count.to_bytes(2, "little") + + # Add length prefix + stripped digits + leading zero count + result.extend(length_bytes) + result.extend(stripped.encode("utf-8")) + result.append(leading_zeros) + + i = j + else: + # Non-digit character, just add it + result.extend(version[i].encode("utf-8")) + i += 1 + + return bytes(result) + + async def _create_hard_link_clone_checks( source_dir: pathlib.Path, dest_dir: pathlib.Path, diff --git a/bootstrap/source/custom.scss b/bootstrap/source/custom.scss index a3d90d6..9a40292 100644 --- a/bootstrap/source/custom.scss +++ b/bootstrap/source/custom.scss @@ -156,3 +156,7 @@ small, .text-muted { .nav-link { cursor: pointer; } + +.text-strike { + text-decoration: line-through; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
