This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch previous_sbom_results
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 1ce2be5026aed6656fd0ebab1a9ba4d981476345
Author: Alastair McFarlane <[email protected]>
AuthorDate: Mon Dec 22 18:12:25 2025 +0000

    Find previous results and include in score result for report.
---
 atr/get/sbom.py       | 60 +++++++++++++++++++++++++++++++++++----------------
 atr/models/results.py | 12 +++++++++++
 atr/tasks/__init__.py | 13 +++++++++++
 atr/tasks/sbom.py     | 32 +++++++++++++++++++++++++--
 atr/util.py           | 47 ++++++++++++++++++++++++++++++++++++++++
 5 files changed, 144 insertions(+), 20 deletions(-)

diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index d60ca1a..7bda641 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -91,7 +91,9 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
     block = htm.Block()
     block.h1["SBOM report"]
 
-    await _report_task_results(block, list(tasks))
+    task_status = await _report_task_results(block, list(tasks))
+    if task_status:
+        return task_status
 
     task_result = tasks[0].result
     if not isinstance(task_result, results.SBOMToolScore):
@@ -115,16 +117,7 @@ async def report(session: web.Committer, project: str, 
version: str, file_path:
     _conformance_section(block, task_result)
     _license_section(block, task_result)
 
-    if task_result.vulnerabilities is not None:
-        vulnerabilities = [
-            sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for 
e in task_result.vulnerabilities
-        ]
-    else:
-        vulnerabilities = []
-
-    _vulnerability_scan_section(
-        block, project, version, file_path, task_result, vulnerabilities, 
osv_tasks, is_release_candidate
-    )
+    _vulnerability_scan_section(block, project, version, file_path, 
task_result, osv_tasks, is_release_candidate)
 
     _outdated_tool_section(block, task_result)
 
@@ -197,17 +190,29 @@ def _license_section(block: htm.Block, task_result: 
results.SBOMToolScore) -> No
     block.h2["Licenses"]
     warnings = []
     errors = []
+    prev_warnings = []
+    prev_errors = []
     if task_result.license_warnings is not None:
         warnings = [sbom.models.licenses.Issue.model_validate(json.loads(w)) 
for w in task_result.license_warnings]
+        prev_warnings = (
+            [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w in 
task_result.prev_license_warnings]
+            if task_result.prev_license_warnings is not None
+            else []
+        )
     if task_result.license_errors is not None:
         errors = [sbom.models.licenses.Issue.model_validate(json.loads(e)) for 
e in task_result.license_errors]
+        prev_errors = (
+            [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in 
task_result.prev_license_errors]
+            if task_result.prev_license_errors is not None
+            else []
+        )
     if warnings:
         block.h3["Warnings"]
-        _license_table(block, warnings)
+        _license_table(block, warnings, prev_warnings)
 
     if errors:
         block.h3["Errors"]
-        _license_table(block, errors)
+        _license_table(block, errors, prev_errors)
 
     if not (warnings or errors):
         block.p["No license warnings or errors found."]
@@ -236,7 +241,7 @@ async def _report_task_results(block: htm.Block, tasks: 
list[sql.Task]):
 
     task_status = tasks[0].status
     task_error = tasks[0].error
-    if task_status == sql.TaskStatus.QUEUED:
+    if task_status == sql.TaskStatus.QUEUED or task_status == 
sql.TaskStatus.ACTIVE:
         block.p["SBOM score is being computed."]
         return await template.blank("SBOM report", content=block.collect())
 
@@ -314,7 +319,11 @@ def _extract_vulnerability_severity(vuln: 
osv.VulnerabilityDetails) -> str:
     return "Unknown"
 
 
-def _license_table(block: htm.Block, items: list[sbom.models.licenses.Issue]) 
-> None:
+def _license_table(
+    block: htm.Block,
+    items: list[sbom.models.licenses.Issue],
+    prev: list[sbom.models.licenses.Issue],  # TODO: Compare items to old items
+) -> None:
     warning_rows = [
         htm.tr[
             htm.td[
@@ -463,7 +472,11 @@ def _vulnerability_scan_find_in_progress_task(
 
 
 def _vulnerability_scan_results(
-    block: htm.Block, vulns: list[osv.CdxVulnerabilityDetail], scans: 
list[str], task: sql.Task | None
+    block: htm.Block,
+    vulns: list[osv.CdxVulnerabilityDetail],
+    scans: list[str],
+    task: sql.Task | None,
+    prev: list[osv.CdxVulnerabilityDetail],  # TODO: highlight new 
vulnerabilities
 ) -> None:
     if task is not None:
         task_result = task.result
@@ -542,7 +555,6 @@ def _vulnerability_scan_section(
     version: str,
     file_path: str,
     task_result: results.SBOMToolScore,
-    vulnerabilities: list[osv.CdxVulnerabilityDetail],
     osv_tasks: collections.abc.Sequence[sql.Task],
     is_release_candidate: bool,
 ) -> None:
@@ -554,9 +566,21 @@ def _vulnerability_scan_section(
     block.h2["Vulnerabilities"]
 
     scans = []
+    if task_result.vulnerabilities is not None:
+        vulnerabilities = [
+            sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for 
e in task_result.vulnerabilities
+        ]
+    else:
+        vulnerabilities = []
+    if task_result.prev_vulnerabilities is not None:
+        prev_vulnerabilities = [
+            sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for 
e in task_result.prev_vulnerabilities
+        ]
+    else:
+        prev_vulnerabilities = []
     if task_result.atr_props is not None:
         scans = [t.get("value", "") for t in task_result.atr_props if 
t.get("name", "") == "asf:atr:osv-scan"]
-    _vulnerability_scan_results(block, vulnerabilities, scans, completed_task)
+    _vulnerability_scan_results(block, vulnerabilities, scans, completed_task, 
prev_vulnerabilities)
 
     if not is_release_candidate:
         if in_progress_task is not None:
diff --git a/atr/models/results.py b/atr/models/results.py
index b8e8c13..2a2e6a1 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -128,6 +128,9 @@ class SBOMToolScore(schema.Strict):
     version_name: str = schema.description("Version name")
     revision_number: str = schema.description("Revision number")
     bom_version: int | None = schema.Field(default=None, strict=False, 
description="BOM Version scanned")
+    prev_bom_version: int | None = schema.Field(
+        default=None, strict=False, description="BOM Version from previous 
release"
+    )
     file_path: str = schema.description("Relative path to the scored SBOM 
file")
     warnings: list[str] = schema.description("Warnings from the SBOM tool")
     errors: list[str] = schema.description("Errors from the SBOM tool")
@@ -141,6 +144,15 @@ class SBOMToolScore(schema.Strict):
     vulnerabilities: list[str] | None = schema.Field(
         default=None, strict=False, description="Vulnerabilities found in the 
SBOM"
     )
+    prev_license_warnings: list[str] | None = schema.Field(
+        default=[], strict=False, description="License warnings from previous 
release"
+    )
+    prev_license_errors: list[str] | None = schema.Field(
+        default=[], strict=False, description="License errors from previous 
release"
+    )
+    prev_vulnerabilities: list[str] | None = schema.Field(
+        default=None, strict=False, description="Vulnerabilities from previous 
release"
+    )
     atr_props: list[dict[str, str]] | None = schema.Field(
         default=None, strict=False, description="ATR properties found in the 
SBOM"
     )
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 2c4b985..963f765 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -69,6 +69,18 @@ async def draft_checks(
         release = await data.release(name=sql.release_name(project_name, 
release_version), _committee=True).demand(
             RuntimeError("Release not found")
         )
+        other_releases = (
+            await data.release(project_name=project_name, 
phase=sql.ReleasePhase.RELEASE)
+            .order_by(sql.Release.released)
+            .all()
+        )
+        release_versions = sorted(
+            [v for v in other_releases], key=lambda v: 
util.version_sort_key(v.version), reverse=True
+        )
+        release_version_sortable = util.version_sort_key(release_version)
+        previous_version = next(
+            (v for v in release_versions if util.version_sort_key(v.version) < 
release_version_sortable), None
+        )
         for path in relative_paths:
             path_str = str(path)
             task_function: Callable[[str, sql.Release, str, str], 
Awaitable[list[sql.Task]]] | None = None
@@ -94,6 +106,7 @@ async def draft_checks(
                             "project_name": project_name,
                             "version_name": release_version,
                             "revision_number": revision_number,
+                            "previous_release_version": 
previous_version.version if previous_version else None,
                             "file_path": path_str,
                             "asf_uid": asf_uid,
                         },
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 46ee194..b11d316 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -74,6 +74,10 @@ class FileArgs(schema.Strict):
     asf_uid: str | None = None
 
 
+class ScoreArgs(FileArgs):
+    previous_release_version: str | None = schema.description("Previous 
release version")
+
+
 @checks.with_model(FileArgs)
 async def augment(args: FileArgs) -> results.Results | None:
     base_dir = util.get_unfinished_dir() / args.project_name / 
args.version_name / args.revision_number
@@ -209,9 +213,12 @@ async def score_qs(args: FileArgs) -> results.Results | 
None:
     )
 
 
[email protected]_model(FileArgs)
-async def score_tool(args: FileArgs) -> results.Results | None:
[email protected]_model(ScoreArgs)
+async def score_tool(args: ScoreArgs) -> results.Results | None:
     base_dir = util.get_unfinished_dir() / args.project_name / 
args.version_name / args.revision_number
+    previous_base_dir = None
+    if args.previous_release_version is not None:
+        previous_base_dir = util.get_finished_dir() / args.project_name / 
args.previous_release_version
     if not os.path.isdir(base_dir):
         raise SBOMScoringError("Revision directory does not exist", 
{"base_dir": str(base_dir)})
     full_path = os.path.join(base_dir, args.file_path)
@@ -226,12 +233,30 @@ async def score_tool(args: FileArgs) -> results.Results | 
None:
     license_warnings, license_errors = sbom.licenses.check(bundle.bom)
     vulnerabilities = sbom.osv.vulns_from_bundle(bundle)
     cli_errors = sbom.cyclonedx.validate_cli(bundle)
+
+    prev_version = None
+    prev_license_warnings = None
+    prev_license_errors = None
+    prev_vulnerabilities = None
+    if previous_base_dir is not None:
+        previous_full_path = os.path.join(previous_base_dir, args.file_path)
+        try:
+            previous_bundle = 
sbom.utilities.path_to_bundle(pathlib.Path(previous_full_path))
+        except FileNotFoundError:
+            # Previous release didn't include this file
+            previous_bundle = None
+        if previous_bundle is not None:
+            prev_version, _ = 
sbom.utilities.get_props_from_bundle(previous_bundle)
+            prev_license_warnings, prev_license_errors = 
sbom.licenses.check(previous_bundle.bom)
+            prev_vulnerabilities = sbom.osv.vulns_from_bundle(previous_bundle)
+
     return results.SBOMToolScore(
         kind="sbom_tool_score",
         project_name=args.project_name,
         version_name=args.version_name,
         revision_number=args.revision_number,
         bom_version=version,
+        prev_bom_version=prev_version,
         file_path=args.file_path,
         warnings=[w.model_dump_json() for w in warnings],
         errors=[e.model_dump_json() for e in errors],
@@ -239,6 +264,9 @@ async def score_tool(args: FileArgs) -> results.Results | 
None:
         license_warnings=[w.model_dump_json() for w in license_warnings] if 
license_warnings else None,
         license_errors=[e.model_dump_json() for e in license_errors] if 
license_errors else None,
         vulnerabilities=[v.model_dump_json() for v in vulnerabilities],
+        prev_license_warnings=[w.model_dump_json() for w in 
prev_license_warnings] if prev_license_warnings else None,
+        prev_license_errors=[e.model_dump_json() for e in prev_license_errors] 
if prev_license_errors else None,
+        prev_vulnerabilities=[v.model_dump_json() for v in 
prev_vulnerabilities] if prev_vulnerabilities else None,
         atr_props=properties,
         cli_errors=cli_errors,
     )
diff --git a/atr/util.py b/atr/util.py
index 42ebea8..f037700 100644
--- a/atr/util.py
+++ b/atr/util.py
@@ -1011,6 +1011,53 @@ def version_name_error(version_name: str) -> str | None:
     return None
 
 
+def version_sort_key(version: str) -> bytes:
+    """
+    Convert a version string into a sortable byte sequence.
+    Prefixes each digit sequence with its length as u16 little-endian.
+    Strips leading zeros and appends a byte for the count of leading zeros.
+    """
+    result = []
+    i = 0
+    length = len(version)
+    while i < length:
+        if version[i].isdigit():
+            # Find the end of this digit sequence
+            j = i
+            while j < length and version[j].isdigit():
+                j += 1
+
+            digit_sequence = version[i:j]
+
+            # Count leading zeros
+            leading_zeros = 0
+            for char in digit_sequence:
+                if char == "0":
+                    leading_zeros += 1
+                else:
+                    break
+
+            # Strip leading zeros (but keep at least one digit if all zeros)
+            stripped = digit_sequence.lstrip("0") or "0"
+
+            # Count the stripped digits and encode as u16 little-endian
+            digit_count = len(stripped)
+            length_bytes = digit_count.to_bytes(2, "little")
+
+            # Add length prefix + stripped digits + leading zero count
+            result.extend(length_bytes)
+            result.extend(stripped.encode("utf-8"))
+            result.append(leading_zeros)
+
+            i = j
+        else:
+            # Non-digit character, just add it
+            result.append(ord(version[i]))
+            i += 1
+
+    return bytes(result)
+
+
 async def _create_hard_link_clone_checks(
     source_dir: pathlib.Path,
     dest_dir: pathlib.Path,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to