This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new 23b3bc5  Pull previous SBOM results into the report and highlight new/changed vulnerabilities and licenses.
23b3bc5 is described below

commit 23b3bc5adce730835e0b7d218e14d7e90db13e0e
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Dec 23 17:46:03 2025 +0000

    Pull previous SBOM results into the report and highlight new/changed vulnerabilities and licenses.
---
 atr/get/sbom.py         | 441 +++++++++++++++++++++++++++++++++---------------
 atr/htm.py              |   5 +
 atr/models/results.py   |   9 +
 atr/models/schema.py    |   2 +-
 atr/sbom/cli.py         |   2 +-
 atr/sbom/licenses.py    |  18 +-
 atr/sbom/models/base.py |   2 +-
 atr/static/css/atr.css  |   4 +
 atr/tasks/__init__.py   |  13 ++
 atr/tasks/sbom.py       |  35 +++-
 atr/util.py             |  47 ++++++
 11 files changed, 432 insertions(+), 146 deletions(-)

diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index 783f238..18dc4c6 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import json
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Literal
 
 import asfquart.base as base
 import cmarkgfm
@@ -27,6 +27,8 @@ import markupsafe
 import atr.blueprints.get as get
 import atr.db as db
 import atr.form as form
+import atr.get.compose as compose
+import atr.get.vote as vote
 import atr.htm as htm
 import atr.models.results as results
 import atr.models.sql as sql
@@ -34,38 +36,104 @@ import atr.sbom as sbom
 import atr.sbom.models.osv as osv
 import atr.shared as shared
 import atr.template as template
+import atr.util as util
 import atr.web as web
 
 if TYPE_CHECKING:
-    import collections.abc
+    from collections.abc import Sequence
 
 
 @get.committer("/sbom/report/<project>/<version>/<path:file_path>")
 async def report(session: web.Committer, project: str, version: str, 
file_path: str) -> str:
     await session.check_access(project)
 
-    validated_path = form.to_relpath(file_path)
-    if validated_path is None:
-        raise base.ASFQuartException("Invalid file path", errorcode=400)
-    validated_path_str = str(validated_path)
-
     # If the draft is not found, we try to get the release candidate
     try:
         release = await session.release(project, version, with_committee=True)
     except base.ASFQuartException:
         release = await session.release(project, version, 
phase=sql.ReleasePhase.RELEASE_CANDIDATE, with_committee=True)
-    is_release_candidate = release.phase == sql.ReleasePhase.RELEASE_CANDIDATE
 
+    block = htm.Block()
+
+    is_release_candidate = False
+    back_url = ""
+    back_anchor = ""
+    phase: Literal["COMPOSE", "VOTE"] = "COMPOSE"
+    match release.phase:
+        case sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT:
+            back_url = util.as_url(compose.selected, 
project_name=release.project.name, version_name=release.version)
+            back_anchor = f"Compose {release.project.short_display_name} 
{release.version}"
+            phase = "COMPOSE"
+        case sql.ReleasePhase.RELEASE_CANDIDATE:
+            is_release_candidate = True
+            back_url = util.as_url(vote.selected, 
project_name=release.project.name, version_name=release.version)
+            back_anchor = f"Vote on {release.project.short_display_name} 
{release.version}"
+            phase = "VOTE"
+
+    shared.distribution.html_nav(
+        block,
+        back_url=back_url,
+        back_anchor=back_anchor,
+        phase=phase,
+    )
+
+    block.h1["SBOM report"]
+
+    validated_path = form.to_relpath(file_path)
+    if validated_path is None:
+        raise base.ASFQuartException("Invalid file path", errorcode=400)
+    validated_path_str = str(validated_path)
+
+    task, augment_tasks, osv_tasks = await _fetch_tasks(validated_path_str, 
project, release, version)
+
+    task_status = await _report_task_results(block, task)
+    if task_status:
+        return task_status
+
+    if task is None or (not isinstance(task.result, results.SBOMToolScore)):
+        raise base.ASFQuartException("Invalid SBOM score result", 
errorcode=500)
+
+    task_result = task.result
+    _report_header(block, is_release_candidate, release, task_result)
+
+    if not is_release_candidate:
+        latest_augment = None
+        last_augmented_bom = None
+        if len(augment_tasks) > 0:
+            latest_augment = augment_tasks[0]
+            augment_results: list[Any] = [t.result for t in augment_tasks]
+            augmented_bom_versions = [
+                r.bom_version for r in augment_results if (r is not None) and 
(r.bom_version is not None)
+            ]
+            if len(augmented_bom_versions) > 0:
+                last_augmented_bom = max(augmented_bom_versions)
+        _augment_section(block, release, task_result, latest_augment, 
last_augmented_bom)
+
+    _conformance_section(block, task_result)
+    _license_section(block, task_result)
+
+    _vulnerability_scan_section(block, project, version, file_path, 
task_result, osv_tasks, is_release_candidate)
+
+    _outdated_tool_section(block, task_result)
+
+    _cyclonedx_cli_errors(block, task_result)
+
+    return await template.blank("SBOM report", content=block.collect())
+
+
+async def _fetch_tasks(
+    file_path: str, project: str, release: sql.Release, version: str
+) -> tuple[sql.Task | None, Sequence[sql.Task], Sequence[sql.Task]]:
+    # TODO: Abstract this code and the sbomtool.MissingAdapter validators
     async with db.session() as data:
         via = sql.validate_instrumented_attribute
