This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch previous_sbom_results in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 1ce2be5026aed6656fd0ebab1a9ba4d981476345 Author: Alastair McFarlane <[email protected]> AuthorDate: Mon Dec 22 18:12:25 2025 +0000 Find previous results and include in score result for report. --- atr/get/sbom.py | 60 +++++++++++++++++++++++++++++++++++---------------- atr/models/results.py | 12 +++++++++++ atr/tasks/__init__.py | 13 +++++++++++ atr/tasks/sbom.py | 32 +++++++++++++++++++++++++-- atr/util.py | 47 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 144 insertions(+), 20 deletions(-) diff --git a/atr/get/sbom.py b/atr/get/sbom.py index d60ca1a..7bda641 100644 --- a/atr/get/sbom.py +++ b/atr/get/sbom.py @@ -91,7 +91,9 @@ async def report(session: web.Committer, project: str, version: str, file_path: block = htm.Block() block.h1["SBOM report"] - await _report_task_results(block, list(tasks)) + task_status = await _report_task_results(block, list(tasks)) + if task_status: + return task_status task_result = tasks[0].result if not isinstance(task_result, results.SBOMToolScore): @@ -115,16 +117,7 @@ async def report(session: web.Committer, project: str, version: str, file_path: _conformance_section(block, task_result) _license_section(block, task_result) - if task_result.vulnerabilities is not None: - vulnerabilities = [ - sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in task_result.vulnerabilities - ] - else: - vulnerabilities = [] - - _vulnerability_scan_section( - block, project, version, file_path, task_result, vulnerabilities, osv_tasks, is_release_candidate - ) + _vulnerability_scan_section(block, project, version, file_path, task_result, osv_tasks, is_release_candidate) _outdated_tool_section(block, task_result) @@ -197,17 +190,29 @@ def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> No block.h2["Licenses"] warnings = [] errors = [] + prev_warnings = [] + prev_errors = [] if task_result.license_warnings is not None: warnings = [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w 
in task_result.license_warnings] + prev_warnings = ( + [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w in task_result.prev_license_warnings] + if task_result.prev_license_warnings is not None + else [] + ) if task_result.license_errors is not None: errors = [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in task_result.license_errors] + prev_errors = ( + [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in task_result.prev_license_errors] + if task_result.prev_license_errors is not None + else [] + ) if warnings: block.h3["Warnings"] - _license_table(block, warnings) + _license_table(block, warnings, prev_warnings) if errors: block.h3["Errors"] - _license_table(block, errors) + _license_table(block, errors, prev_errors) if not (warnings or errors): block.p["No license warnings or errors found."] @@ -236,7 +241,7 @@ async def _report_task_results(block: htm.Block, tasks: list[sql.Task]): task_status = tasks[0].status task_error = tasks[0].error - if task_status == sql.TaskStatus.QUEUED: + if task_status == sql.TaskStatus.QUEUED or task_status == sql.TaskStatus.ACTIVE: block.p["SBOM score is being computed."] return await template.blank("SBOM report", content=block.collect()) @@ -314,7 +319,11 @@ def _extract_vulnerability_severity(vuln: osv.VulnerabilityDetails) -> str: return "Unknown" -def _license_table(block: htm.Block, items: list[sbom.models.licenses.Issue]) -> None: +def _license_table( + block: htm.Block, + items: list[sbom.models.licenses.Issue], + prev: list[sbom.models.licenses.Issue], # TODO: Compare items to old items +) -> None: warning_rows = [ htm.tr[ htm.td[ @@ -463,7 +472,11 @@ def _vulnerability_scan_find_in_progress_task( def _vulnerability_scan_results( - block: htm.Block, vulns: list[osv.CdxVulnerabilityDetail], scans: list[str], task: sql.Task | None + block: htm.Block, + vulns: list[osv.CdxVulnerabilityDetail], + scans: list[str], + task: sql.Task | None, + prev: list[osv.CdxVulnerabilityDetail], 
# TODO: highlight new vulnerabilities ) -> None: if task is not None: task_result = task.result @@ -542,7 +555,6 @@ def _vulnerability_scan_section( version: str, file_path: str, task_result: results.SBOMToolScore, - vulnerabilities: list[osv.CdxVulnerabilityDetail], osv_tasks: collections.abc.Sequence[sql.Task], is_release_candidate: bool, ) -> None: @@ -554,9 +566,21 @@ def _vulnerability_scan_section( block.h2["Vulnerabilities"] scans = [] + if task_result.vulnerabilities is not None: + vulnerabilities = [ + sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in task_result.vulnerabilities + ] + else: + vulnerabilities = [] + if task_result.prev_vulnerabilities is not None: + prev_vulnerabilities = [ + sbom.models.osv.CdxVulnAdapter.validate_python(json.loads(e)) for e in task_result.prev_vulnerabilities + ] + else: + prev_vulnerabilities = [] if task_result.atr_props is not None: scans = [t.get("value", "") for t in task_result.atr_props if t.get("name", "") == "asf:atr:osv-scan"] - _vulnerability_scan_results(block, vulnerabilities, scans, completed_task) + _vulnerability_scan_results(block, vulnerabilities, scans, completed_task, prev_vulnerabilities) if not is_release_candidate: if in_progress_task is not None: diff --git a/atr/models/results.py b/atr/models/results.py index b8e8c13..2a2e6a1 100644 --- a/atr/models/results.py +++ b/atr/models/results.py @@ -128,6 +128,9 @@ class SBOMToolScore(schema.Strict): version_name: str = schema.description("Version name") revision_number: str = schema.description("Revision number") bom_version: int | None = schema.Field(default=None, strict=False, description="BOM Version scanned") + prev_bom_version: int | None = schema.Field( + default=None, strict=False, description="BOM Version from previous release" + ) file_path: str = schema.description("Relative path to the scored SBOM file") warnings: list[str] = schema.description("Warnings from the SBOM tool") errors: list[str] = schema.description("Errors 
from the SBOM tool") @@ -141,6 +144,15 @@ class SBOMToolScore(schema.Strict): vulnerabilities: list[str] | None = schema.Field( default=None, strict=False, description="Vulnerabilities found in the SBOM" ) + prev_license_warnings: list[str] | None = schema.Field( + default=[], strict=False, description="License warnings from previous release" + ) + prev_license_errors: list[str] | None = schema.Field( + default=[], strict=False, description="License errors from previous release" + ) + prev_vulnerabilities: list[str] | None = schema.Field( + default=None, strict=False, description="Vulnerabilities from previous release" + ) atr_props: list[dict[str, str]] | None = schema.Field( default=None, strict=False, description="ATR properties found in the SBOM" ) diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index 2c4b985..963f765 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -69,6 +69,18 @@ async def draft_checks( release = await data.release(name=sql.release_name(project_name, release_version), _committee=True).demand( RuntimeError("Release not found") ) + other_releases = ( + await data.release(project_name=project_name, phase=sql.ReleasePhase.RELEASE) + .order_by(sql.Release.released) + .all() + ) + release_versions = sorted( + [v for v in other_releases], key=lambda v: util.version_sort_key(v.version), reverse=True + ) + release_version_sortable = util.version_sort_key(release_version) + previous_version = next( + (v for v in release_versions if util.version_sort_key(v.version) < release_version_sortable), None + ) for path in relative_paths: path_str = str(path) task_function: Callable[[str, sql.Release, str, str], Awaitable[list[sql.Task]]] | None = None @@ -94,6 +106,7 @@ async def draft_checks( "project_name": project_name, "version_name": release_version, "revision_number": revision_number, + "previous_release_version": previous_version.version if previous_version else None, "file_path": path_str, "asf_uid": asf_uid, }, diff --git 
a/atr/tasks/sbom.py b/atr/tasks/sbom.py index 46ee194..b11d316 100644 --- a/atr/tasks/sbom.py +++ b/atr/tasks/sbom.py @@ -74,6 +74,10 @@ class FileArgs(schema.Strict): asf_uid: str | None = None +class ScoreArgs(FileArgs): + previous_release_version: str | None = schema.description("Previous release version") + + @checks.with_model(FileArgs) async def augment(args: FileArgs) -> results.Results | None: base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number @@ -209,9 +213,12 @@ async def score_qs(args: FileArgs) -> results.Results | None: ) -@checks.with_model(FileArgs) -async def score_tool(args: FileArgs) -> results.Results | None: +@checks.with_model(ScoreArgs) +async def score_tool(args: ScoreArgs) -> results.Results | None: base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number + previous_base_dir = None + if args.previous_release_version is not None: + previous_base_dir = util.get_finished_dir() / args.project_name / args.previous_release_version if not os.path.isdir(base_dir): raise SBOMScoringError("Revision directory does not exist", {"base_dir": str(base_dir)}) full_path = os.path.join(base_dir, args.file_path) @@ -226,12 +233,30 @@ async def score_tool(args: FileArgs) -> results.Results | None: license_warnings, license_errors = sbom.licenses.check(bundle.bom) vulnerabilities = sbom.osv.vulns_from_bundle(bundle) cli_errors = sbom.cyclonedx.validate_cli(bundle) + + prev_version = None + prev_license_warnings = None + prev_license_errors = None + prev_vulnerabilities = None + if previous_base_dir is not None: + previous_full_path = os.path.join(previous_base_dir, args.file_path) + try: + previous_bundle = sbom.utilities.path_to_bundle(pathlib.Path(previous_full_path)) + except FileNotFoundError: + # Previous release didn't include this file + previous_bundle = None + if previous_bundle is not None: + prev_version, _ = 
sbom.utilities.get_props_from_bundle(previous_bundle) + prev_license_warnings, prev_license_errors = sbom.licenses.check(previous_bundle.bom) + prev_vulnerabilities = sbom.osv.vulns_from_bundle(previous_bundle) + return results.SBOMToolScore( kind="sbom_tool_score", project_name=args.project_name, version_name=args.version_name, revision_number=args.revision_number, bom_version=version, + prev_bom_version=prev_version, file_path=args.file_path, warnings=[w.model_dump_json() for w in warnings], errors=[e.model_dump_json() for e in errors], @@ -239,6 +264,9 @@ async def score_tool(args: FileArgs) -> results.Results | None: license_warnings=[w.model_dump_json() for w in license_warnings] if license_warnings else None, license_errors=[e.model_dump_json() for e in license_errors] if license_errors else None, vulnerabilities=[v.model_dump_json() for v in vulnerabilities], + prev_license_warnings=[w.model_dump_json() for w in prev_license_warnings] if prev_license_warnings else None, + prev_license_errors=[e.model_dump_json() for e in prev_license_errors] if prev_license_errors else None, + prev_vulnerabilities=[v.model_dump_json() for v in prev_vulnerabilities] if prev_vulnerabilities else None, atr_props=properties, cli_errors=cli_errors, ) diff --git a/atr/util.py b/atr/util.py index 42ebea8..f037700 100644 --- a/atr/util.py +++ b/atr/util.py @@ -1011,6 +1011,53 @@ def version_name_error(version_name: str) -> str | None: return None +def version_sort_key(version: str) -> bytes: + """ + Convert a version string into a sortable byte sequence. + Prefixes each digit sequence with its length as u16 little-endian. + Strips leading zeros and appends a byte for the count of leading zeros. 
+ """ + result = [] + i = 0 + length = len(version) + while i < length: + if version[i].isdigit(): + # Find the end of this digit sequence + j = i + while j < length and version[j].isdigit(): + j += 1 + + digit_sequence = version[i:j] + + # Count leading zeros + leading_zeros = 0 + for char in digit_sequence: + if char == "0": + leading_zeros += 1 + else: + break + + # Strip leading zeros (but keep at least one digit if all zeros) + stripped = digit_sequence.lstrip("0") or "0" + + # Count the stripped digits and encode as u16 little-endian + digit_count = len(stripped) + length_bytes = digit_count.to_bytes(2, "little") + + # Add length prefix + stripped digits + leading zero count + result.extend(length_bytes) + result.extend(stripped.encode("utf-8")) + result.append(leading_zeros) + + i = j + else: + # Non-digit character, just add it + result.append(ord(version[i])) + i += 1 + + return bytes(result) + + async def _create_hard_link_clone_checks( source_dir: pathlib.Path, dest_dir: pathlib.Path, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
