This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new fe92fad Add a rough draft of a module for zip format checks
fe92fad is described below
commit fe92fad63f79579ec0d4d8becf321d9d654a9c26
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Apr 7 15:26:51 2025 +0100
Add a rough draft of a module for zip format checks
---
atr/db/models.py | 6 +-
atr/tasks/__init__.py | 57 ++++++-
atr/tasks/checks/__init__.py | 5 +
atr/tasks/checks/license.py | 152 ++++++++---------
atr/tasks/checks/zipformat.py | 375 ++++++++++++++++++++++++++++++++++++++++++
5 files changed, 517 insertions(+), 78 deletions(-)
diff --git a/atr/db/models.py b/atr/db/models.py
index 5612438..6bfe988 100644
--- a/atr/db/models.py
+++ b/atr/db/models.py
@@ -294,9 +294,13 @@ class TaskType(str, enum.Enum):
PATHS_CHECK = "paths_check"
RAT_CHECK = "rat_check"
RSYNC_ANALYSE = "rsync_analyse"
+ SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
SIGNATURE_CHECK = "signature_check"
VOTE_INITIATE = "vote_initiate"
- SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
+ ZIPFORMAT_INTEGRITY = "zipformat_integrity"
+ ZIPFORMAT_LICENSE_FILES = "zipformat_license_files"
+ ZIPFORMAT_LICENSE_HEADERS = "zipformat_license_headers"
+ ZIPFORMAT_STRUCTURE = "zipformat_structure"
class Task(sqlmodel.SQLModel, table=True):
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 419343f..b42294d 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -23,12 +23,14 @@ import aiofiles.os
import atr.db as db
import atr.db.models as models
+import atr.tasks.checks as checks
import atr.tasks.checks.archive as archive
import atr.tasks.checks.hashing as hashing
import atr.tasks.checks.license as license
import atr.tasks.checks.paths as paths
import atr.tasks.checks.rat as rat
import atr.tasks.checks.signature as signature
+import atr.tasks.checks.zipformat as zipformat
import atr.tasks.rsync as rsync
import atr.tasks.sbom as sbom
import atr.tasks.vote as vote
@@ -115,12 +117,20 @@ def resolve(task_type: models.TaskType) -> Callable[..., Awaitable[str | None]]:
return rat.check
case models.TaskType.RSYNC_ANALYSE:
return rsync.analyse
+ case models.TaskType.SBOM_GENERATE_CYCLONEDX:
+ return sbom.generate_cyclonedx
case models.TaskType.SIGNATURE_CHECK:
return signature.check
case models.TaskType.VOTE_INITIATE:
return vote.initiate
- case models.TaskType.SBOM_GENERATE_CYCLONEDX:
- return sbom.generate_cyclonedx
+ case models.TaskType.ZIPFORMAT_INTEGRITY:
+ return zipformat.integrity
+ case models.TaskType.ZIPFORMAT_STRUCTURE:
+ return zipformat.structure
+ case models.TaskType.ZIPFORMAT_LICENSE_FILES:
+ return zipformat.license_files
+ case models.TaskType.ZIPFORMAT_LICENSE_HEADERS:
+ return zipformat.license_headers
# NOTE: Do NOT add "case _" here
# Otherwise we lose exhaustiveness checking
@@ -212,10 +222,53 @@ async def tar_gz_checks(release: models.Release, path: str) -> list[models.Task]
return tasks
+async def zip_checks(release: models.Release, path: str) -> list[models.Task]:
+ """Create check tasks for a .zip file."""
+ full_path = str(util.get_release_candidate_draft_dir() /
release.project.name / release.version / path)
+ modified = int(await aiofiles.os.path.getmtime(full_path))
+
+ tasks = [
+ models.Task(
+ status=models.TaskStatus.QUEUED,
+ task_type=models.TaskType.ZIPFORMAT_INTEGRITY,
+ task_args=checks.ReleaseAndAbsPath(release_name=release.name,
abs_path=full_path).model_dump(),
+ release_name=release.name,
+ path=path,
+ modified=modified,
+ ),
+ models.Task(
+ status=models.TaskStatus.QUEUED,
+ task_type=models.TaskType.ZIPFORMAT_LICENSE_FILES,
+ task_args=checks.ReleaseAndAbsPath(release_name=release.name,
abs_path=full_path).model_dump(),
+ release_name=release.name,
+ path=path,
+ modified=modified,
+ ),
+ models.Task(
+ status=models.TaskStatus.QUEUED,
+ task_type=models.TaskType.ZIPFORMAT_LICENSE_HEADERS,
+ task_args=checks.ReleaseAndAbsPath(release_name=release.name,
abs_path=full_path).model_dump(),
+ release_name=release.name,
+ path=path,
+ modified=modified,
+ ),
+ models.Task(
+ status=models.TaskStatus.QUEUED,
+ task_type=models.TaskType.ZIPFORMAT_STRUCTURE,
+ task_args=checks.ReleaseAndAbsPath(release_name=release.name,
abs_path=full_path).model_dump(),
+ release_name=release.name,
+ path=path,
+ modified=modified,
+ ),
+ ]
+ return tasks
+
+
TASK_FUNCTIONS: Final[dict[str, Callable[..., Coroutine[Any, Any,
list[models.Task]]]]] = {
".asc": asc_checks,
".sha256": sha_checks,
".sha512": sha_checks,
".tar.gz": tar_gz_checks,
".tgz": tar_gz_checks,
+ ".zip": zip_checks,
}
diff --git a/atr/tasks/checks/__init__.py b/atr/tasks/checks/__init__.py
index 921d1c4..eff9973 100644
--- a/atr/tasks/checks/__init__.py
+++ b/atr/tasks/checks/__init__.py
@@ -153,3 +153,8 @@ def with_model(model_class: type[T]) -> Callable[[Callable[..., Awaitable[R]]],
return wrapper
return decorator
+
+
+class ReleaseAndAbsPath(pydantic.BaseModel):
+ release_name: str = pydantic.Field(..., description="Release name")
+ abs_path: str = pydantic.Field(..., description="Absolute path to the file
to check")
diff --git a/atr/tasks/checks/license.py b/atr/tasks/checks/license.py
index f050a15..ec96628 100644
--- a/atr/tasks/checks/license.py
+++ b/atr/tasks/checks/license.py
@@ -30,9 +30,30 @@ import atr.tasks.checks.archive as archive
_LOGGER = logging.getLogger(__name__)
+
+# Constant that must be present in the Apache License header
+APACHE_LICENSE_HEADER: Final[bytes] = b"""\
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License."""
+
+
# File type comment style definitions
# Ordered by their popularity in the Stack Overflow Developer Survey 2024
-_COMMENT_STYLES: Final[dict[str, dict[str, str]]] = {
+COMMENT_STYLES: Final[dict[str, dict[str, str]]] = {
# JavaScript and variants
"js": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
"mjs": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
@@ -121,7 +142,7 @@ _COMMENT_STYLES: Final[dict[str, dict[str, str]]] = {
# Patterns for files to include in license header checks
# Ordered by their popularity in the Stack Overflow Developer Survey 2024
-_INCLUDED_PATTERNS: Final[list[str]] = [
+INCLUDED_PATTERNS: Final[list[str]] = [
r"\.(js|mjs|cjs|jsx)$", # JavaScript
r"\.py$", # Python
r"\.(sql|ddl|dml)$", # SQL
@@ -150,25 +171,6 @@ _INCLUDED_PATTERNS: Final[list[str]] = [
r"\.(pl|pm|t)$", # Perl
]
-# Constant that must be present in the Apache License header
-_APACHE_LICENSE_HEADER: Final[bytes] = b"""\
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License."""
-
# Tasks
@@ -236,6 +238,54 @@ async def headers(args: Headers) -> str | None:
return None
+def strip_comments(content: bytes, file_ext: str) -> bytes:
+ """Strip comment prefixes from the content based on the file extension."""
+ if file_ext not in COMMENT_STYLES:
+ return content
+
+ comment_style = COMMENT_STYLES[file_ext]
+ lines = content.split(b"\n")
+ cleaned_lines = []
+
+ # Get comment markers as bytes
+ multi_start = comment_style.get("multi_start", "").encode()
+ multi_end = comment_style.get("multi_end", "").encode()
+ single = comment_style.get("single", "").encode()
+
+ # State tracking
+ in_multiline = False
+ is_c_style = (multi_start == b"/*") and (multi_end == b"*/")
+
+ for line in lines:
+ line = line.strip()
+
+ # Handle start of multi-line comment
+ if not in_multiline and multi_start and multi_start in line:
+ # Get content after multi-start
+ line = line[line.find(multi_start) + len(multi_start) :].strip()
+ in_multiline = True
+
+ # Handle end of multi-line comment
+ elif in_multiline and multi_end and multi_end in line:
+ # Get content before multi-end
+ line = line[: line.find(multi_end)].strip()
+ in_multiline = False
+
+ # Handle single-line comments
+ elif not in_multiline and single and line.startswith(single):
+ line = line[len(single) :].strip()
+
+ # For C style comments, strip leading asterisk if present
+ elif is_c_style and in_multiline and line.startswith(b"*"):
+ line = line[1:].strip()
+
+ # Only add non-empty lines
+ if line:
+ cleaned_lines.append(line)
+
+ return b"\n".join(cleaned_lines)
+
+
# File helpers
@@ -425,7 +475,7 @@ def _headers_check_core_logic_process_file(
# Allow for some extra content at the start of the file
# That may be shebangs, encoding declarations, etc.
- content = f.read(len(_APACHE_LICENSE_HEADER) + 512)
+ content = f.read(len(APACHE_LICENSE_HEADER) + 512)
is_valid, error = _headers_validate(content, member.name)
if is_valid:
return True, {"valid": True}
@@ -442,11 +492,11 @@ def _headers_check_core_logic_should_check(filepath: str) -> bool:
return False
# First check if we have comment style definitions for this extension
- if ext not in _COMMENT_STYLES:
+ if ext not in COMMENT_STYLES:
return False
# Then check if the file matches any of our included patterns
- for pattern in _INCLUDED_PATTERNS:
+ for pattern in INCLUDED_PATTERNS:
if re.search(pattern, filepath, re.IGNORECASE):
return True
@@ -461,66 +511,18 @@ def _get_file_extension(filename: str) -> str | None:
return ext[1:].lower()
-def _strip_comments(content: bytes, file_ext: str) -> bytes:
- """Strip comment prefixes from the content based on the file extension."""
- if file_ext not in _COMMENT_STYLES:
- return content
-
- comment_style = _COMMENT_STYLES[file_ext]
- lines = content.split(b"\n")
- cleaned_lines = []
-
- # Get comment markers as bytes
- multi_start = comment_style.get("multi_start", "").encode()
- multi_end = comment_style.get("multi_end", "").encode()
- single = comment_style.get("single", "").encode()
-
- # State tracking
- in_multiline = False
- is_c_style = (multi_start == b"/*") and (multi_end == b"*/")
-
- for line in lines:
- line = line.strip()
-
- # Handle start of multi-line comment
- if not in_multiline and multi_start and multi_start in line:
- # Get content after multi-start
- line = line[line.find(multi_start) + len(multi_start) :].strip()
- in_multiline = True
-
- # Handle end of multi-line comment
- elif in_multiline and multi_end and multi_end in line:
- # Get content before multi-end
- line = line[: line.find(multi_end)].strip()
- in_multiline = False
-
- # Handle single-line comments
- elif not in_multiline and single and line.startswith(single):
- line = line[len(single) :].strip()
-
- # For C style comments, strip leading asterisk if present
- elif is_c_style and in_multiline and line.startswith(b"*"):
- line = line[1:].strip()
-
- # Only add non-empty lines
- if line:
- cleaned_lines.append(line)
-
- return b"\n".join(cleaned_lines)
-
-
def _headers_validate(content: bytes, filename: str) -> tuple[bool, str |
None]:
"""Validate that the content contains the Apache License header after
removing comments."""
# Get the file extension from the filename
file_ext = _get_file_extension(filename)
- if not file_ext or file_ext not in _COMMENT_STYLES:
+ if not file_ext or file_ext not in COMMENT_STYLES:
return False, "Could not determine file type from extension"
# Strip comments, removing empty lines in the process
- cleaned_header = _strip_comments(content, file_ext)
+ cleaned_header = strip_comments(content, file_ext)
# Normalise the expected header in the same way as directly above
- expected_lines = [line.strip() for line in
_APACHE_LICENSE_HEADER.split(b"\n")]
+ expected_lines = [line.strip() for line in
APACHE_LICENSE_HEADER.split(b"\n")]
expected_lines = [line for line in expected_lines if line]
expected_header = b"\n".join(expected_lines)
diff --git a/atr/tasks/checks/zipformat.py b/atr/tasks/checks/zipformat.py
new file mode 100644
index 0000000..b47c9ee
--- /dev/null
+++ b/atr/tasks/checks/zipformat.py
@@ -0,0 +1,375 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import logging
+import os
+import zipfile
+from typing import Any
+
+import atr.tasks.checks as checks
+import atr.tasks.checks.license as license
+
+_LOGGER = logging.getLogger(__name__)
+
+
[email protected]_model(checks.ReleaseAndAbsPath)
+async def integrity(args: checks.ReleaseAndAbsPath) -> str | None:
+ """Check that the zip archive is not corrupted and can be opened."""
+ rel_path = checks.rel_path(args.abs_path)
+ check_instance = await checks.Check.create(checker=integrity,
release_name=args.release_name, path=rel_path)
+ _LOGGER.info(f"Checking zip integrity for {args.abs_path} (rel:
{rel_path})")
+
+ try:
+ result_data = await asyncio.to_thread(_integrity_check_core_logic,
args.abs_path)
+ if result_data.get("error"):
+ await check_instance.failure(result_data["error"], result_data)
+ else:
+ await check_instance.success(
+ f"Zip archive integrity OK ({result_data['member_count']}
members)", result_data
+ )
+ except Exception as e:
+ await check_instance.exception("Error checking zip integrity",
{"error": str(e)})
+
+ return None
+
+
[email protected]_model(checks.ReleaseAndAbsPath)
+async def license_files(args: checks.ReleaseAndAbsPath) -> str | None:
+ """Check that the LICENSE and NOTICE files exist and are valid within the
zip."""
+ rel_path = checks.rel_path(args.abs_path)
+ check_instance = await checks.Check.create(checker=license_files,
release_name=args.release_name, path=rel_path)
+ _LOGGER.info(f"Checking zip license files for {args.abs_path} (rel:
{rel_path})")
+
+ try:
+ result_data = await
asyncio.to_thread(_license_files_check_core_logic_zip, args.abs_path)
+
+ if result_data.get("error"):
+ await check_instance.failure(result_data["error"], result_data)
+ elif result_data.get("license_valid") and
result_data.get("notice_valid"):
+ await check_instance.success("LICENSE and NOTICE files present and
valid in zip", result_data)
+ else:
+ issues = []
+ if not result_data.get("license_found"):
+ issues.append("LICENSE missing")
+ elif not result_data.get("license_valid"):
+ issues.append("LICENSE invalid or empty")
+ if not result_data.get("notice_found"):
+ issues.append("NOTICE missing")
+ elif not result_data.get("notice_valid"):
+ issues.append("NOTICE invalid or empty")
+ issue_str = ", ".join(issues) if issues else "Issues found with
LICENSE or NOTICE files"
+ await check_instance.failure(issue_str, result_data)
+
+ except Exception as e:
+ await check_instance.exception("Error checking zip license files",
{"error": str(e)})
+
+ return None
+
+
[email protected]_model(checks.ReleaseAndAbsPath)
+async def license_headers(args: checks.ReleaseAndAbsPath) -> str | None:
+ """Check that all source files within the zip have valid license
headers."""
+ rel_path = checks.rel_path(args.abs_path)
+ check_instance = await checks.Check.create(checker=license_headers,
release_name=args.release_name, path=rel_path)
+ _LOGGER.info(f"Checking zip license headers for {args.abs_path} (rel:
{rel_path})")
+
+ try:
+ result_data = await
asyncio.to_thread(_license_headers_check_core_logic_zip, args.abs_path)
+
+ if result_data.get("error_message"):
+ await check_instance.failure(result_data["error_message"],
result_data)
+ elif not result_data.get("valid"):
+ num_issues = len(result_data.get("files_without_headers", []))
+ failure_msg = f"{num_issues} file(s) missing or having invalid
license headers"
+ await check_instance.failure(failure_msg, result_data)
+ else:
+ await check_instance.success(
+ f"License headers OK ({result_data.get('files_checked', 0)}
files checked)", result_data
+ )
+
+ except Exception as e:
+ await check_instance.exception("Error checking zip license headers",
{"error": str(e)})
+
+ return None
+
+
[email protected]_model(checks.ReleaseAndAbsPath)
+async def structure(args: checks.ReleaseAndAbsPath) -> str | None:
+ """Check that the zip archive has a single root directory matching the
artifact name."""
+ rel_path = checks.rel_path(args.abs_path)
+ check_instance = await checks.Check.create(checker=structure,
release_name=args.release_name, path=rel_path)
+ _LOGGER.info(f"Checking zip structure for {args.abs_path} (rel:
{rel_path})")
+
+ try:
+ result_data = await asyncio.to_thread(_structure_check_core_logic,
args.abs_path)
+ if result_data.get("error"):
+ await check_instance.failure(result_data["error"], result_data)
+ else:
+ await check_instance.success(f"Zip structure OK (root:
{result_data['root_dir']})", result_data)
+ except Exception as e:
+ await check_instance.exception("Error checking zip structure",
{"error": str(e)})
+
+ return None
+
+
+def _integrity_check_core_logic(artifact_path: str) -> dict[str, Any]:
+ """Verify that a zip file can be opened and its members listed."""
+ try:
+ with zipfile.ZipFile(artifact_path, "r") as zf:
+ # This is a simple check using list members
+ # We can use zf.testzip() for CRC checks if needed, though this will be slower
+ member_list = zf.namelist()
+ return {"member_count": len(member_list)}
+ except zipfile.BadZipFile as e:
+ return {"error": f"Bad zip file: {e}"}
+ except FileNotFoundError:
+ return {"error": "File not found"}
+ except Exception as e:
+ return {"error": f"Unexpected error: {e}"}
+
+
+def _license_files_check_file_zip(zf: zipfile.ZipFile, artifact_path: str,
expected_path: str) -> tuple[bool, bool]:
+ """Check for the presence and basic validity of a specific file in a
zip."""
+ found = False
+ valid = False
+ try:
+ with zf.open(expected_path) as file_handle:
+ found = True
+ content = file_handle.read().strip()
+ if content:
+ # TODO: Add more specific NOTICE checks if needed
+ valid = True
+ except KeyError:
+ # File not found in zip
+ ...
+ except Exception as e:
+ filename = os.path.basename(expected_path)
+ _LOGGER.warning(f"Error reading {filename} in zip {artifact_path}:
{e}")
+ return found, valid
+
+
+def _license_files_check_core_logic_zip(artifact_path: str) -> dict[str, Any]:
+ """Verify LICENSE and NOTICE files within a zip archive."""
+ # TODO: Obviously we want to reuse the license files check logic from license.py
+ # But we'd need to have task dependencies to do that, ideally
+ try:
+ with zipfile.ZipFile(artifact_path, "r") as zf:
+ members = zf.namelist()
+ if not members:
+ return {"error": "Archive is empty"}
+
+ root_dir = _license_files_find_root_dir_zip(members)
+ if not root_dir:
+ return {"error": "Could not determine root directory"}
+
+ expected_license_path = root_dir + "LICENSE"
+ expected_notice_path = root_dir + "NOTICE"
+
+ member_set = set(members)
+
+ license_found, license_valid = (
+ _license_files_check_file_zip(zf, artifact_path,
expected_license_path)
+ if (expected_license_path in member_set)
+ else (False, False)
+ )
+ notice_found, notice_valid = (
+ _license_files_check_file_zip(zf, artifact_path,
expected_notice_path)
+ if (expected_notice_path in member_set)
+ else (False, False)
+ )
+
+ return {
+ "root_dir": root_dir,
+ "license_found": license_found,
+ "license_valid": license_valid,
+ "notice_found": notice_found,
+ "notice_valid": notice_valid,
+ }
+
+ except zipfile.BadZipFile as e:
+ return {"error": f"Bad zip file: {e}"}
+ except FileNotFoundError:
+ return {"error": "File not found"}
+ except Exception as e:
+ return {"error": f"Unexpected error: {e}"}
+
+
+def _license_files_find_root_dir_zip(members: list[str]) -> str | None:
+ """Find the root directory in a list of zip members."""
+ for member in members:
+ if "/" in member:
+ return member.split("/", 1)[0]
+ return None
+
+
+def _license_headers_check_core_logic_zip(artifact_path: str) -> dict[str,
Any]:
+ """Verify license headers for files within a zip archive."""
+ files_checked = 0
+ files_with_issues: list[str] = []
+ try:
+ with zipfile.ZipFile(artifact_path, "r") as zf:
+ members = zf.infolist()
+
+ for member_info in members:
+ if member_info.is_dir():
+ continue
+
+ member_path = member_info.filename
+ _, extension = os.path.splitext(member_path)
+ extension = extension.lower().lstrip(".")
+
+ if not _license_headers_check_should_check_zip(member_path,
extension):
+ continue
+
+ files_checked += 1
+ is_valid, error_msg =
_license_headers_check_single_file_zip(zf, member_info, extension)
+
+ if error_msg:
+ # Already includes path and error type
+ files_with_issues.append(error_msg)
+ elif not is_valid:
+ # Just append path for header mismatch
+ files_with_issues.append(member_path)
+
+ if files_with_issues:
+ return {
+ "valid": False,
+ "files_checked": files_checked,
+ "files_without_headers": files_with_issues,
+ "error_message": None,
+ }
+ else:
+ return {
+ "valid": True,
+ "files_checked": files_checked,
+ "files_without_headers": [],
+ "error_message": None,
+ }
+
+ except zipfile.BadZipFile as e:
+ return {"valid": False, "error_message": f"Bad zip file: {e}"}
+ except FileNotFoundError:
+ return {"valid": False, "error_message": "File not found"}
+ except Exception as e:
+ return {"valid": False, "error_message": f"Unexpected error: {e}"}
+
+
+def _license_headers_check_should_check_zip(member_path: str, extension: str)
-> bool:
+ """Determine whether a file in a zip should be checked for license
headers."""
+ for pattern in license.INCLUDED_PATTERNS:
+ if license.re.match(pattern, f".{extension}"):
+ # Also check whether we have a comment style defined for it
+ if license.COMMENT_STYLES.get(extension):
+ return True
+ else:
+ _LOGGER.warning(f"No comment style defined for included
extension '{extension}' in {member_path}")
+ return False
+ return False
+
+
+def _license_headers_check_single_file_zip(
+ zf: zipfile.ZipFile, member_info: zipfile.ZipInfo, extension: str
+) -> tuple[bool, str | None]:
+ """Check the license header of a single file within a zip. Returns
(is_valid, error_message)."""
+ member_path = member_info.filename
+ try:
+ with zf.open(member_path) as file_in_zip:
+ content_bytes = file_in_zip.read(2048)
+ header_bytes = license.strip_comments(content_bytes, extension)
+ expected_header_bytes = license.APACHE_LICENSE_HEADER
+ if header_bytes == expected_header_bytes:
+ return True, None
+ else:
+ # Header mismatch
+ return False, None
+ except Exception as read_error:
+ return False, f"{member_path} (Read Error: {read_error})"
+
+
+def _structure_check_core_logic(artifact_path: str) -> dict[str, Any]:
+ """Verify the internal structure of the zip archive."""
+ try:
+ with zipfile.ZipFile(artifact_path, "r") as zf:
+ members = zf.namelist()
+ if not members:
+ return {"error": "Archive is empty"}
+
+ base_name = os.path.basename(artifact_path)
+ name_part = base_name.removesuffix(".zip")
+ # # TODO: Airavata has e.g. "-source-release"
+ # # It would be useful if there were a function in analysis.py for stripping these
+ # # But the root directory should probably always match the name of the file sans suffix
+ # # (This would also be easier to implement)
+ # if name_part.endswith(("-src", "-bin", "-dist")):
+ # name_part = "-".join(name_part.split("-")[:-1])
+ expected_root = name_part
+
+ root_dirs, non_rooted_files =
_structure_check_core_logic_find_roots(zf, members)
+ actual_root, error_msg = _structure_check_core_logic_validate_root(
+ members, root_dirs, non_rooted_files, expected_root
+ )
+
+ if error_msg:
+ return {"error": error_msg}
+ if actual_root:
+ return {"root_dir": actual_root}
+ return {"error": "Unknown structure validation error"}
+
+ except zipfile.BadZipFile as e:
+ return {"error": f"Bad zip file: {e}"}
+ except FileNotFoundError:
+ return {"error": "File not found"}
+ except Exception as e:
+ return {"error": f"Unexpected error: {e}"}
+
+
+def _structure_check_core_logic_find_roots(zf: zipfile.ZipFile, members:
list[str]) -> tuple[set[str], list[str]]:
+ """Identify root directories and non-rooted files in a zip archive."""
+ root_dirs: set[str] = set()
+ non_rooted_files: list[str] = []
+ for member in members:
+ if "/" in member:
+ root_dirs.add(member.split("/", 1)[0])
+ elif not zipfile.Path(zf, member).is_dir():
+ non_rooted_files.append(member)
+ return root_dirs, non_rooted_files
+
+
+def _structure_check_core_logic_validate_root(
+ members: list[str], root_dirs: set[str], non_rooted_files: list[str],
expected_root: str
+) -> tuple[str | None, str | None]:
+ """Validate the identified root structure against expectations."""
+ if non_rooted_files:
+ return None, f"Files found directly in root: {non_rooted_files}"
+ if not root_dirs:
+ return None, "No directories found in archive"
+ if len(root_dirs) > 1:
+ return None, f"Multiple root directories found:
{sorted(list(root_dirs))}"
+
+ actual_root = next(iter(root_dirs))
+ if actual_root != expected_root:
+ return None, f"Root directory mismatch. Expected '{expected_root}',
found '{actual_root}'"
+
+ # Check whether all members are under the correct root directory
+ for member in members:
+ if member == actual_root.rstrip("/"):
+ continue
+ if not member.startswith(expected_root):
+ return None, f"Member found outside expected root directory:
{member}"
+
+ return actual_root, None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]