This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch previous_sbom_results
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit c552114e0886cafe3b4d58a7f0606e50fa2ca76c
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Dec 23 16:17:44 2025 +0000

    Abstract task fetches
---
 atr/get/sbom.py | 116 ++++++++++++++++++++++++++++++--------------------------
 1 file changed, 62 insertions(+), 54 deletions(-)

diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index 67de48e..a90c2b2 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -45,6 +45,9 @@ if TYPE_CHECKING:
 async def report(session: web.Committer, project: str, version: str, file_path: str) -> str:
     await session.check_access(project)
 
+    block = htm.Block()
+    block.h1["SBOM report"]
+
     # If the draft is not found, we try to get the release candidate
     try:
         release = await session.release(project, version, with_committee=True)
@@ -52,9 +55,48 @@ async def report(session: web.Committer, project: str, version: str, file_path:
         release = await session.release(project, version, phase=sql.ReleasePhase.RELEASE_CANDIDATE, with_committee=True)
     is_release_candidate = release.phase == sql.ReleasePhase.RELEASE_CANDIDATE
 
+    task, augment_tasks, osv_tasks = await _fetch_tasks(file_path, project, release, version)
+
+    task_status = await _report_task_results(block, task)
+    if task_status:
+        return task_status
+
+    if task is None or (not isinstance(task.result, results.SBOMToolScore)):
+        raise base.ASFQuartException("Invalid SBOM score result", errorcode=500)
+
+    task_result = task.result
+    _report_header(block, is_release_candidate, release, task_result)
+
+    if not is_release_candidate:
+        latest_augment = None
+        last_augmented_bom = None
+        if len(augment_tasks) > 0:
+            latest_augment = augment_tasks[0]
+            augment_results: list[Any] = [t.result for t in augment_tasks]
+            augmented_bom_versions = [
+                r.bom_version for r in augment_results if (r is not None) and (r.bom_version is not None)
+            ]
+            if len(augmented_bom_versions) > 0:
+                last_augmented_bom = max(augmented_bom_versions)
+        _augment_section(block, release, task_result, latest_augment, last_augmented_bom)
+
+    _conformance_section(block, task_result)
+    _license_section(block, task_result)
+
+    _vulnerability_scan_section(block, project, version, file_path, task_result, osv_tasks, is_release_candidate)
+
+    _outdated_tool_section(block, task_result)
+
+    _cyclonedx_cli_errors(block, task_result)
+
+    return await template.blank("SBOM report", content=block.collect())
+
+
+async def _fetch_tasks(
+    file_path: str, project: str, release: sql.Release, version: str
+) -> tuple[sql.Task | None, collections.abc.Sequence[sql.Task], collections.abc.Sequence[sql.Task]]:
     async with db.session() as data:
         via = sql.validate_instrumented_attribute
-        # TODO: Abstract this code and the sbomtool.MissingAdapter validators
         tasks = (
             await data.task(
                 project_name=project,
@@ -88,43 +130,7 @@ async def report(session: web.Committer, project: str, version: str, file_path:
             .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
             .all()
         )
-
-    block = htm.Block()
-    block.h1["SBOM report"]
-
-    task_status = await _report_task_results(block, list(tasks))
-    if task_status:
-        return task_status
-
-    task_result = tasks[0].result
-    if not isinstance(task_result, results.SBOMToolScore):
-        raise base.ASFQuartException("Invalid SBOM score result", errorcode=500)
-
-    _report_header(block, is_release_candidate, release, task_result)
-
-    if not is_release_candidate:
-        latest_augment = None
-        last_augmented_bom = None
-        if len(augment_tasks) > 0:
-            latest_augment = augment_tasks[0]
-            augment_results: list[Any] = [t.result for t in augment_tasks]
-            augmented_bom_versions = [
-                r.bom_version for r in augment_results if (r is not None) and (r.bom_version is not None)
-            ]
-            if len(augmented_bom_versions) > 0:
-                last_augmented_bom = max(augmented_bom_versions)
-        _augment_section(block, release, task_result, latest_augment, last_augmented_bom)
-
-    _conformance_section(block, task_result)
-    _license_section(block, task_result)
-
-    _vulnerability_scan_section(block, project, version, file_path, task_result, osv_tasks, is_release_candidate)
-
-    _outdated_tool_section(block, task_result)
-
-    _cyclonedx_cli_errors(block, task_result)
-
-    return await template.blank("SBOM report", content=block.collect())
+        return tasks[0] if len(tasks) > 0 else None, augment_tasks, osv_tasks
 
 
 def _outdated_tool_section(block: htm.Block, task_result: results.SBOMToolScore):
@@ -194,19 +200,9 @@ def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> No
     prev_warnings = []
     prev_errors = []
     if task_result.license_warnings is not None:
-        warnings = [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w in task_result.license_warnings]
-        prev_warnings = (
-            [sbom.models.licenses.Issue.model_validate(json.loads(w)) for w in task_result.prev_license_warnings]
-            if task_result.prev_license_warnings is not None
-            else []
-        )
+        prev_warnings, warnings = _load_license_issues(task_result.license_warnings, task_result.prev_license_warnings)
     if task_result.license_errors is not None:
-        errors = [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in task_result.license_errors]
-        prev_errors = (
-            [sbom.models.licenses.Issue.model_validate(json.loads(e)) for e in task_result.prev_license_errors]
-            if task_result.prev_license_errors is not None
-            else []
-        )
+        errors, prev_errors = _load_license_issues(task_result.license_errors, task_result.prev_license_errors)
     if warnings:
         block.h3["Warnings"]
         _license_table(block, warnings, prev_warnings)
@@ -219,6 +215,18 @@ def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> No
         block.p["No license warnings or errors found."]
 
 
+def _load_license_issues(
+    issues: list[str], prev_issues: list[str] | None
+) -> tuple[list[sbom.models.licenses.Issue], list[sbom.models.licenses.Issue]]:
+    iss = [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in issues]
+    prev = (
+        [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in prev_issues]
+        if prev_issues is not None
+        else []
+    )
+    return iss, prev
+
+
 def _report_header(
     block: htm.Block, is_release_candidate: bool, release: sql.Release, task_result: results.SBOMToolScore
 ) -> None:
@@ -235,13 +243,13 @@ def _report_header(
         block.p[f"This report is for the latest {release.version} release candidate."]
 
 
-async def _report_task_results(block: htm.Block, tasks: list[sql.Task]):
-    if not tasks:
+async def _report_task_results(block: htm.Block, task: sql.Task | None):
+    if task is None:
         block.p["No SBOM score found."]
         return await template.blank("SBOM report", content=block.collect())
 
-    task_status = tasks[0].status
-    task_error = tasks[0].error
+    task_status = task.status
+    task_error = task.error
     if task_status == sql.TaskStatus.QUEUED or task_status == sql.TaskStatus.ACTIVE:
         block.p["SBOM score is being computed."]
         return await template.blank("SBOM report", content=block.collect())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to