-        # TODO: Abstract this code and the sbomtool.MissingAdapter validators
         tasks = (
             await data.task(
                 project_name=project,
                 version_name=version,
                 revision_number=release.latest_revision_number,
                 task_type=sql.TaskType.SBOM_TOOL_SCORE,
-                primary_rel_path=validated_path_str,
+                primary_rel_path=file_path,
             )
             .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
             .all()
@@ -75,7 +143,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
                 project_name=project,
                 version_name=version,
                 task_type=sql.TaskType.SBOM_AUGMENT,
-                primary_rel_path=validated_path_str,
+                primary_rel_path=file_path,
             )
             .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
             .all()
@@ -86,56 +154,13 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
                 project_name=project,
                 version_name=version,
                 task_type=sql.TaskType.SBOM_OSV_SCAN,
-                primary_rel_path=validated_path_str,
+                primary_rel_path=file_path,
                 revision_number=release.latest_revision_number,
             )
             .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
             .all()
         )
-
-    block = htm.Block()
-    block.h1["SBOM report"]
-
-    await _report_task_results(block, list(tasks))
-
-    task_result = tasks[0].result
-    if not isinstance(task_result, results.SBOMToolScore):
-        raise base.ASFQuartException("Invalid SBOM score result", 
errorcode=500)
-
-    _report_header(block, is_release_candidate, release, task_result)
-
-    if not is_release_candidate:
-        latest_augment = None
-        last_augmented_bom = None
-        if len(augment_tasks) > 0:
-            latest_augment = augment_tasks[0]
-            augment_results: list[Any] = [t.result for t in augment_tasks]
-            augmented_bom_versions = [
-                r.bom_version for r in augment_results if (r is not None) and 
(r.bom_version is not None)
-            ]
-            if len(augmented_bom_versions) > 0:
-                last_augmented_bom = max(augmented_bom_versions)
-        _augment_section(block, release, task_result, latest_augment, 
last_augmented_bom)
-
-    _conformance_section(block, task_result)
-    _license_section(block, task_result)
-
-    if task_result.vulnerabilities is not None:
-        vulnerabilities = [
-            sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for 
e in task_result.vulnerabilities
-        ]
-    else:
-        vulnerabilities = []
-
-    _vulnerability_scan_section(
-        block, project, version, validated_path_str, task_result, 
vulnerabilities, osv_tasks, is_release_candidate
-    )
-
-    _outdated_tool_section(block, task_result)
-
-    _cyclonedx_cli_errors(block, task_result)
-
-    return await template.blank("SBOM report", content=block.collect())
+        return tasks[0] if len(tasks) > 0 else None, augment_tasks, osv_tasks
 
 
 def _outdated_tool_section(block: htm.Block, task_result: 
results.SBOMToolScore):
@@ -183,18 +208,18 @@ def _outdated_tool_section(block: htm.Block, task_result: 
results.SBOMToolScore)
 
 
 def _conformance_section(block: htm.Block, task_result: results.SBOMToolScore) 
-> None:
+    block.h2["Conformance report"]
     warnings = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in 
task_result.warnings]
     errors = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in 
task_result.errors]
     if warnings:
-        block.h2["Warnings"]
+        block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"), 
"Warnings"]
         _missing_table(block, warnings)
 
     if errors:
-        block.h2["Errors"]
+        block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
         _missing_table(block, errors)
 
     if not (warnings or errors):
-        block.h2["Conformance report"]
         block.p["No NTIA 2021 minimum data field conformance warnings or 
errors found."]
 
 
@@ -202,22 +227,30 @@ def _license_section(block: htm.Block, task_result: 
results.SBOMToolScore) -> No
     block.h2["Licenses"]
     warnings = []
     errors = []
+    prev_licenses = None
+    if task_result.prev_licenses is not None:
+        prev_licenses = _load_license_issues(task_result.prev_licenses)
     if task_result.license_warnings is not None:
-        warnings = [sbom.models.licenses.Issue.model_validate(json.loads(w)) 
for w in task_result.license_warnings]
+        warnings = _load_license_issues(task_result.license_warnings)
     if task_result.license_errors is not None:
-        errors = [sbom.models.licenses.Issue.model_validate(json.loads(e)) for 
e in task_result.license_errors]
+        errors = _load_license_issues(task_result.license_errors)
+    # TODO: Rework the rendering of these since category in the table is 
redundant.
     if warnings:
-        block.h3["Warnings"]
-        _license_table(block, warnings)
+        block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"), 
"Warnings"]
+        _license_table(block, warnings, prev_licenses)
 
     if errors:
