This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new 3355ee0  Add an OSV scanning task
3355ee0 is described below

commit 3355ee0069c9ac310789368ae318963576bcff75
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Oct 17 19:10:30 2025 +0100

    Add an OSV scanning task
---
 atr/models/results.py | 18 +++++++++++++++++-
 atr/models/sql.py     |  1 +
 atr/sbom/__init__.py  |  3 ++-
 atr/tasks/__init__.py |  2 ++
 atr/tasks/sbom.py     | 28 ++++++++++++++++++++++++++++
 5 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/atr/models/results.py b/atr/models/results.py
index eb68fe4..9f68eae 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Annotated, Literal
+from typing import Annotated, Any, Literal
 
 import pydantic
 
@@ -46,6 +46,21 @@ class SBOMGenerateCycloneDX(schema.Strict):
     msg: str = schema.description("The message from the SBOM generation")
 
 
+class OSVComponent(schema.Strict):
+    purl: str = schema.description("Package URL")
+    vulnerabilities: list[dict[str, Any]] = schema.description("Vulnerabilities found")
+
+
+class SBOMOSVScan(schema.Strict):
+    kind: Literal["sbom_osv_scan"] = schema.Field(alias="kind")
+    project_name: str = schema.description("Project name")
+    version_name: str = schema.description("Version name")
+    revision_number: str = schema.description("Revision number")
+    file_path: str = schema.description("Relative path to the scanned SBOM file")
+    components: list[OSVComponent] = schema.description("Components with vulnerabilities")
+    ignored_count: int = schema.description("Number of components ignored")
+
+
 class SbomQsScore(schema.Strict):
     category: str
     feature: str
@@ -132,6 +147,7 @@ Results = Annotated[
     | MessageSend
     | SBOMAugment
     | SBOMGenerateCycloneDX
+    | SBOMOSVScan
     | SBOMQsScore
     | SBOMToolScore
     | SvnImportFiles
diff --git a/atr/models/sql.py b/atr/models/sql.py
index d3f25c3..f1f0742 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -183,6 +183,7 @@ class TaskType(str, enum.Enum):
     RAT_CHECK = "rat_check"
     SBOM_AUGMENT = "sbom_augment"
     SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
+    SBOM_OSV_SCAN = "sbom_osv_scan"
     SBOM_QS_SCORE = "sbom_qs_score"
     SBOM_TOOL_SCORE = "sbom_tool_score"
     SIGNATURE_CHECK = "signature_check"
diff --git a/atr/sbom/__init__.py b/atr/sbom/__init__.py
index c59f817..8326a4d 100644
--- a/atr/sbom/__init__.py
+++ b/atr/sbom/__init__.py
@@ -17,7 +17,7 @@
 
 from __future__ import annotations
 
-from . import cli, conformance, constants, cyclonedx, licenses, maven, models, sbomqs, spdx, utilities
+from . import cli, conformance, constants, cyclonedx, licenses, maven, models, osv, sbomqs, spdx, utilities
 
 __all__ = [
     "cli",
@@ -27,6 +27,7 @@ __all__ = [
     "licenses",
     "maven",
     "models",
+    "osv",
     "sbomqs",
     "spdx",
     "utilities",
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index d2270a3..b7925fe 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -175,6 +175,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results
             return sbom.augment
         case sql.TaskType.SBOM_GENERATE_CYCLONEDX:
             return sbom.generate_cyclonedx
+        case sql.TaskType.SBOM_OSV_SCAN:
+            return sbom.osv_scan
         case sql.TaskType.SBOM_QS_SCORE:
             return sbom.score_qs
         case sql.TaskType.SBOM_TOOL_SCORE:
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index f239534..6451f10 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -54,6 +54,12 @@ class SBOMGenerationError(Exception):
         self.details = details or {}
 
 
+class SBOMScanningError(Exception):
+    """Custom exception for SBOM scanning failures."""
+
+    pass
+
+
 class SBOMScoringError(Exception):
     """Raised on a failure to score an SBOM."""
 
@@ -125,6 +131,28 @@ async def generate_cyclonedx(args: GenerateCycloneDX) -> results.Results | None:
         raise
 
 
+@checks.with_model(FileArgs)
+async def osv_scan(args: FileArgs) -> results.Results | None:
+    base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number
+    if not os.path.isdir(base_dir):
+        raise SBOMScanningError("Revision directory does not exist", {"base_dir": str(base_dir)})
+    full_path = os.path.join(base_dir, args.file_path)
+    if not (full_path.endswith(".cdx.json") and os.path.isfile(full_path)):
+        raise SBOMScanningError("SBOM file does not exist", {"file_path": args.file_path})
+    bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path))
+    vulnerabilities, ignored_count = await sbom.osv.scan_bundle(bundle)
+    components = [results.OSVComponent(purl=v.purl, vulnerabilities=v.vulnerabilities) for v in vulnerabilities]
+    return results.SBOMOSVScan(
+        kind="sbom_osv_scan",
+        project_name=args.project_name,
+        version_name=args.version_name,
+        revision_number=args.revision_number,
+        file_path=args.file_path,
+        components=components,
+        ignored_count=ignored_count,
+    )
+
+
 @checks.with_model(FileArgs)
 async def score_qs(args: FileArgs) -> results.Results | None:
     base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to