This is an automated email from the ASF dual-hosted git repository.
arm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 23b3bc5 Pull previous SBOM results into the report and highlight
new/changed vulnerabilities and licenses.
23b3bc5 is described below
commit 23b3bc5adce730835e0b7d218e14d7e90db13e0e
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Dec 23 17:46:03 2025 +0000
Pull previous SBOM results into the report and highlight new/changed
vulnerabilities and licenses.
---
atr/get/sbom.py | 441 +++++++++++++++++++++++++++++++++---------------
atr/htm.py | 5 +
atr/models/results.py | 9 +
atr/models/schema.py | 2 +-
atr/sbom/cli.py | 2 +-
atr/sbom/licenses.py | 18 +-
atr/sbom/models/base.py | 2 +-
atr/static/css/atr.css | 4 +
atr/tasks/__init__.py | 13 ++
atr/tasks/sbom.py | 35 +++-
atr/util.py | 47 ++++++
11 files changed, 432 insertions(+), 146 deletions(-)
diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index 783f238..18dc4c6 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -18,7 +18,7 @@
from __future__ import annotations
import json
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Literal
import asfquart.base as base
import cmarkgfm
@@ -27,6 +27,8 @@ import markupsafe
import atr.blueprints.get as get
import atr.db as db
import atr.form as form
+import atr.get.compose as compose
+import atr.get.vote as vote
import atr.htm as htm
import atr.models.results as results
import atr.models.sql as sql
@@ -34,38 +36,104 @@ import atr.sbom as sbom
import atr.sbom.models.osv as osv
import atr.shared as shared
import atr.template as template
+import atr.util as util
import atr.web as web
if TYPE_CHECKING:
- import collections.abc
+ from collections.abc import Sequence
@get.committer("/sbom/report/<project>/<version>/<path:file_path>")
async def report(session: web.Committer, project: str, version: str,
file_path: str) -> str:
await session.check_access(project)
- validated_path = form.to_relpath(file_path)
- if validated_path is None:
- raise base.ASFQuartException("Invalid file path", errorcode=400)
- validated_path_str = str(validated_path)
-
# If the draft is not found, we try to get the release candidate
try:
release = await session.release(project, version, with_committee=True)
except base.ASFQuartException:
release = await session.release(project, version,
phase=sql.ReleasePhase.RELEASE_CANDIDATE, with_committee=True)
- is_release_candidate = release.phase == sql.ReleasePhase.RELEASE_CANDIDATE
+ block = htm.Block()
+
+ is_release_candidate = False
+ back_url = ""
+ back_anchor = ""
+ phase: Literal["COMPOSE", "VOTE"] = "COMPOSE"
+ match release.phase:
+ case sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT:
+ back_url = util.as_url(compose.selected,
project_name=release.project.name, version_name=release.version)
+ back_anchor = f"Compose {release.project.short_display_name}
{release.version}"
+ phase = "COMPOSE"
+ case sql.ReleasePhase.RELEASE_CANDIDATE:
+ is_release_candidate = True
+ back_url = util.as_url(vote.selected,
project_name=release.project.name, version_name=release.version)
+ back_anchor = f"Vote on {release.project.short_display_name}
{release.version}"
+ phase = "VOTE"
+
+ shared.distribution.html_nav(
+ block,
+ back_url=back_url,
+ back_anchor=back_anchor,
+ phase=phase,
+ )
+
+ block.h1["SBOM report"]
+
+ validated_path = form.to_relpath(file_path)
+ if validated_path is None:
+ raise base.ASFQuartException("Invalid file path", errorcode=400)
+ validated_path_str = str(validated_path)
+
+ task, augment_tasks, osv_tasks = await _fetch_tasks(validated_path_str,
project, release, version)
+
+ task_status = await _report_task_results(block, task)
+ if task_status:
+ return task_status
+
+ if task is None or (not isinstance(task.result, results.SBOMToolScore)):
+ raise base.ASFQuartException("Invalid SBOM score result",
errorcode=500)
+
+ task_result = task.result
+ _report_header(block, is_release_candidate, release, task_result)
+
+ if not is_release_candidate:
+ latest_augment = None
+ last_augmented_bom = None
+ if len(augment_tasks) > 0:
+ latest_augment = augment_tasks[0]
+ augment_results: list[Any] = [t.result for t in augment_tasks]
+ augmented_bom_versions = [
+ r.bom_version for r in augment_results if (r is not None) and
(r.bom_version is not None)
+ ]
+ if len(augmented_bom_versions) > 0:
+ last_augmented_bom = max(augmented_bom_versions)
+ _augment_section(block, release, task_result, latest_augment,
last_augmented_bom)
+
+ _conformance_section(block, task_result)
+ _license_section(block, task_result)
+
+ _vulnerability_scan_section(block, project, version, file_path,
task_result, osv_tasks, is_release_candidate)
+
+ _outdated_tool_section(block, task_result)
+
+ _cyclonedx_cli_errors(block, task_result)
+
+ return await template.blank("SBOM report", content=block.collect())
+
+
+async def _fetch_tasks(
+ file_path: str, project: str, release: sql.Release, version: str
+) -> tuple[sql.Task | None, Sequence[sql.Task], Sequence[sql.Task]]:
+ # TODO: Abstract this code and the sbomtool.MissingAdapter validators
async with db.session() as data:
via = sql.validate_instrumented_attribute
- # TODO: Abstract this code and the sbomtool.MissingAdapter validators
tasks = (
await data.task(
project_name=project,
version_name=version,
revision_number=release.latest_revision_number,
task_type=sql.TaskType.SBOM_TOOL_SCORE,
- primary_rel_path=validated_path_str,
+ primary_rel_path=file_path,
)
.order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
.all()
@@ -75,7 +143,7 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
project_name=project,
version_name=version,
task_type=sql.TaskType.SBOM_AUGMENT,
- primary_rel_path=validated_path_str,
+ primary_rel_path=file_path,
)
.order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
.all()
@@ -86,56 +154,13 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
project_name=project,
version_name=version,
task_type=sql.TaskType.SBOM_OSV_SCAN,
- primary_rel_path=validated_path_str,
+ primary_rel_path=file_path,
revision_number=release.latest_revision_number,
)
.order_by(sql.sqlmodel.desc(via(sql.Task.added)))
.all()
)
-
- block = htm.Block()
- block.h1["SBOM report"]
-
- await _report_task_results(block, list(tasks))
-
- task_result = tasks[0].result
- if not isinstance(task_result, results.SBOMToolScore):
- raise base.ASFQuartException("Invalid SBOM score result",
errorcode=500)
-
- _report_header(block, is_release_candidate, release, task_result)
-
- if not is_release_candidate:
- latest_augment = None
- last_augmented_bom = None
- if len(augment_tasks) > 0:
- latest_augment = augment_tasks[0]
- augment_results: list[Any] = [t.result for t in augment_tasks]
- augmented_bom_versions = [
- r.bom_version for r in augment_results if (r is not None) and
(r.bom_version is not None)
- ]
- if len(augmented_bom_versions) > 0:
- last_augmented_bom = max(augmented_bom_versions)
- _augment_section(block, release, task_result, latest_augment,
last_augmented_bom)
-
- _conformance_section(block, task_result)
- _license_section(block, task_result)
-
- if task_result.vulnerabilities is not None:
- vulnerabilities = [
- sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for
e in task_result.vulnerabilities
- ]
- else:
- vulnerabilities = []
-
- _vulnerability_scan_section(
- block, project, version, validated_path_str, task_result,
vulnerabilities, osv_tasks, is_release_candidate
- )
-
- _outdated_tool_section(block, task_result)
-
- _cyclonedx_cli_errors(block, task_result)
-
- return await template.blank("SBOM report", content=block.collect())
+ return tasks[0] if len(tasks) > 0 else None, augment_tasks, osv_tasks
def _outdated_tool_section(block: htm.Block, task_result:
results.SBOMToolScore):
@@ -183,18 +208,18 @@ def _outdated_tool_section(block: htm.Block, task_result:
results.SBOMToolScore)
def _conformance_section(block: htm.Block, task_result: results.SBOMToolScore)
-> None:
+ block.h2["Conformance report"]
warnings =
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in
task_result.warnings]
errors =
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in
task_result.errors]
if warnings:
- block.h2["Warnings"]
+ block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"),
"Warnings"]
_missing_table(block, warnings)
if errors:
- block.h2["Errors"]
+ block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
_missing_table(block, errors)
if not (warnings or errors):
- block.h2["Conformance report"]
block.p["No NTIA 2021 minimum data field conformance warnings or
errors found."]
@@ -202,22 +227,30 @@ def _license_section(block: htm.Block, task_result:
results.SBOMToolScore) -> No
block.h2["Licenses"]
warnings = []
errors = []
+ prev_licenses = None
+ if task_result.prev_licenses is not None:
+ prev_licenses = _load_license_issues(task_result.prev_licenses)
if task_result.license_warnings is not None:
- warnings = [sbom.models.licenses.Issue.model_validate(json.loads(w))
for w in task_result.license_warnings]
+ warnings = _load_license_issues(task_result.license_warnings)
if task_result.license_errors is not None:
- errors = [sbom.models.licenses.Issue.model_validate(json.loads(e)) for
e in task_result.license_errors]
+ errors = _load_license_issues(task_result.license_errors)
+ # TODO: Rework the rendering of these since category in the table is
redundant.
if warnings:
- block.h3["Warnings"]
- _license_table(block, warnings)
+ block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"),
"Warnings"]
+ _license_table(block, warnings, prev_licenses)
if errors:
- block.h3["Errors"]
- _license_table(block, errors)
+ block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
+ _license_table(block, errors, prev_licenses)
if not (warnings or errors):
block.p["No license warnings or errors found."]
+def _load_license_issues(issues: list[str]) ->
list[sbom.models.licenses.Issue]:
+ return [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in
issues]
+
+
def _report_header(
block: htm.Block, is_release_candidate: bool, release: sql.Release,
task_result: results.SBOMToolScore
) -> None:
@@ -234,14 +267,14 @@ def _report_header(
block.p[f"This report is for the latest {release.version} release
candidate."]
-async def _report_task_results(block: htm.Block, tasks: list[sql.Task]):
- if not tasks:
+async def _report_task_results(block: htm.Block, task: sql.Task | None):
+ if task is None:
block.p["No SBOM score found."]
return await template.blank("SBOM report", content=block.collect())
- task_status = tasks[0].status
- task_error = tasks[0].error
- if task_status == sql.TaskStatus.QUEUED:
+ task_status = task.status
+ task_error = task.error
+ if task_status == sql.TaskStatus.QUEUED or task_status ==
sql.TaskStatus.ACTIVE:
block.p["SBOM score is being computed."]
return await template.blank("SBOM report", content=block.collect())
@@ -319,7 +352,11 @@ def _extract_vulnerability_severity(vuln:
osv.VulnerabilityDetails) -> str:
return "Unknown"
-def _license_table(block: htm.Block, items: list[sbom.models.licenses.Issue])
-> None:
+def _license_table(
+ block: htm.Block,
+ items: list[sbom.models.licenses.Issue],
+ prev: list[sbom.models.licenses.Issue] | None,
+) -> None:
warning_rows = [
htm.tr[
htm.td[
@@ -327,9 +364,9 @@ def _license_table(block: htm.Block, items:
list[sbom.models.licenses.Issue]) ->
if (len(components) == 0)
else htm.details[htm.summary[f"Category {category!s}"],
htm.div[_detail_table(components)]]
],
- htm.td[str(count)],
+ htm.td[f"{count!s} {f'({new!s} new, {updated!s} changed)' if new
or updated else ''}"],
]
- for category, count, components in _license_tally(items)
+ for category, count, new, updated, components in _license_tally(items,
prev)
]
block.table(".table.table-sm.table-bordered.table-striped")[
htm.thead[htm.tr[htm.th["License Category"], htm.th["Count"]]],
@@ -358,7 +395,7 @@ def _missing_table(block: htm.Block, items:
list[sbom.models.conformance.Missing
def _detail_table(components: list[str | None]):
return htm.table(".table.table-sm.table-bordered.table-striped")[
- htm.tbody[[htm.tr[htm.td[comp.capitalize()]] for comp in components if
comp is not None]],
+ htm.tbody[[htm.tr[htm.td[comp]] for comp in components if comp is not
None]],
]
@@ -378,33 +415,77 @@ def _missing_tally(items:
list[sbom.models.conformance.Missing]) -> list[tuple[s
)
+# TODO: Update this to return either a block or something we can use later in
a block for styling reasons
def _license_tally(
items: list[sbom.models.licenses.Issue],
-) -> list[tuple[sbom.models.licenses.Category, int, list[str | None]]]:
+ old_issues: list[sbom.models.licenses.Issue] | None,
+) -> list[tuple[sbom.models.licenses.Category, int, int | None, int | None,
list[str | None]]]:
counts: dict[sbom.models.licenses.Category, int] = {}
components: dict[sbom.models.licenses.Category, list[str | None]] = {}
+ new_count = 0
+ updated_count = 0
+ old_map = {lic.component_name: (lic.license_expression, lic.category) for
lic in old_issues} if old_issues else None
for item in items:
key = item.category
counts[key] = counts.get(key, 0) + 1
+ name = str(item).capitalize()
+ if old_map is not None:
+ if item.component_name not in old_map:
+ new_count = new_count + 1
+ name = f"{name} (new)"
+ elif item.license_expression != old_map[item.component_name][0]:
+ updated_count = updated_count + 1
+ name = f"{name} (previously {old_map[item.component_name][0]}
- Category {
+ str(old_map[item.component_name][1]).upper()
+ })"
if key not in components:
- components[key] = [str(item)]
+ components[key] = [name]
else:
- components[key].append(str(item))
+ components[key].append(name)
return sorted(
- [(category, count, components.get(category, [])) for category, count
in counts.items()],
+ [
+ (
+ category,
+ count,
+ new_count if old_issues is not None else None,
+ updated_count if old_issues is not None else None,
+ components.get(category, []),
+ )
+ for category, count in counts.items()
+ ],
key=lambda kv: kv[0].value,
)
-def _vulnerability_component_details_osv(block: htm.Block, component:
results.OSVComponent) -> None:
- details_content = []
- summary_element = htm.summary[
-
htm.span(".badge.bg-danger.me-2.font-monospace")[str(len(component.vulnerabilities))],
- htm.strong[component.purl],
- ]
- details_content.append(summary_element)
-
+def _severity_to_style(severity: str) -> str:
+ match severity.lower():
+ case "critical":
+ return ".bg-danger.text-light"
+ case "high":
+ return ".bg-danger.text-light"
+ case "medium":
+ return ".bg-warning.text-dark"
+ case "moderate":
+ return ".bg-warning.text-dark"
+ case "low":
+ return ".bg-warning.text-dark"
+ case "info":
+ return ".bg-info.text-light"
+ return ".bg-info.text-light"
+
+
+def _vulnerability_component_details_osv(
+ block: htm.Block,
+ component: results.OSVComponent,
+ previous_vulns: dict[str, tuple[str, list[str]]] | None, # id: severity
+) -> int:
+ severities = ["critical", "high", "medium", "moderate", "low", "info",
"none", "unknown"]
+ new = 0
+ worst = 99
+
+ vuln_details = []
for vuln in component.vulnerabilities:
+ is_new = False
vuln_id = vuln.id or "Unknown"
vuln_summary = vuln.summary
vuln_refs = []
@@ -412,11 +493,34 @@ def _vulnerability_component_details_osv(block:
htm.Block, component: results.OS
vuln_refs = [r for r in vuln.references if r.get("type", "") ==
"WEB"]
vuln_primary_ref = vuln_refs[0] if (len(vuln_refs) > 0) else {}
vuln_modified = vuln.modified or "Unknown"
+
vuln_severity = _extract_vulnerability_severity(vuln)
+ worst = _update_worst_severity(severities, vuln_severity, worst)
+
+ if previous_vulns is not None:
+ if (
+ (vuln_id not in previous_vulns)
+ or previous_vulns[vuln_id][0] != vuln_severity
+ or (component.purl not in previous_vulns[vuln_id][1])
+ ):
+ is_new = True
+ new = new + 1
vuln_header = [htm.a(href=vuln_primary_ref.get("url", ""),
target="_blank")[htm.strong(".me-2")[vuln_id]]]
- if vuln_severity != "Unknown":
-
vuln_header.append(htm.span(".badge.bg-warning.text-dark")[vuln_severity])
+ style = f".badge.me-2{_severity_to_style(vuln_severity)}"
+ vuln_header.append(htm.span(style)[vuln_severity])
+
+ if (previous_vulns is not None) and is_new:
+ if (vuln_id in previous_vulns) and (component.purl in
previous_vulns[vuln_id][1]):
+ # If it's there, the sev must have changed
+ vuln_header.append(htm.icon("arrow-left", ".me-2"))
+ vuln_header.append(
+
htm.span(f".badge{_severity_to_style(previous_vulns[vuln_id][0])}.atr-text-strike")[
+ previous_vulns[vuln_id][0]
+ ]
+ )
+ else:
+
vuln_header.append(htm.span(".badge.bg-info.text-light")["new"])
details =
markupsafe.Markup(cmarkgfm.github_flavored_markdown_to_html(vuln.details))
vuln_div =
htm.div(".ms-3.mb-3.border-start.border-warning.border-3.ps-3")[
@@ -428,13 +532,31 @@ def _vulnerability_component_details_osv(block:
htm.Block, component: results.OS
],
htm.div(".mt-2.text-muted")[details or "No additional details
available."],
]
- details_content.append(vuln_div)
-
+ vuln_details.append(vuln_div)
+
+ badge_style = ""
+ if worst < len(severities):
+ badge_style = _severity_to_style(severities[worst])
+ summary_elements =
[htm.span(f".badge{badge_style}.me-2.font-monospace")[str(len(component.vulnerabilities))]]
+ if new > 0:
+ summary_elements.append(htm.span(".badge.me-2.bg-info")[f"{new!s}
new"])
+ summary_elements.append(htm.strong[component.purl])
+ details_content = [htm.summary[*summary_elements], *vuln_details]
block.append(htm.details(".mb-3.rounded")[*details_content])
+ return new
+
+
+def _update_worst_severity(severities: list[str], vuln_severity: str, worst:
int) -> int:
+ try:
+ sev_index = severities.index(vuln_severity)
+ except ValueError:
+ sev_index = 99
+ worst = min(worst, sev_index)
+ return worst
def _vulnerability_scan_button(block: htm.Block) -> None:
- block.p["No new vulnerability scan has been performed for this revision."]
+ block.p["You can perform a new vulnerability scan."]
form.render_block(
block,
@@ -444,9 +566,7 @@ def _vulnerability_scan_button(block: htm.Block) -> None:
)
-def _vulnerability_scan_find_completed_task(
- osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
-) -> sql.Task | None:
+def _vulnerability_scan_find_completed_task(osv_tasks: Sequence[sql.Task],
revision_number: str) -> sql.Task | None:
"""Find the most recent completed OSV scan task for the given revision."""
for task in osv_tasks:
if (task.status == sql.TaskStatus.COMPLETED) and (task.result is not
None):
@@ -456,9 +576,7 @@ def _vulnerability_scan_find_completed_task(
return None
-def _vulnerability_scan_find_in_progress_task(
- osv_tasks: collections.abc.Sequence[sql.Task], revision_number: str
-) -> sql.Task | None:
+def _vulnerability_scan_find_in_progress_task(osv_tasks: Sequence[sql.Task],
revision_number: str) -> sql.Task | None:
"""Find the most recent in-progress OSV scan task for the given
revision."""
for task in osv_tasks:
if task.revision_number == revision_number:
@@ -468,58 +586,94 @@ def _vulnerability_scan_find_in_progress_task(
def _vulnerability_scan_results(
- block: htm.Block, vulns: list[osv.CdxVulnerabilityDetail], scans:
list[str], task: sql.Task | None
+ block: htm.Block,
+ vulns: list[osv.CdxVulnerabilityDetail],
+ scans: list[str],
+ task: sql.Task | None,
+ prev: list[osv.CdxVulnerabilityDetail] | None,
) -> None:
+ previous_vulns = None
+ if prev is not None:
+ previous_osv = [
+ (_cdx_to_osv(v), [a.get("ref", "") for a in v.affects] if
v.affects is not None else []) for v in prev
+ ]
+ previous_vulns = {v.id: (_extract_vulnerability_severity(v), a) for v,
a in previous_osv}
if task is not None:
- task_result = task.result
- if not isinstance(task_result, results.SBOMOSVScan):
- block.p["Invalid scan result format."]
- return
+ _vulnerability_results_from_scan(task, block, previous_vulns)
+ else:
+ _vulnerability_results_from_bom(vulns, block, scans, previous_vulns)
- components = task_result.components
- ignored = task_result.ignored
- ignored_count = len(ignored)
- if not components:
- block.p["No vulnerabilities found."]
- if ignored_count > 0:
- component_word = "component was" if (ignored_count == 1) else
"components were"
- block.p[f"{ignored_count} {component_word} ignored due to
missing PURL or version information:"]
- block.p[f"{','.join(ignored)}"]
- return
+def _vulnerability_results_from_bom(
+ vulns: list[osv.CdxVulnerabilityDetail],
+ block: htm.Block,
+ scans: list[str],
+ previous_vulns: dict[str, tuple[str, list[str]]] | None,
+) -> None:
+ total_new = 0
+ new_block = htm.Block()
+ if len(vulns) == 0:
+ block.p["No vulnerabilities listed in this SBOM."]
+ return
+ components = {a.get("ref", "") for v in vulns if v.affects is not None for
a in v.affects}
+
+ if len(scans) > 0:
+ block.p["This SBOM was scanned for vulnerabilities at revision ",
htm.code[scans[-1]], "."]
+
+ for component in components:
+ new = _vulnerability_component_details_osv(
+ new_block,
+ results.OSVComponent(
+ purl=component,
+ vulnerabilities=[
+ _cdx_to_osv(v)
+ for v in vulns
+ if (v.affects is not None) and (component in [a.get("ref")
for a in v.affects])
+ ],
+ ),
+ previous_vulns,
+ )
+ total_new = total_new + new
- block.p[f"Scan found vulnerabilities in {len(components)} components:"]
+ new_str = f" ({total_new!s} new since last release)" if (total_new > 0)
else ""
+ block.p[f"Vulnerabilities{new_str} found in {len(components)} components:"]
+ block.append(new_block)
- for component in components:
- _vulnerability_component_details_osv(block, component)
+def _vulnerability_results_from_scan(
+ task: sql.Task, block: htm.Block, previous_vulns: dict[str, tuple[str,
list[str]]] | None
+) -> None:
+ total_new = 0
+ new_block = htm.Block()
+ task_result = task.result
+ if not isinstance(task_result, results.SBOMOSVScan):
+ block.p["Invalid scan result format."]
+ return
+
+ components = task_result.components
+ ignored = task_result.ignored
+ ignored_count = len(ignored)
+
+ if not components:
+ block.p["No vulnerabilities found."]
if ignored_count > 0:
component_word = "component was" if (ignored_count == 1) else
"components were"
block.p[f"{ignored_count} {component_word} ignored due to missing
PURL or version information:"]
block.p[f"{','.join(ignored)}"]
- else:
- if len(vulns) == 0:
- block.p["No vulnerabilities listed in this SBOM."]
- return
- components = {a.get("ref", "") for v in vulns if v.affects is not None
for a in v.affects}
+ return
- if len(scans) > 0:
- block.p["This SBOM was scanned for vulnerabilities at revision ",
htm.code[scans[-1]], "."]
+ for component in components:
+ new = _vulnerability_component_details_osv(new_block, component,
previous_vulns)
+ total_new = total_new + new
- block.p[f"Vulnerabilities found in {len(components)} components:"]
+ new_str = f" ({total_new!s} new since last release)" if (total_new > 0)
else ""
+ block.p[f"Scan found vulnerabilities{new_str} in {len(components)}
components:"]
- for component in components:
- _vulnerability_component_details_osv(
- block,
- results.OSVComponent(
- purl=component,
- vulnerabilities=[
- _cdx_to_osv(v)
- for v in vulns
- if (v.affects is not None) and (component in
[a.get("ref") for a in v.affects])
- ],
- ),
- )
+ if ignored_count > 0:
+ component_word = "component was" if (ignored_count == 1) else
"components were"
+ block.p[f"{ignored_count} {component_word} ignored due to missing PURL
or version information:"]
+ block.p[f"{','.join(ignored)}"]
+ block.append(new_block)
def _cdx_to_osv(cdx: osv.CdxVulnerabilityDetail) -> osv.VulnerabilityDetails:
@@ -547,8 +701,7 @@ def _vulnerability_scan_section(
version: str,
file_path: str,
task_result: results.SBOMToolScore,
- vulnerabilities: list[osv.CdxVulnerabilityDetail],
- osv_tasks: collections.abc.Sequence[sql.Task],
+ osv_tasks: Sequence[sql.Task],
is_release_candidate: bool,
) -> None:
"""Display the vulnerability scan section based on task status."""
@@ -559,9 +712,21 @@ def _vulnerability_scan_section(
block.h2["Vulnerabilities"]
scans = []
+ if task_result.vulnerabilities is not None:
+ vulnerabilities = [
+ sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for
e in task_result.vulnerabilities
+ ]
+ else:
+ vulnerabilities = []
+ if task_result.prev_vulnerabilities is not None:
+ prev_vulnerabilities = [
+ sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for
e in task_result.prev_vulnerabilities
+ ]
+ else:
+ prev_vulnerabilities = None
if task_result.atr_props is not None:
scans = [t.get("value", "") for t in task_result.atr_props if
t.get("name", "") == "asf:atr:osv-scan"]
- _vulnerability_scan_results(block, vulnerabilities, scans, completed_task)
+ _vulnerability_scan_results(block, vulnerabilities, scans, completed_task,
prev_vulnerabilities)
if not is_release_candidate:
if in_progress_task is not None:
diff --git a/atr/htm.py b/atr/htm.py
index 5b64ea3..c0d925c 100644
--- a/atr/htm.py
+++ b/atr/htm.py
@@ -44,6 +44,7 @@ h1 = htpy.h1
h2 = htpy.h2
h3 = htpy.h3
html = htpy.html
+i = htpy.i
li = htpy.li
p = htpy.p
pre = htpy.pre
@@ -279,6 +280,10 @@ class Block:
return BlockElementCallable(self, ul)
+def icon(name: str, classes="") -> Element:
+ return i(f".bi.bi-{name}{classes}")
+
+
def ul_links(*items: tuple[str, str]) -> Element:
li_items = [li[a(href=item[0])[item[1]]] for item in items]
return ul[*li_items]
diff --git a/atr/models/results.py b/atr/models/results.py
index b8e8c13..5d3ffbc 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -128,6 +128,9 @@ class SBOMToolScore(schema.Strict):
version_name: str = schema.description("Version name")
revision_number: str = schema.description("Revision number")
bom_version: int | None = schema.Field(default=None, strict=False,
description="BOM Version scanned")
+ prev_bom_version: int | None = schema.Field(
+ default=None, strict=False, description="BOM Version from previous
release"
+ )
file_path: str = schema.description("Relative path to the scored SBOM
file")
warnings: list[str] = schema.description("Warnings from the SBOM tool")
errors: list[str] = schema.description("Errors from the SBOM tool")
@@ -141,6 +144,12 @@ class SBOMToolScore(schema.Strict):
vulnerabilities: list[str] | None = schema.Field(
default=None, strict=False, description="Vulnerabilities found in the
SBOM"
)
+ prev_licenses: list[str] | None = schema.Field(
+ default=None, strict=False, description="Licenses from previous
release"
+ )
+ prev_vulnerabilities: list[str] | None = schema.Field(
+ default=None, strict=False, description="Vulnerabilities from previous
release"
+ )
atr_props: list[dict[str, str]] | None = schema.Field(
default=None, strict=False, description="ATR properties found in the
SBOM"
)
diff --git a/atr/models/schema.py b/atr/models/schema.py
index 6bcb013..f7ac483 100644
--- a/atr/models/schema.py
+++ b/atr/models/schema.py
@@ -25,7 +25,7 @@ Field = pydantic.Field
class Lax(pydantic.BaseModel):
- model_config = pydantic.ConfigDict(extra="allow", strict=False,
validate_assignment=True)
+ model_config = pydantic.ConfigDict(extra="allow", strict=False,
validate_assignment=True, validate_by_name=True)
class Strict(pydantic.BaseModel):
diff --git a/atr/sbom/cli.py b/atr/sbom/cli.py
index 512a93c..a9cbd80 100644
--- a/atr/sbom/cli.py
+++ b/atr/sbom/cli.py
@@ -31,7 +31,7 @@ from .utilities import bundle_to_ntia_patch,
bundle_to_vuln_patch, patch_to_data
def command_license(bundle: models.bundle.Bundle) -> None:
- warnings, errors = check(bundle.bom)
+ _, warnings, errors = check(bundle.bom)
if warnings:
print("WARNINGS (Category B):")
for warning in warnings:
diff --git a/atr/sbom/licenses.py b/atr/sbom/licenses.py
index bfed06a..470571c 100644
--- a/atr/sbom/licenses.py
+++ b/atr/sbom/licenses.py
@@ -23,9 +23,11 @@ from .spdx import license_expression_atoms
def check(
bom_value: models.bom.Bom,
-) -> tuple[list[models.licenses.Issue], list[models.licenses.Issue]]:
+ include_all: bool = False,
+) -> tuple[list[models.licenses.Issue], list[models.licenses.Issue],
list[models.licenses.Issue]]:
warnings: list[models.licenses.Issue] = []
errors: list[models.licenses.Issue] = []
+ good: list[models.licenses.Issue] = []
components = bom_value.components or []
if bom_value.metadata and bom_value.metadata.component:
@@ -99,5 +101,17 @@ def check(
component_type=type,
)
)
+ elif include_all:
+ good.append(
+ models.licenses.Issue(
+ component_name=name,
+ component_version=version,
+ license_expression=license_expr,
+ category=models.licenses.Category.A,
+ any_unknown=False,
+ scope=scope,
+ component_type=type,
+ )
+ )
- return warnings, errors
+ return good, warnings, errors
diff --git a/atr/sbom/models/base.py b/atr/sbom/models/base.py
index d392d17..2e34024 100644
--- a/atr/sbom/models/base.py
+++ b/atr/sbom/models/base.py
@@ -21,7 +21,7 @@ import pydantic
class Lax(pydantic.BaseModel):
- model_config = pydantic.ConfigDict(extra="allow", strict=False)
+ model_config = pydantic.ConfigDict(extra="allow", strict=False,
validate_assignment=True, validate_by_name=True)
class Strict(pydantic.BaseModel):
diff --git a/atr/static/css/atr.css b/atr/static/css/atr.css
index aee676b..bd3566d 100644
--- a/atr/static/css/atr.css
+++ b/atr/static/css/atr.css
@@ -545,3 +545,7 @@ td.atr-shrink {
.atr-checks-files {
line-height: 2.25;
}
+
+.atr-text-strike {
+ text-decoration: line-through;
+}
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 2c4b985..963f765 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -69,6 +69,18 @@ async def draft_checks(
release = await data.release(name=sql.release_name(project_name,
release_version), _committee=True).demand(
RuntimeError("Release not found")
)
+ other_releases = (
+ await data.release(project_name=project_name,
phase=sql.ReleasePhase.RELEASE)
+ .order_by(sql.Release.released)
+ .all()
+ )
+ release_versions = sorted(
+ [v for v in other_releases], key=lambda v:
util.version_sort_key(v.version), reverse=True
+ )
+ release_version_sortable = util.version_sort_key(release_version)
+ previous_version = next(
+ (v for v in release_versions if util.version_sort_key(v.version) <
release_version_sortable), None
+ )
for path in relative_paths:
path_str = str(path)
task_function: Callable[[str, sql.Release, str, str],
Awaitable[list[sql.Task]]] | None = None
@@ -94,6 +106,7 @@ async def draft_checks(
"project_name": project_name,
"version_name": release_version,
"revision_number": revision_number,
+ "previous_release_version":
previous_version.version if previous_version else None,
"file_path": path_str,
"asf_uid": asf_uid,
},
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 46ee194..0b2ea04 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -74,6 +74,10 @@ class FileArgs(schema.Strict):
asf_uid: str | None = None
+class ScoreArgs(FileArgs):
+ previous_release_version: str | None = schema.description("Previous
release version")
+
+
@checks.with_model(FileArgs)
async def augment(args: FileArgs) -> results.Results | None:
base_dir = util.get_unfinished_dir() / args.project_name /
args.version_name / args.revision_number
@@ -209,9 +213,12 @@ async def score_qs(args: FileArgs) -> results.Results |
None:
)
[email protected]_model(FileArgs)
-async def score_tool(args: FileArgs) -> results.Results | None:
[email protected]_model(ScoreArgs)
+async def score_tool(args: ScoreArgs) -> results.Results | None:
base_dir = util.get_unfinished_dir() / args.project_name /
args.version_name / args.revision_number
+ previous_base_dir = None
+ if args.previous_release_version is not None:
+ previous_base_dir = util.get_finished_dir() / args.project_name /
args.previous_release_version
if not os.path.isdir(base_dir):
raise SBOMScoringError("Revision directory does not exist",
{"base_dir": str(base_dir)})
full_path = os.path.join(base_dir, args.file_path)
@@ -223,15 +230,35 @@ async def score_tool(args: FileArgs) -> results.Results |
None:
# TODO: Could update the ATR version with a constant showing last change
to the augment/scan
# tools so we know if it's outdated
outdated = sbom.tool.plugin_outdated_version(bundle.bom)
- license_warnings, license_errors = sbom.licenses.check(bundle.bom)
+ _, license_warnings, license_errors = sbom.licenses.check(bundle.bom)
vulnerabilities = sbom.osv.vulns_from_bundle(bundle)
cli_errors = sbom.cyclonedx.validate_cli(bundle)
+
+ prev_version = None
+ prev_licenses = None
+ prev_vulnerabilities = None
+ if previous_base_dir is not None:
+ previous_full_path = os.path.join(previous_base_dir, args.file_path)
+ try:
+ previous_bundle =
sbom.utilities.path_to_bundle(pathlib.Path(previous_full_path))
+ except FileNotFoundError:
+ # Previous release didn't include this file
+ previous_bundle = None
+ if previous_bundle is not None:
+ prev_version, _ =
sbom.utilities.get_props_from_bundle(previous_bundle)
+ prev_good, prev_license_warnings, prev_license_errors =
sbom.licenses.check(
+ previous_bundle.bom, include_all=True
+ )
+ prev_licenses = [*prev_good, *prev_license_warnings,
*prev_license_errors]
+ prev_vulnerabilities = sbom.osv.vulns_from_bundle(previous_bundle)
+
return results.SBOMToolScore(
kind="sbom_tool_score",
project_name=args.project_name,
version_name=args.version_name,
revision_number=args.revision_number,
bom_version=version,
+ prev_bom_version=prev_version,
file_path=args.file_path,
warnings=[w.model_dump_json() for w in warnings],
errors=[e.model_dump_json() for e in errors],
@@ -239,6 +266,8 @@ async def score_tool(args: FileArgs) -> results.Results |
None:
license_warnings=[w.model_dump_json() for w in license_warnings] if
license_warnings else None,
license_errors=[e.model_dump_json() for e in license_errors] if
license_errors else None,
vulnerabilities=[v.model_dump_json() for v in vulnerabilities],
+ prev_licenses=[w.model_dump_json() for w in prev_licenses] if
prev_licenses else None,
+ prev_vulnerabilities=[v.model_dump_json() for v in
prev_vulnerabilities] if prev_vulnerabilities else None,
atr_props=properties,
cli_errors=cli_errors,
)
diff --git a/atr/util.py b/atr/util.py
index 42ebea8..0838124 100644
--- a/atr/util.py
+++ b/atr/util.py
@@ -1011,6 +1011,53 @@ def version_name_error(version_name: str) -> str | None:
return None
+def version_sort_key(version: str) -> bytes:
+ """
+ Convert a version string into a sortable byte sequence.
+ Prefixes each digit sequence with its length as u16 big-endian.
+ Strips leading zeros and appends a byte for the count of leading zeros.
+ """
+ result = []
+ i = 0
+ length = len(version)
+ while i < length:
+ if version[i].isdigit():
+ # Find the end of this digit sequence
+ j = i
+ while j < length and version[j].isdigit():
+ j += 1
+
+ digit_sequence = version[i:j]
+
+ # Count leading zeros
+ leading_zeros = 0
+ for char in digit_sequence:
+ if char == "0":
+ leading_zeros += 1
+ else:
+ break
+
+ # Strip leading zeros (an all-zero sequence becomes empty, so its digit count is 0)
+ stripped = digit_sequence.lstrip("0")
+
+ # Count the stripped digits and encode as u16 big-endian (the to_bytes default)
+ digit_count = len(stripped)
+ length_bytes = digit_count.to_bytes(2)
+
+ # Add length prefix + stripped digits + leading zero count
+ result.extend(length_bytes)
+ result.extend(stripped.encode("utf-8"))
+ result.append(leading_zeros)
+
+ i = j
+ else:
+ # Non-digit character, just add it
+ result.extend(version[i].encode("utf-8"))
+ i += 1
+
+ return bytes(result)
+
+
async def _create_hard_link_clone_checks(
source_dir: pathlib.Path,
dest_dir: pathlib.Path,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]