This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 3355ee0 Add an OSV scanning task
3355ee0 is described below
commit 3355ee0069c9ac310789368ae318963576bcff75
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Oct 17 19:10:30 2025 +0100
Add an OSV scanning task
---
atr/models/results.py | 18 +++++++++++++++++-
atr/models/sql.py | 1 +
atr/sbom/__init__.py | 3 ++-
atr/tasks/__init__.py | 2 ++
atr/tasks/sbom.py | 28 ++++++++++++++++++++++++++++
5 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/atr/models/results.py b/atr/models/results.py
index eb68fe4..9f68eae 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Annotated, Literal
+from typing import Annotated, Any, Literal
import pydantic
@@ -46,6 +46,21 @@ class SBOMGenerateCycloneDX(schema.Strict):
msg: str = schema.description("The message from the SBOM generation")
+class OSVComponent(schema.Strict):
+ purl: str = schema.description("Package URL")
+ vulnerabilities: list[dict[str, Any]] = schema.description("Vulnerabilities found")
+
+
+class SBOMOSVScan(schema.Strict):
+ kind: Literal["sbom_osv_scan"] = schema.Field(alias="kind")
+ project_name: str = schema.description("Project name")
+ version_name: str = schema.description("Version name")
+ revision_number: str = schema.description("Revision number")
+ file_path: str = schema.description("Relative path to the scanned SBOM file")
+ components: list[OSVComponent] = schema.description("Components with vulnerabilities")
+ ignored_count: int = schema.description("Number of components ignored")
+
+
class SbomQsScore(schema.Strict):
category: str
feature: str
@@ -132,6 +147,7 @@ Results = Annotated[
| MessageSend
| SBOMAugment
| SBOMGenerateCycloneDX
+ | SBOMOSVScan
| SBOMQsScore
| SBOMToolScore
| SvnImportFiles
diff --git a/atr/models/sql.py b/atr/models/sql.py
index d3f25c3..f1f0742 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -183,6 +183,7 @@ class TaskType(str, enum.Enum):
RAT_CHECK = "rat_check"
SBOM_AUGMENT = "sbom_augment"
SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
+ SBOM_OSV_SCAN = "sbom_osv_scan"
SBOM_QS_SCORE = "sbom_qs_score"
SBOM_TOOL_SCORE = "sbom_tool_score"
SIGNATURE_CHECK = "signature_check"
diff --git a/atr/sbom/__init__.py b/atr/sbom/__init__.py
index c59f817..8326a4d 100644
--- a/atr/sbom/__init__.py
+++ b/atr/sbom/__init__.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from . import cli, conformance, constants, cyclonedx, licenses, maven, models, sbomqs, spdx, utilities
+from . import cli, conformance, constants, cyclonedx, licenses, maven, models, osv, sbomqs, spdx, utilities
__all__ = [
"cli",
@@ -27,6 +27,7 @@ __all__ = [
"licenses",
"maven",
"models",
+ "osv",
"sbomqs",
"spdx",
"utilities",
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index d2270a3..b7925fe 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -175,6 +175,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results
return sbom.augment
case sql.TaskType.SBOM_GENERATE_CYCLONEDX:
return sbom.generate_cyclonedx
+ case sql.TaskType.SBOM_OSV_SCAN:
+ return sbom.osv_scan
case sql.TaskType.SBOM_QS_SCORE:
return sbom.score_qs
case sql.TaskType.SBOM_TOOL_SCORE:
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index f239534..6451f10 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -54,6 +54,12 @@ class SBOMGenerationError(Exception):
self.details = details or {}
+class SBOMScanningError(Exception):
+ """Custom exception for SBOM scanning failures."""
+
+ pass
+
+
class SBOMScoringError(Exception):
"""Raised on a failure to score an SBOM."""
@@ -125,6 +131,28 @@ async def generate_cyclonedx(args: GenerateCycloneDX) -> results.Results | None:
raise
+@checks.with_model(FileArgs)
+async def osv_scan(args: FileArgs) -> results.Results | None:
+ base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number
+ if not os.path.isdir(base_dir):
+ raise SBOMScanningError("Revision directory does not exist", {"base_dir": str(base_dir)})
+ full_path = os.path.join(base_dir, args.file_path)
+ if not (full_path.endswith(".cdx.json") and os.path.isfile(full_path)):
+ raise SBOMScanningError("SBOM file does not exist", {"file_path": args.file_path})
+ bundle = sbom.utilities.path_to_bundle(pathlib.Path(full_path))
+ vulnerabilities, ignored_count = await sbom.osv.scan_bundle(bundle)
+ components = [results.OSVComponent(purl=v.purl, vulnerabilities=v.vulnerabilities) for v in vulnerabilities]
+ return results.SBOMOSVScan(
+ kind="sbom_osv_scan",
+ project_name=args.project_name,
+ version_name=args.version_name,
+ revision_number=args.revision_number,
+ file_path=args.file_path,
+ components=components,
+ ignored_count=ignored_count,
+ )
+
+
@checks.with_model(FileArgs)
async def score_qs(args: FileArgs) -> results.Results | None:
base_dir = util.get_unfinished_dir() / args.project_name / args.version_name / args.revision_number
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]