-        block.h3["Errors"]
-        _license_table(block, errors)
+        block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
+        _license_table(block, errors, prev_licenses)
 
     if not (warnings or errors):
         block.p["No license warnings or errors found."]
 
 
+def _load_license_issues(issues: list[str]) -> 
list[sbom.models.licenses.Issue]:
+    return [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in 
issues]
+
+
 def _report_header(
     block: htm.Block, is_release_candidate: bool, release: sql.Release, 
task_result: results.SBOMToolScore
 ) -> None:
@@ -234,14 +267,14 @@ def _report_header(
         block.p[f"This report is for the latest {release.version} release 
candidate."]
 
 
-async def _report_task_results(block: htm.Block, tasks: list[sql.Task]):
-    if not tasks:
+async def _report_task_results(block: htm.Block, task: sql.Task | None):
+    if task is None:
         block.p["No SBOM score found."]
         return await template.blank("SBOM report", content=block.collect())
 
-    task_status = tasks[0].status
-    task_error = tasks[0].error
-    if task_status == sql.TaskStatus.QUEUED:
+    task_status = task.status
+    task_error = task.error
+    if task_status == sql.TaskStatus.QUEUED or task_status == 
sql.TaskStatus.ACTIVE:
         block.p["SBOM score is being computed."]
         return await template.blank("SBOM report", content=block.collect())
 
@@ -319,7 +352,11 @@ def _extract_vulnerability_severity(vuln: 
osv.VulnerabilityDetails) -> str:
     return "Unknown"
 
 
-def _license_table(block: htm.Block, items: list[sbom.models.licenses.Issue]) 
-> None:
+def _license_table(
+    block: htm.Block,
+    items: list[sbom.models.licenses.Issue],
+    prev: list[sbom.models.licenses.Issue] | None,
+) -> None:
     warning_rows = [
         htm.tr[
             htm.td[
@@ -327,9 +364,9 @@ def _license_table(block: htm.Block, items: 
list[sbom.models.licenses.Issue]) ->
                 if (len(components) == 0)
                 else htm.details[htm.summary[f"Category {category!s}"], 
htm.div[_detail_table(components)]]
             ],
-            htm.td[str(count)],
+            htm.td[f"{count!s} {f'({new!s} new, {updated!s} changed)' if new 
or updated else ''}"],
         ]
-        for category, count, components in _license_tally(items)
+        for category, count, new, updated, components in _license_tally(items, 
prev)
     ]
     block.table(".table.table-sm.table-bordered.table-striped")[
         htm.thead[htm.tr[htm.th["License Category"], htm.th["Count"]]],
@@ -358,7 +395,7 @@ def _missing_table(block: htm.Block, items: 
list[sbom.models.conformance.Missing
 
 def _detail_table(components: list[str | None]):
     return htm.table(".table.table-sm.table-bordered.table-striped")[
-        htm.tbody[[htm.tr[htm.td[comp.capitalize()]] for comp in components if 
comp is not None]],
+        htm.tbody[[htm.tr[htm.td[comp]] for comp in components if comp is not 
None]],
     ]
 
 
@@ -378,33 +415,77 @@ def _missing_tally(items: 
list[sbom.models.conformance.Missing]) -> list[tuple[s
     )
 
 
+# TODO: Update this to return either a block or something we can use later in 
a block for styling reasons
 def _license_tally(
     items: list[sbom.models.licenses.Issue],
-) -> list[tuple[sbom.models.licenses.Category, int, list[str | None]]]:
+    old_issues: list[sbom.models.licenses.Issue] | None,
+) -> list[tuple[sbom.models.licenses.Category, int, int | None, int | None, 
list[str | None]]]:
     counts: dict[sbom.models.licenses.Category, int] = {}
     components: dict[sbom.models.licenses.Category, list[str | None]] = {}
+    new_count = 0
+    updated_count = 0
+    old_map = {lic.component_name: (lic.license_expression, lic.category) for 
lic in old_issues} if old_issues else None
     for item in items:
         key = item.category
         counts[key] = counts.get(key, 0) + 1
+        name = str(item).capitalize()
+        if old_map is not None:
+            if item.component_name not in old_map:
+                new_count = new_count + 1
+                name = f"{name} (new)"
+            elif item.license_expression != old_map[item.component_name][0]:
+                updated_count = updated_count + 1
+                name = f"{name} (previously {old_map[item.component_name][0]} 
- Category {
+                    str(old_map[item.component_name][1]).upper()
+                })"
         if key not in components:
-            components[key] = [str(item)]
+            components[key] = [name]
         else:
-            components[key].append(str(item))
+            components[key].append(name)
     return sorted(
-        [(category, count, components.get(category, [])) for category, count 
in counts.items()],
+        [
+            (
+                category,
+                count,
+                new_count if old_issues is not None else None,
+                updated_count if old_issues is not None else None,
+                components.get(category, []),
+            )
+            for category, count in counts.items()
+        ],
         key=lambda kv: kv[0].value,
     )
 
 
-def _vulnerability_component_details_osv(block: htm.Block, component: 
results.OSVComponent) -> None:
-    details_content = []
-    summary_element = htm.summary[
-        
htm.span(".badge.bg-danger.me-2.font-monospace")[str(len(component.vulnerabilities))],
-        htm.strong[component.purl],
-    ]
-    details_content.append(summary_element)
-
+def _severity_to_style(severity: str) -> str:
+    match severity.lower():
+        case "critical":
+            return ".bg-danger.text-light"
+        case "high":
+            return ".bg-danger.text-light"
+        case "medium":
+            return ".bg-warning.text-dark"
+        case "moderate":
+            return ".bg-warning.text-dark"
+        case "low":
+            return ".bg-warning.text-dark"
+        case "info":
+            return ".bg-info.text-light"
+    return ".bg-info.text-light"
+
+
+def _vulnerability_component_details_osv(
+    block: htm.Block,
+    component: results.OSVComponent,
+    previous_vulns: dict[str, tuple[str, list[str]]] | None,  # id: severity
+) -> int:
+    severities = ["critical", "high", "medium", "moderate", "low", "info", 
"none", "unknown"]
+    new = 0
+    worst = 99
+
+    vuln_details = []
     for vuln in component.vulnerabilities:
+        is_new = False
         vuln_id = vuln.id or "Unknown"
         vuln_summary = vuln.summary
         vuln_refs = []
@@ -412,11 +493,34 @@ def _vulnerability_component_details_osv(block: 
htm.Block, component: results.OS
             vuln_refs = [r for r in vuln.references if r.get("type", "") == 
"WEB"]
         vuln_primary_ref = vuln_refs[0] if (len(vuln_refs) > 0) else {}
         vuln_modified = vuln.modified or "Unknown"
+
         vuln_severity = _extract_vulnerability_severity(vuln)
+        worst = _update_worst_severity(severities, vuln_severity, worst)
+
+        if previous_vulns is not None:
+            if (
+                (vuln_id not in previous_vulns)
+                or previous_vulns[vuln_id][0] != vuln_severity
+                or (component.purl not in previous_vulns[vuln_id][1])
+            ):
+                is_new = True
+                new = new + 1
 
         vuln_header = [htm.a(href=vuln_primary_ref.get("url", ""), 
target="_blank")[htm.strong(".me-2")[vuln_id]]]
-        if vuln_severity != "Unknown":
-            
vuln_header.append(htm.span(".badge.bg-warning.text-dark")[vuln_severity])
+        style = f".badge.me-2{_severity_to_style(vuln_severity)}"
+        vuln_header.append(htm.span(style)[vuln_severity])
+
+        if (previous_vulns is not None) and is_new:
+            if (vuln_id in previous_vulns) and (component.purl in 
previous_vulns[vuln_id][1]):
+                # If it's there, the sev must have changed
+                vuln_header.append(htm.icon("arrow-left", ".me-2"))
+                vuln_header.append(
+                    
htm.span(f".badge{_severity_to_style(previous_vulns[vuln_id][0])}.atr-text-strike")[
+                        previous_vulns[vuln_id][0]
+                    ]
+                )
+            else:
+                
vuln_header.append(htm.span(".badge.bg-info.text-light")["new"])
 
         details = 
markupsafe.Markup(cmarkgfm.github_flavored_markdown_to_html(vuln.details))
         vuln_div = 
htm.div(".ms-3.mb-3.border-start.border-warning.border-3.ps-3")[
@@ -428,13 +532,31 @@ def _vulnerability_component_details_osv(block: 
htm.Block, component: results.OS
             ],
             htm.div(".mt-2.text-muted")[details or "No additional details 
available."],
         ]
-        details_content.append(vuln_div)
-
+        vuln_details.append(vuln_div)
+
+    badge_style = ""
+    if worst < len(severities):
+        badge_style = _severity_to_style(severities[worst])
+    summary_elements = 
[htm.span(f".badge{badge_style}.me-2.font-monospace")[str(len(component.vulnerabilities))]]
+    if new > 0:
+        summary_elements.append(htm.span(".badge.me-2.bg-info")[f"{new!s} 
new"])
+    summary_elements.append(htm.strong[component.purl])
+    details_content = [htm.summary[*summary_elements], *vuln_details]
     block.append(htm.details(".mb-3.rounded")[*details_content])
+    return new
+
+
+def _update_worst_severity(severities: list[str], vuln_severity: str, worst: 
int) -> int:
+    try:
+        sev_index = severities.index(vuln_severity)
+    except ValueError:
+        sev_index = 99
+    worst = min(worst, sev_index)
+    return worst
 
 
 def _vulnerability_scan_button(block: htm.Block) -> None:
-    block.p["No new vulnerability scan has been performed for this revision."]
+    block.p["You can perform a new vulnerability scan."]
 
     form.render_block(
         block,
@@ -444,9 +566,7 @@ def _vulnerability_scan_button(block: htm.Block) -> None:
     )
 
 
-def _vulnerability_scan_find_completed_task(
-    osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
-) -> sql.Task | None:
+def _vulnerability_scan_find_completed_task(osv_tasks: Sequence[sql.Task], 
revision_number: str) -> sql.Task | None:
     """Find the most recent completed OSV scan task for the given revision."""
     for task in osv_tasks:
         if (task.status == sql.TaskStatus.COMPLETED) and (task.result is not 
None):
@@ -456,9 +576,7 @@ def _vulnerability_scan_find_completed_task(
     return None
 
 
-def _vulnerability_scan_find_in_progress_task(
-    osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
-) -> sql.Task | None:
+def _vulnerability_scan_find_in_progress_task(osv_tasks: Sequence[sql.Task], 
revision_number: str) -> sql.Task | None:
     """Find the most recent in-progress OSV scan task for the given 
revision."""
     for task in osv_tasks:
         if task.revision_number == revision_number:
@@ -468,58 +586,94 @@ def _vulnerability_scan_find_in_progress_task(
 
 
 def _vulnerability_scan_results(
-    block: htm.Block, vulns: list[osv.CdxVulnerabilityDetail], scans: 
list[str], task: sql.Task | None
+    block: htm.Block,
+    vulns: list[osv.CdxVulnerabilityDetail],
+    scans: list[str],
+    task: sql.Task | None,
+    prev: list[osv.CdxVulnerabilityDetail] | None,
 ) -> None:
+    previous_vulns = None
+    if prev is not None:
+        previous_osv = [
+            (_cdx_to_osv(v), [a.get("ref", "") for a in v.affects] if 
v.affects is not None else []) for v in prev
+        ]
+        previous_vulns = {v.id: (_extract_vulnerability_severity(v), a) for v, 
a in previous_osv}
     if task is not None:
-        task_result = task.result
-        if not isinstance(task_result, results.SBOMOSVScan):
-            block.p["Invalid scan result format."]
-            return
+        _vulnerability_results_from_scan(task, block, previous_vulns)
+    else:
+        _vulnerability_results_from_bom(vulns, block, scans, previous_vulns)
 
-        components = task_result.components
-        ignored = task_result.ignored
-        ignored_count = len(ignored)
 
-        if not components:
-            block.p["No vulnerabilities found."]
-            if ignored_count > 0:
-                component_word = "component was" if (ignored_count == 1) else 
"components were"
-                block.p[f"{ignored_count} {component_word} ignored due to 
missing PURL or version information:"]
-                block.p[f"{','.join(ignored)}"]
-            return
+def _vulnerability_results_from_bom(
+    vulns: list[osv.CdxVulnerabilityDetail],
+    block: htm.Block,
+    scans: list[str],
+    previous_vulns: dict[str, tuple[str, list[str]]] | None,
+) -> None:
+    total_new = 0
+    new_block = htm.Block()
+    if len(vulns) == 0:
+        block.p["No vulnerabilities listed in this SBOM."]
+        return
+    components = {a.get("ref", "") for v in vulns if v.affects is not None for 
a in v.affects}
+
+    if len(scans) > 0:
+        block.p["This SBOM was scanned for vulnerabilities at revision ", 
htm.code[scans[-1]], "."]
+
+    for component in components:
+        new = _vulnerability_component_details_osv(
+            new_block,
+            results.OSVComponent(
+                purl=component,
+                vulnerabilities=[
+                    _cdx_to_osv(v)
+                    for v in vulns
+                    if (v.affects is not None) and (component in [a.get("ref") 
for a in v.affects])
+                ],
+            ),
+            previous_vulns,
+        )
+        total_new = total_new + new
 
-        block.p[f"Scan found vulnerabilities in {len(components)} components:"]
+    new_str = f" ({total_new!s} new since last release)" if (total_new > 0) 
else ""
+    block.p[f"Vulnerabilities{new_str} found in {len(components)} components:"]
+    block.append(new_block)
 
-        for component in components:
-            _vulnerability_component_details_osv(block, component)
 
+def _vulnerability_results_from_scan(
+    task: sql.Task, block: htm.Block, previous_vulns: dict[str, tuple[str, 
list[str]]] | None
+) -> None:
+    total_new = 0
+    new_block = htm.Block()
+    task_result = task.result
+    if not isinstance(task_result, results.SBOMOSVScan):
+        block.p["Invalid scan result format."]
+        return
+
+    components = task_result.components
+    ignored = task_result.ignored
+    ignored_count = len(ignored)
+
+    if not components:
+        block.p["No vulnerabilities found."]
         if ignored_count > 0:
             component_word = "component was" if (ignored_count == 1) else 
"components were"
             block.p[f"{ignored_count} {component_word} ignored due to missing 
PURL or version information:"]
             block.p[f"{','.join(ignored)}"]
-    else:
-        if len(vulns) == 0:
-            block.p["No vulnerabilities listed in this SBOM."]
-            return
-        components = {a.get("ref", "") for v in vulns if v.affects is not None 
for a in v.affects}
+        return
 
-        if len(scans) > 0:
-            block.p["This SBOM was scanned for vulnerabilities at revision ", 
htm.code[scans[-1]], "."]
+    for component in components:
+        new = _vulnerability_component_details_osv(new_block, component, 
previous_vulns)
+        total_new = total_new + new
 
-        block.p[f"Vulnerabilities found in {len(components)} components:"]
+    new_str = f" ({total_new!s} new since last release)" if (total_new > 0) 
else ""
+    block.p[f"Scan found vulnerabilities{new_str} in {len(components)} 
components:"]
 
-        for component in components:
-            _vulnerability_component_details_osv(
-                block,
-                results.OSVComponent(
-                    purl=component,
-                    vulnerabilities=[
-                        _cdx_to_osv(v)
-                        for v in vulns
-                        if (v.affects is not None) and (component in 
[a.get("ref") for a in v.affects])
-                    ],
-                ),
-            )
+    if ignored_count > 0:
+        component_word = "component was" if (ignored_count == 1) else 
"components were"
+        block.p[f"{ignored_count} {component_word} ignored due to missing PURL 
or version information:"]
+        block.p[f"{','.join(ignored)}"]
+    block.append(new_block)
 
 
 def _cdx_to_osv(cdx: osv.CdxVulnerabilityDetail) -> osv.VulnerabilityDetails:
@@ -547,8 +701,7 @@ def _vulnerability_scan_section(
     version: str,
     file_path: str,
     task_result: results.SBOMToolScore,
-    vulnerabilities: list[osv.CdxVulnerabilityDetail],
-    osv_tasks: collections.abc.Sequence[sql.Task],
+    osv_tasks: Sequence[sql.Task],
     is_release_candidate: bool,
 ) -> None:
     """Display the vulnerability scan section based on task status."""
@@ -559,9 +712,21 @@ def _vulnerability_scan_section(
     block.h2["Vulnerabilities"]
 
     scans = []
+    if task_result.vulnerabilities is not None:
+        vulnerabilities = [
+            sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for 
e in task_result.vulnerabilities
+        ]
+    else:
+        vulnerabilities = []
+    if task_result.prev_vulnerabilities is not None:
+        prev_vulnerabilities = [
+            sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for 
e in task_result.prev_vulnerabilities
+        ]
+    else:
+        prev_vulnerabilities = None
     if task_result.atr_props is not None:
         scans = [t.get("value", "") for t in task_result.atr_props if 
t.get("name", "") == "asf:atr:osv-scan"]
-    _vulnerability_scan_results(block, vulnerabilities, scans, completed_task)
+    _vulnerability_scan_results(block, vulnerabilities, scans, completed_task, 
prev_vulnerabilities)
 
     if not is_release_candidate:
         if in_progress_task is not None:
diff --git a/atr/htm.py b/atr/htm.py
index 5b64ea3..c0d925c 100644
--- a/atr/htm.py
+++ b/atr/htm.py
@@ -44,6 +44,7 @@ h1 = htpy.h1
 h2 = htpy.h2
 h3 = htpy.h3
 html = htpy.html
+i = htpy.i
 li = htpy.li
 p = htpy.p
 pre = htpy.pre
@@ -279,6 +280,10 @@ class Block:
         return BlockElementCallable(self, ul)
 
 
+def icon(name: str, classes="") -> Element:
+    return i(f".bi.bi-{name}{classes}")
+
+
 def ul_links(*items: tuple[str, str]) -> Element:
     li_items = [li[a(href=item[0])[item[1]]] for item in items]
     return ul[*li_items]
diff --git a/atr/models/results.py b/atr/models/results.py
index b8e8c13..5d3ffbc 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -128,6 +128,9 @@ class SBOMToolScore(schema.Strict):
     version_name: str = schema.description("Version name")
     revision_number: str = schema.description("Revision number")
     bom_version: int | None = schema.Field(default=None, strict=False, 
description="BOM Version scanned")
+    prev_bom_version: int | None = schema.Field(
+        default=None, strict=False, description="BOM Version from previous 
release"
+    )
     file_path: str = schema.description("Relative path to the scored SBOM 
file")
     warnings: list[str] = schema.description("Warnings from the SBOM tool")
     errors: list[str] = schema.description("Errors from the SBOM tool")
@@ -141,6 +144,12 @@ class SBOMToolScore(schema.Strict):
     vulnerabilities: list[str] | None = schema.Field(
         default=None, strict=False, description="Vulnerabilities found in the 
SBOM"
     )
+    prev_licenses: list[str] | None = schema.Field(
+        default=None, strict=False, description="Licenses from previous 
release"
+    )
+    prev_vulnerabilities: list[str] | None = schema.Field(
+        default=None, strict=False, description="Vulnerabilities from previous 
release"
+    )
     atr_props: list[dict[str, str]] | None = schema.Field(
         default=None, strict=False, description="ATR properties found in the 
SBOM"
     )
diff --git a/atr/models/schema.py b/atr/models/schema.py
index 6bcb013..f7ac483 100644
--- a/atr/models/schema.py
+++ b/atr/models/schema.py
@@ -25,7 +25,7 @@ Field = pydantic.Field
 
 
 class Lax(pydantic.BaseModel):
-    model_config = pydantic.ConfigDict(extra="allow", strict=False, 
validate_assignment=True)
+    model_config = pydantic.ConfigDict(extra="allow", strict=False, 
validate_assignment=True, validate_by_name=True)
 
 
 class Strict(pydantic.BaseModel):
diff --git a/atr/sbom/cli.py b/atr/sbom/cli.py
index 512a93c..a9cbd80 100644
--- a/atr/sbom/cli.py
+++ b/atr/sbom/cli.py
@@ -31,7 +31,7 @@ from .utilities import bundle_to_ntia_patch, 
bundle_to_vuln_patch, patch_to_data
 
 
 def command_license(bundle: models.bundle.Bundle) -> None:
-    warnings, errors = check(bundle.bom)
+    _, warnings, errors = check(bundle.bom)
     if warnings:
         print("WARNINGS (Category B):")
         for warning in warnings:
diff --git a/atr/sbom/licenses.py b/atr/sbom/licenses.py
index bfed06a..470571c 100644
--- a/atr/sbom/licenses.py
+++ b/atr/sbom/licenses.py
@@ -23,9 +23,11 @@ from .spdx import license_expression_atoms
 
 def check(
     bom_value: models.bom.Bom,
-) -> tuple[list[models.licenses.Issue], list[models.licenses.Issue]]:
+    include_all: bool = False,
+) -> tuple[list[models.licenses.Issue], list[models.licenses.Issue], 
list[models.licenses.Issue]]:
     warnings: list[models.licenses.Issue] = []
     errors: list[models.licenses.Issue] = []
+    good: list[models.licenses.Issue] = []
 
     components = bom_value.components or []
     if bom_value.metadata and bom_value.metadata.component:
@@ -99,5 +101,17 @@ def check(
                         component_type=type,
                     )
                 )
+            elif include_all:
+                good.append(
+                    models.licenses.Issue(
+                        component_name=name,
+                        component_version=version,
+                        license_expression=license_expr,
+                        category=models.licenses.Category.A,
+                        any_unknown=False,
+                        scope=scope,
+                        component_type=type,
+                    )
+                )
 
-    return warnings, errors
+    return good, warnings, errors
diff --git a/atr/sbom/models/base.py b/atr/sbom/models/base.py
index d392d17..2e34024 100644
--- a/atr/sbom/models/base.py
+++ b/atr/sbom/models/base.py
@@ -21,7 +21,7 @@ import pydantic
 
 
 class Lax(pydantic.BaseModel):
-    model_config = pydantic.ConfigDict(extra="allow", strict=False)
+    model_config = pydantic.ConfigDict(extra="allow", strict=False, 
validate_assignment=True, validate_by_name=True)
 
 
 class Strict(pydantic.BaseModel):
diff --git a/atr/static/css/atr.css b/atr/static/css/atr.css
index aee676b..bd3566d 100644
--- a/atr/static/css/atr.css
+++ b/atr/static/css/atr.css
@@ -545,3 +545,7 @@ td.atr-shrink {
 .atr-checks-files {
     line-height: 2.25;
 }
+
+.atr-text-strike {
+    text-decoration: line-through;
+}
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 2c4b985..963f765 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -69,6 +69,18 @@ async def draft_checks(
         release = await data.release(name=sql.release_name(project_name, 
release_version), _committee=True).demand(
             RuntimeError("Release not found")
         )
+        other_releases = (
+            await data.release(project_name=project_name, 
phase=sql.ReleasePhase.RELEASE)
+            .order_by(sql.Release.released)
+            .all()
+        )
+        release_versions = sorted(
+            [v for v in other_releases], key=lambda v: 
util.version_sort_key(v.version), reverse=True
+        )
+        release_version_sortable = util.version_sort_key(release_version)
+        previous_version = next(
+            (v for v in release_versions if util.version_sort_key(v.version) < 
release_version_sortable), None
+        )
         for path in relative_paths:
             path_str = str(path)
             task_function: Callable[[str, sql.Release, str, str], 
Awaitable[list[sql.Task]]] | None = None
@@ -94,6 +106,7 @@ async def draft_checks(
                             "project_name": project_name,
                             "version_name": release_version,
                             "revision_number": revision_number,
+                            "previous_release_version": 
previous_version.version if previous_version else None,
                             "file_path": path_str,
                             "asf_uid": asf_uid,
                         },
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 46ee194..0b2ea04 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -74,6 +74,10 @@ class FileArgs(schema.Strict):
     asf_uid: str | None = None
 
 
+class ScoreArgs(FileArgs):
+    previous_release_version: str | None = schema.description("Previous 
release version")
+
+
 @checks.with_model(FileArgs)
 async def augment(args: FileArgs) -> results.Results | None:
     base_dir = util.get_unfinished_dir() / args.project_name / 
args.version_name / args.revision_number
@@ -209,9 +213,12 @@ async def score_qs(args: FileArgs) -> results.Results | 
None:
     )
 
 
-@checks.with_model(FileArgs)
-async def score_tool(args: FileArgs) -> results.Results | None:
+@checks.with_model(ScoreArgs)
+async def score_tool(args: ScoreArgs) -> results.Results | None:
     base_dir = util.get_unfinished_dir() / args.project_name / 
args.version_name / args.revision_number
+    previous_base_dir = None
+    if args.previous_release_version is not None:
+        previous_base_dir = util.get_finished_dir() / args.project_name / 
args.previous_release_version
     if not os.path.isdir(base_dir):
         raise SBOMScoringError("Revision directory does not exist", 
{"base_dir": str(base_dir)})
     full_path = os.path.join(base_dir, args.file_path)
@@ -223,15 +230,35 @@ async def score_tool(args: FileArgs) -> results.Results | 
None:
     # TODO: Could update the ATR version with a constant showing last change 
to the augment/scan
     #  tools so we know if it's outdated
     outdated = sbom.tool.plugin_outdated_version(bundle.bom)
-    license_warnings, license_errors = sbom.licenses.check(bundle.bom)
+    _, license_warnings, license_errors = sbom.licenses.check(bundle.bom)
     vulnerabilities = sbom.osv.vulns_from_bundle(bundle)
     cli_errors = sbom.cyclonedx.validate_cli(bundle)
+
+    prev_version = None
+    prev_licenses = None
+    prev_vulnerabilities = None
+    if previous_base_dir is not None:
+        previous_full_path = os.path.join(previous_base_dir, args.file_path)
+        try:
+            previous_bundle = 
sbom.utilities.path_to_bundle(pathlib.Path(previous_full_path))
+        except FileNotFoundError:
+            # Previous release didn't include this file
+            previous_bundle = None
+        if previous_bundle is not None:
+            prev_version, _ = 
sbom.utilities.get_props_from_bundle(previous_bundle)
+            prev_good, prev_license_warnings, prev_license_errors = 
sbom.licenses.check(
+                previous_bundle.bom, include_all=True
+            )
+            prev_licenses = [*prev_good, *prev_license_warnings, 
*prev_license_errors]
+            prev_vulnerabilities = sbom.osv.vulns_from_bundle(previous_bundle)
+
     return results.SBOMToolScore(
         kind="sbom_tool_score",
         project_name=args.project_name,
         version_name=args.version_name,
         revision_number=args.revision_number,
         bom_version=version,
+        prev_bom_version=prev_version,
         file_path=args.file_path,
         warnings=[w.model_dump_json() for w in warnings],
         errors=[e.model_dump_json() for e in errors],
@@ -239,6 +266,8 @@ async def score_tool(args: FileArgs) -> results.Results | 
None:
         license_warnings=[w.model_dump_json() for w in license_warnings] if 
license_warnings else None,
         license_errors=[e.model_dump_json() for e in license_errors] if 
license_errors else None,
         vulnerabilities=[v.model_dump_json() for v in vulnerabilities],
+        prev_licenses=[w.model_dump_json() for w in prev_licenses] if 
prev_licenses else None,
+        prev_vulnerabilities=[v.model_dump_json() for v in 
prev_vulnerabilities] if prev_vulnerabilities else None,
         atr_props=properties,
         cli_errors=cli_errors,
     )
diff --git a/atr/util.py b/atr/util.py
index 42ebea8..0838124 100644
--- a/atr/util.py
+++ b/atr/util.py
@@ -1011,6 +1011,53 @@ def version_name_error(version_name: str) -> str | None:
     return None
 
 
+def version_sort_key(version: str) -> bytes:
+    """
+    Convert a version string into a sortable byte sequence.
+    Prefixes each digit sequence with its length as u16 big-endian.
+    Strips leading zeros and appends a byte for the count of leading zeros.
+    """
+    result = []
+    i = 0
+    length = len(version)
+    while i < length:
+        if version[i].isdigit():
+            # Find the end of this digit sequence
+            j = i
+            while j < length and version[j].isdigit():
+                j += 1
+
+            digit_sequence = version[i:j]
+
+            # Count leading zeros
+            leading_zeros = 0
+            for char in digit_sequence:
+                if char == "0":
+                    leading_zeros += 1
+                else:
+                    break
+
+            # Strip leading zeros (an all-zero sequence becomes empty; the zero count is appended below)
+            stripped = digit_sequence.lstrip("0")
+
+            # Count the stripped digits and encode as u16 big-endian (the int.to_bytes default)
+            digit_count = len(stripped)
+            length_bytes = digit_count.to_bytes(2)
+
+            # Add length prefix + stripped digits + leading zero count
+            result.extend(length_bytes)
+            result.extend(stripped.encode("utf-8"))
+            result.append(leading_zeros)
+
+            i = j
+        else:
+            # Non-digit character, just add it
+            result.extend(version[i].encode("utf-8"))
+            i += 1
+
+    return bytes(result)
+
+
 async def _create_hard_link_clone_checks(
     source_dir: pathlib.Path,
     dest_dir: pathlib.Path,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to