This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch previous_sbom_results in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit c552114e0886cafe3b4d58a7f0606e50fa2ca76c Author: Alastair McFarlane <[email protected]> AuthorDate: Tue Dec 23 16:17:44 2025 +0000 Abstract task fetches --- atr/get/sbom.py | 116 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/atr/get/sbom.py b/atr/get/sbom.py index 67de48e..a90c2b2 100644 --- a/atr/get/sbom.py +++ b/atr/get/sbom.py @@ -45,6 +45,9 @@ if TYPE_CHECKING: async def report(session: web.Committer, project: str, version: str, file_path: str) -> str: await session.check_access(project) + block = htm.Block() + block.h1["SBOM report"] + # If the draft is not found, we try to get the release candidate try: release = await session.release(project, version, with_committee=True) @@ -52,9 +55,48 @@ async def report(session: web.Committer, project: str, version: str, file_path: release = await session.release(project, version, phase=sql.ReleasePhase.RELEASE_CANDIDATE, with_committee=True) is_release_candidate = release.phase == sql.ReleasePhase.RELEASE_CANDIDATE + task, augment_tasks, osv_tasks = await _fetch_tasks(file_path, project, release, version) + + task_status = await _report_task_results(block, task) + if task_status: + return task_status + + if task is None or (not isinstance(task.result, results.SBOMToolScore)): + raise base.ASFQuartException("Invalid SBOM score result", errorcode=500) + + task_result = task.result + _report_header(block, is_release_candidate, release, task_result) + + if not is_release_candidate: + latest_augment = None + last_augmented_bom = None + if len(augment_tasks) > 0: + latest_augment = augment_tasks[0] + augment_results: list[Any] = [t.result for t in augment_tasks] + augmented_bom_versions = [ + r.bom_version for r in augment_results if (r is not None) and (r.bom_version is not None) + ] + if len(augmented_bom_versions) > 0: + last_augmented_bom = max(augmented_bom_versions) + _augment_section(block, release, task_result, latest_augment, last_augmented_bom) + + _conformance_section(block, task_result) + _license_section(block, task_result) + + _vulnerability_scan_section(block, project, version, file_path, task_result, osv_tasks, is_release_candidate) + + _outdated_tool_section(block, task_result) + + _cyclonedx_cli_errors(block, task_result) + + return await template.blank("SBOM report", content=block.collect()) + + +async def _fetch_tasks( + file_path: str, project: str, release: sql.Release, version: str +) -> tuple[sql.Task | None, collections.abc.Sequence[sql.Task], collections.abc.Sequence[sql.Task]]: async with db.session() as data: via = sql.validate_instrumented_attribute - # TODO: Abstract this code and the sbomtool.MissingAdapter validators tasks = ( await data.task( project_name=project, @@ -88,43 +130,7 @@ async def report(session: web.Committer, project: str, version: str, file_path: .order_by(sql.sqlmodel.desc(via(sql.Task.added))) .all() ) - - block = htm.Block() - block.h1["SBOM report"] - - task_status = await _report_task_results(block, list(tasks)) - if task_status: - return task_status - - task_result = tasks[0].result - if not isinstance(task_result, results.SBOMToolScore): - raise base.ASFQuartException("Invalid SBOM score result", errorcode=500) - - _report_header(block, is_release_candidate, release, task_result) - - if not is_release_candidate: - latest_augment = None - last_augmented_bom = None - if len(augment_tasks) > 0: - latest_augment = augment_tasks[0] - augment_results: list[Any] = [t.result for t in augment_tasks] - augmented_bom_versions = [ - r.bom_version for r in augment_results if (r is not None) and (r.bom_version is not None) - ] - if len(augmented_bom_versions) > 0: - last_augmented_bom = max(augmented_bom_versions) - _augment_section(block, release, task_result, latest_augment, last_augmented_bom) - - _conformance_section(block, task_result) - _license_section(block, task_result) - - _vulnerability_scan_section(block, project, version, file_path, task_result, osv_tasks, is_release_candidate) - - _outdated_tool_section(block, task_result) - - _cyclonedx_cli_errors(block, task_result) - - return await template.blank("SBOM report", content=block.collect()) + return tasks[0] if len(tasks) > 0 else None, augment_tasks, osv_tasks def _outdated_tool_section(block: htm.Block, task_result: results.SBOMToolScore): @@ -194,19 +200,9 @@ def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> No prev_warnings = [] prev_errors = [] if task_result.license_warnings is not None: - warnings = [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w in task_result.license_warnings] - prev_warnings = ( - [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w in task_result.prev_license_warnings] - if task_result.prev_license_warnings is not None - else [] - ) + prev_warnings, warnings = _load_license_issues(task_result.license_warnings, task_result.prev_license_warnings) if task_result.license_errors is not None: - errors = [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in task_result.license_errors] - prev_errors = ( - [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in task_result.prev_license_errors] - if task_result.prev_license_errors is not None - else [] - ) + errors, prev_errors = _load_license_issues(task_result.license_errors, task_result.prev_license_errors) if warnings: block.h3["Warnings"] _license_table(block, warnings, prev_warnings) @@ -219,6 +215,18 @@ def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> No block.p["No license warnings or errors found."] +def _load_license_issues( + issues: list[str], prev_issues: list[str] | None +) -> tuple[list[sbom.models.licenses.Issue], list[sbom.models.licenses.Issue]]: + iss = [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in issues] + prev = ( + [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in prev_issues] + if prev_issues is not None + else [] + ) + return iss, prev + + def _report_header( block: htm.Block, is_release_candidate: bool, release: sql.Release, task_result: results.SBOMToolScore ) -> None: @@ -235,13 +243,13 @@ def _report_header( block.p[f"This report is for the latest {release.version} release candidate."] -async def _report_task_results(block: htm.Block, tasks: list[sql.Task]): - if not tasks: +async def _report_task_results(block: htm.Block, task: sql.Task | None): + if task is None: block.p["No SBOM score found."] return await template.blank("SBOM report", content=block.collect()) - task_status = tasks[0].status - task_error = tasks[0].error + task_status = task.status + task_error = task.error if task_status == sql.TaskStatus.QUEUED or task_status == sql.TaskStatus.ACTIVE: block.p["SBOM score is being computed."] return await template.blank("SBOM report", content=block.collect()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
