This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new fe92fad  Add a rough draft of a module for zip format checks
fe92fad is described below

commit fe92fad63f79579ec0d4d8becf321d9d654a9c26
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Apr 7 15:26:51 2025 +0100

    Add a rough draft of a module for zip format checks
---
 atr/db/models.py              |   6 +-
 atr/tasks/__init__.py         |  57 ++++++-
 atr/tasks/checks/__init__.py  |   5 +
 atr/tasks/checks/license.py   | 152 ++++++++---------
 atr/tasks/checks/zipformat.py | 375 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 517 insertions(+), 78 deletions(-)

diff --git a/atr/db/models.py b/atr/db/models.py
index 5612438..6bfe988 100644
--- a/atr/db/models.py
+++ b/atr/db/models.py
@@ -294,9 +294,13 @@ class TaskType(str, enum.Enum):
     PATHS_CHECK = "paths_check"
     RAT_CHECK = "rat_check"
     RSYNC_ANALYSE = "rsync_analyse"
+    SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
     SIGNATURE_CHECK = "signature_check"
     VOTE_INITIATE = "vote_initiate"
-    SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
+    ZIPFORMAT_INTEGRITY = "zipformat_integrity"
+    ZIPFORMAT_LICENSE_FILES = "zipformat_license_files"
+    ZIPFORMAT_LICENSE_HEADERS = "zipformat_license_headers"
+    ZIPFORMAT_STRUCTURE = "zipformat_structure"
 
 
 class Task(sqlmodel.SQLModel, table=True):
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 419343f..b42294d 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -23,12 +23,14 @@ import aiofiles.os
 
 import atr.db as db
 import atr.db.models as models
+import atr.tasks.checks as checks
 import atr.tasks.checks.archive as archive
 import atr.tasks.checks.hashing as hashing
 import atr.tasks.checks.license as license
 import atr.tasks.checks.paths as paths
 import atr.tasks.checks.rat as rat
 import atr.tasks.checks.signature as signature
+import atr.tasks.checks.zipformat as zipformat
 import atr.tasks.rsync as rsync
 import atr.tasks.sbom as sbom
 import atr.tasks.vote as vote
@@ -115,12 +117,20 @@ def resolve(task_type: models.TaskType) -> Callable[..., Awaitable[str | None]]:
             return rat.check
         case models.TaskType.RSYNC_ANALYSE:
             return rsync.analyse
+        case models.TaskType.SBOM_GENERATE_CYCLONEDX:
+            return sbom.generate_cyclonedx
         case models.TaskType.SIGNATURE_CHECK:
             return signature.check
         case models.TaskType.VOTE_INITIATE:
             return vote.initiate
-        case models.TaskType.SBOM_GENERATE_CYCLONEDX:
-            return sbom.generate_cyclonedx
+        case models.TaskType.ZIPFORMAT_INTEGRITY:
+            return zipformat.integrity
+        case models.TaskType.ZIPFORMAT_STRUCTURE:
+            return zipformat.structure
+        case models.TaskType.ZIPFORMAT_LICENSE_FILES:
+            return zipformat.license_files
+        case models.TaskType.ZIPFORMAT_LICENSE_HEADERS:
+            return zipformat.license_headers
         # NOTE: Do NOT add "case _" here
         # Otherwise we lose exhaustiveness checking
 
@@ -212,10 +222,53 @@ async def tar_gz_checks(release: models.Release, path: str) -> list[models.Task]
     return tasks
 
 
+async def zip_checks(release: models.Release, path: str) -> list[models.Task]:
+    """Create check tasks for a .zip file."""
+    full_path = str(util.get_release_candidate_draft_dir() / release.project.name / release.version / path)
+    modified = int(await aiofiles.os.path.getmtime(full_path))
+
+    tasks = [
+        models.Task(
+            status=models.TaskStatus.QUEUED,
+            task_type=models.TaskType.ZIPFORMAT_INTEGRITY,
+            task_args=checks.ReleaseAndAbsPath(release_name=release.name, abs_path=full_path).model_dump(),
+            release_name=release.name,
+            path=path,
+            modified=modified,
+        ),
+        models.Task(
+            status=models.TaskStatus.QUEUED,
+            task_type=models.TaskType.ZIPFORMAT_LICENSE_FILES,
+            task_args=checks.ReleaseAndAbsPath(release_name=release.name, abs_path=full_path).model_dump(),
+            release_name=release.name,
+            path=path,
+            modified=modified,
+        ),
+        models.Task(
+            status=models.TaskStatus.QUEUED,
+            task_type=models.TaskType.ZIPFORMAT_LICENSE_HEADERS,
+            task_args=checks.ReleaseAndAbsPath(release_name=release.name, abs_path=full_path).model_dump(),
+            release_name=release.name,
+            path=path,
+            modified=modified,
+        ),
+        models.Task(
+            status=models.TaskStatus.QUEUED,
+            task_type=models.TaskType.ZIPFORMAT_STRUCTURE,
+            task_args=checks.ReleaseAndAbsPath(release_name=release.name, abs_path=full_path).model_dump(),
+            release_name=release.name,
+            path=path,
+            modified=modified,
+        ),
+    ]
+    return tasks
+
+
 TASK_FUNCTIONS: Final[dict[str, Callable[..., Coroutine[Any, Any, list[models.Task]]]]] = {
     ".asc": asc_checks,
     ".sha256": sha_checks,
     ".sha512": sha_checks,
     ".tar.gz": tar_gz_checks,
     ".tgz": tar_gz_checks,
+    ".zip": zip_checks,
 }
diff --git a/atr/tasks/checks/__init__.py b/atr/tasks/checks/__init__.py
index 921d1c4..eff9973 100644
--- a/atr/tasks/checks/__init__.py
+++ b/atr/tasks/checks/__init__.py
@@ -153,3 +153,8 @@ def with_model(model_class: type[T]) -> Callable[[Callable[..., Awaitable[R]]],
         return wrapper
 
     return decorator
+
+
+class ReleaseAndAbsPath(pydantic.BaseModel):
+    release_name: str = pydantic.Field(..., description="Release name")
+    abs_path: str = pydantic.Field(..., description="Absolute path to the file to check")
diff --git a/atr/tasks/checks/license.py b/atr/tasks/checks/license.py
index f050a15..ec96628 100644
--- a/atr/tasks/checks/license.py
+++ b/atr/tasks/checks/license.py
@@ -30,9 +30,30 @@ import atr.tasks.checks.archive as archive
 
 _LOGGER = logging.getLogger(__name__)
 
+
+# Constant that must be present in the Apache License header
+APACHE_LICENSE_HEADER: Final[bytes] = b"""\
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License."""
+
+
 # File type comment style definitions
 # Ordered by their popularity in the Stack Overflow Developer Survey 2024
-_COMMENT_STYLES: Final[dict[str, dict[str, str]]] = {
+COMMENT_STYLES: Final[dict[str, dict[str, str]]] = {
     # JavaScript and variants
     "js": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
     "mjs": {"single": "//", "multi_start": "/*", "multi_end": "*/"},
@@ -121,7 +142,7 @@ _COMMENT_STYLES: Final[dict[str, dict[str, str]]] = {
 
 # Patterns for files to include in license header checks
 # Ordered by their popularity in the Stack Overflow Developer Survey 2024
-_INCLUDED_PATTERNS: Final[list[str]] = [
+INCLUDED_PATTERNS: Final[list[str]] = [
     r"\.(js|mjs|cjs|jsx)$",  # JavaScript
     r"\.py$",  # Python
     r"\.(sql|ddl|dml)$",  # SQL
@@ -150,25 +171,6 @@ _INCLUDED_PATTERNS: Final[list[str]] = [
     r"\.(pl|pm|t)$",  # Perl
 ]
 
-# Constant that must be present in the Apache License header
-_APACHE_LICENSE_HEADER: Final[bytes] = b"""\
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License."""
-
 # Tasks
 
 
@@ -236,6 +238,54 @@ async def headers(args: Headers) -> str | None:
     return None
 
 
+def strip_comments(content: bytes, file_ext: str) -> bytes:
+    """Strip comment prefixes from the content based on the file extension."""
+    if file_ext not in COMMENT_STYLES:
+        return content
+
+    comment_style = COMMENT_STYLES[file_ext]
+    lines = content.split(b"\n")
+    cleaned_lines = []
+
+    # Get comment markers as bytes
+    multi_start = comment_style.get("multi_start", "").encode()
+    multi_end = comment_style.get("multi_end", "").encode()
+    single = comment_style.get("single", "").encode()
+
+    # State tracking
+    in_multiline = False
+    is_c_style = (multi_start == b"/*") and (multi_end == b"*/")
+
+    for line in lines:
+        line = line.strip()
+
+        # Handle start of multi-line comment
+        if not in_multiline and multi_start and multi_start in line:
+            # Get content after multi-start
+            line = line[line.find(multi_start) + len(multi_start) :].strip()
+            in_multiline = True
+
+        # Handle end of multi-line comment
+        elif in_multiline and multi_end and multi_end in line:
+            # Get content before multi-end
+            line = line[: line.find(multi_end)].strip()
+            in_multiline = False
+
+        # Handle single-line comments
+        elif not in_multiline and single and line.startswith(single):
+            line = line[len(single) :].strip()
+
+        # For C style comments, strip leading asterisk if present
+        elif is_c_style and in_multiline and line.startswith(b"*"):
+            line = line[1:].strip()
+
+        # Only add non-empty lines
+        if line:
+            cleaned_lines.append(line)
+
+    return b"\n".join(cleaned_lines)
+
+
 # File helpers
 
 
@@ -425,7 +475,7 @@ def _headers_check_core_logic_process_file(
 
         # Allow for some extra content at the start of the file
         # That may be shebangs, encoding declarations, etc.
-        content = f.read(len(_APACHE_LICENSE_HEADER) + 512)
+        content = f.read(len(APACHE_LICENSE_HEADER) + 512)
         is_valid, error = _headers_validate(content, member.name)
         if is_valid:
             return True, {"valid": True}
@@ -442,11 +492,11 @@ def _headers_check_core_logic_should_check(filepath: str) -> bool:
         return False
 
     # First check if we have comment style definitions for this extension
-    if ext not in _COMMENT_STYLES:
+    if ext not in COMMENT_STYLES:
         return False
 
     # Then check if the file matches any of our included patterns
-    for pattern in _INCLUDED_PATTERNS:
+    for pattern in INCLUDED_PATTERNS:
         if re.search(pattern, filepath, re.IGNORECASE):
             return True
 
@@ -461,66 +511,18 @@ def _get_file_extension(filename: str) -> str | None:
     return ext[1:].lower()
 
 
-def _strip_comments(content: bytes, file_ext: str) -> bytes:
-    """Strip comment prefixes from the content based on the file extension."""
-    if file_ext not in _COMMENT_STYLES:
-        return content
-
-    comment_style = _COMMENT_STYLES[file_ext]
-    lines = content.split(b"\n")
-    cleaned_lines = []
-
-    # Get comment markers as bytes
-    multi_start = comment_style.get("multi_start", "").encode()
-    multi_end = comment_style.get("multi_end", "").encode()
-    single = comment_style.get("single", "").encode()
-
-    # State tracking
-    in_multiline = False
-    is_c_style = (multi_start == b"/*") and (multi_end == b"*/")
-
-    for line in lines:
-        line = line.strip()
-
-        # Handle start of multi-line comment
-        if not in_multiline and multi_start and multi_start in line:
-            # Get content after multi-start
-            line = line[line.find(multi_start) + len(multi_start) :].strip()
-            in_multiline = True
-
-        # Handle end of multi-line comment
-        elif in_multiline and multi_end and multi_end in line:
-            # Get content before multi-end
-            line = line[: line.find(multi_end)].strip()
-            in_multiline = False
-
-        # Handle single-line comments
-        elif not in_multiline and single and line.startswith(single):
-            line = line[len(single) :].strip()
-
-        # For C style comments, strip leading asterisk if present
-        elif is_c_style and in_multiline and line.startswith(b"*"):
-            line = line[1:].strip()
-
-        # Only add non-empty lines
-        if line:
-            cleaned_lines.append(line)
-
-    return b"\n".join(cleaned_lines)
-
-
 def _headers_validate(content: bytes, filename: str) -> tuple[bool, str | None]:
     """Validate that the content contains the Apache License header after removing comments."""
     # Get the file extension from the filename
     file_ext = _get_file_extension(filename)
-    if not file_ext or file_ext not in _COMMENT_STYLES:
+    if not file_ext or file_ext not in COMMENT_STYLES:
         return False, "Could not determine file type from extension"
 
     # Strip comments, removing empty lines in the process
-    cleaned_header = _strip_comments(content, file_ext)
+    cleaned_header = strip_comments(content, file_ext)
 
     # Normalise the expected header in the same way as directly above
-    expected_lines = [line.strip() for line in _APACHE_LICENSE_HEADER.split(b"\n")]
+    expected_lines = [line.strip() for line in APACHE_LICENSE_HEADER.split(b"\n")]
     expected_lines = [line for line in expected_lines if line]
     expected_header = b"\n".join(expected_lines)
 
diff --git a/atr/tasks/checks/zipformat.py b/atr/tasks/checks/zipformat.py
new file mode 100644
index 0000000..b47c9ee
--- /dev/null
+++ b/atr/tasks/checks/zipformat.py
@@ -0,0 +1,375 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import logging
+import os
+import zipfile
+from typing import Any
+
+import atr.tasks.checks as checks
+import atr.tasks.checks.license as license
+
+_LOGGER = logging.getLogger(__name__)
+
+
+@checks.with_model(checks.ReleaseAndAbsPath)
+async def integrity(args: checks.ReleaseAndAbsPath) -> str | None:
+    """Check that the zip archive is not corrupted and can be opened."""
+    rel_path = checks.rel_path(args.abs_path)
+    check_instance = await checks.Check.create(checker=integrity, release_name=args.release_name, path=rel_path)
+    _LOGGER.info(f"Checking zip integrity for {args.abs_path} (rel: {rel_path})")
+
+    try:
+        result_data = await asyncio.to_thread(_integrity_check_core_logic, args.abs_path)
+        if result_data.get("error"):
+            await check_instance.failure(result_data["error"], result_data)
+        else:
+            await check_instance.success(
+                f"Zip archive integrity OK ({result_data['member_count']} members)", result_data
+            )
+    except Exception as e:
+        await check_instance.exception("Error checking zip integrity", {"error": str(e)})
+
+    return None
+
+
+@checks.with_model(checks.ReleaseAndAbsPath)
+async def license_files(args: checks.ReleaseAndAbsPath) -> str | None:
+    """Check that the LICENSE and NOTICE files exist and are valid within the zip."""
+    rel_path = checks.rel_path(args.abs_path)
+    check_instance = await checks.Check.create(checker=license_files, release_name=args.release_name, path=rel_path)
+    _LOGGER.info(f"Checking zip license files for {args.abs_path} (rel: {rel_path})")
+
+    try:
+        result_data = await asyncio.to_thread(_license_files_check_core_logic_zip, args.abs_path)
+
+        if result_data.get("error"):
+            await check_instance.failure(result_data["error"], result_data)
+        elif result_data.get("license_valid") and result_data.get("notice_valid"):
+            await check_instance.success("LICENSE and NOTICE files present and valid in zip", result_data)
+        else:
+            issues = []
+            if not result_data.get("license_found"):
+                issues.append("LICENSE missing")
+            elif not result_data.get("license_valid"):
+                issues.append("LICENSE invalid or empty")
+            if not result_data.get("notice_found"):
+                issues.append("NOTICE missing")
+            elif not result_data.get("notice_valid"):
+                issues.append("NOTICE invalid or empty")
+            issue_str = ", ".join(issues) if issues else "Issues found with LICENSE or NOTICE files"
+            await check_instance.failure(issue_str, result_data)
+
+    except Exception as e:
+        await check_instance.exception("Error checking zip license files", {"error": str(e)})
+
+    return None
+
+
+@checks.with_model(checks.ReleaseAndAbsPath)
+async def license_headers(args: checks.ReleaseAndAbsPath) -> str | None:
+    """Check that all source files within the zip have valid license headers."""
+    rel_path = checks.rel_path(args.abs_path)
+    check_instance = await checks.Check.create(checker=license_headers, release_name=args.release_name, path=rel_path)
+    _LOGGER.info(f"Checking zip license headers for {args.abs_path} (rel: {rel_path})")
+
+    try:
+        result_data = await asyncio.to_thread(_license_headers_check_core_logic_zip, args.abs_path)
+
+        if result_data.get("error_message"):
+            await check_instance.failure(result_data["error_message"], result_data)
+        elif not result_data.get("valid"):
+            num_issues = len(result_data.get("files_without_headers", []))
+            failure_msg = f"{num_issues} file(s) missing or having invalid license headers"
+            await check_instance.failure(failure_msg, result_data)
+        else:
+            await check_instance.success(
+                f"License headers OK ({result_data.get('files_checked', 0)} files checked)", result_data
+            )
+
+    except Exception as e:
+        await check_instance.exception("Error checking zip license headers", {"error": str(e)})
+
+    return None
+
+
+@checks.with_model(checks.ReleaseAndAbsPath)
+async def structure(args: checks.ReleaseAndAbsPath) -> str | None:
+    """Check that the zip archive has a single root directory matching the artifact name."""
+    rel_path = checks.rel_path(args.abs_path)
+    check_instance = await checks.Check.create(checker=structure, release_name=args.release_name, path=rel_path)
+    _LOGGER.info(f"Checking zip structure for {args.abs_path} (rel: {rel_path})")
+
+    try:
+        result_data = await asyncio.to_thread(_structure_check_core_logic, args.abs_path)
+        if result_data.get("error"):
+            await check_instance.failure(result_data["error"], result_data)
+        else:
+            await check_instance.success(f"Zip structure OK (root: {result_data['root_dir']})", result_data)
+    except Exception as e:
+        await check_instance.exception("Error checking zip structure", {"error": str(e)})
+
+    return None
+
+
+def _integrity_check_core_logic(artifact_path: str) -> dict[str, Any]:
+    """Verify that a zip file can be opened and its members listed."""
+    try:
+        with zipfile.ZipFile(artifact_path, "r") as zf:
+            # This is a simple check using list members
+            # We can use zf.testzip() for CRC checks if needed, though this will be slower
+            member_list = zf.namelist()
+            return {"member_count": len(member_list)}
+    except zipfile.BadZipFile as e:
+        return {"error": f"Bad zip file: {e}"}
+    except FileNotFoundError:
+        return {"error": "File not found"}
+    except Exception as e:
+        return {"error": f"Unexpected error: {e}"}
+
+
+def _license_files_check_file_zip(zf: zipfile.ZipFile, artifact_path: str, expected_path: str) -> tuple[bool, bool]:
+    """Check for the presence and basic validity of a specific file in a zip."""
+    found = False
+    valid = False
+    try:
+        with zf.open(expected_path) as file_handle:
+            found = True
+            content = file_handle.read().strip()
+            if content:
+                # TODO: Add more specific NOTICE checks if needed
+                valid = True
+    except KeyError:
+        # File not found in zip
+        ...
+    except Exception as e:
+        filename = os.path.basename(expected_path)
+        _LOGGER.warning(f"Error reading {filename} in zip {artifact_path}: {e}")
+    return found, valid
+
+
+def _license_files_check_core_logic_zip(artifact_path: str) -> dict[str, Any]:
+    """Verify LICENSE and NOTICE files within a zip archive."""
+    # TODO: Obviously we want to reuse the license files check logic from license.py
+    # But we'd need to have task dependencies to do that, ideally
+    try:
+        with zipfile.ZipFile(artifact_path, "r") as zf:
+            members = zf.namelist()
+            if not members:
+                return {"error": "Archive is empty"}
+
+            root_dir = _license_files_find_root_dir_zip(members)
+            if not root_dir:
+                return {"error": "Could not determine root directory"}
+
+            expected_license_path = root_dir + "LICENSE"
+            expected_notice_path = root_dir + "NOTICE"
+
+            member_set = set(members)
+
+            license_found, license_valid = (
+                _license_files_check_file_zip(zf, artifact_path, expected_license_path)
+                if (expected_license_path in member_set)
+                else (False, False)
+            )
+            notice_found, notice_valid = (
+                _license_files_check_file_zip(zf, artifact_path, expected_notice_path)
+                if (expected_notice_path in member_set)
+                else (False, False)
+            )
+
+            return {
+                "root_dir": root_dir,
+                "license_found": license_found,
+                "license_valid": license_valid,
+                "notice_found": notice_found,
+                "notice_valid": notice_valid,
+            }
+
+    except zipfile.BadZipFile as e:
+        return {"error": f"Bad zip file: {e}"}
+    except FileNotFoundError:
+        return {"error": "File not found"}
+    except Exception as e:
+        return {"error": f"Unexpected error: {e}"}
+
+
+def _license_files_find_root_dir_zip(members: list[str]) -> str | None:
+    """Find the root directory in a list of zip members."""
+    for member in members:
+        if "/" in member:
+            return member.split("/", 1)[0]
+    return None
+
+
+def _license_headers_check_core_logic_zip(artifact_path: str) -> dict[str, Any]:
+    """Verify license headers for files within a zip archive."""
+    files_checked = 0
+    files_with_issues: list[str] = []
+    try:
+        with zipfile.ZipFile(artifact_path, "r") as zf:
+            members = zf.infolist()
+
+            for member_info in members:
+                if member_info.is_dir():
+                    continue
+
+                member_path = member_info.filename
+                _, extension = os.path.splitext(member_path)
+                extension = extension.lower().lstrip(".")
+
+                if not _license_headers_check_should_check_zip(member_path, extension):
+                    continue
+
+                files_checked += 1
+                is_valid, error_msg = _license_headers_check_single_file_zip(zf, member_info, extension)
+
+                if error_msg:
+                    # Already includes path and error type
+                    files_with_issues.append(error_msg)
+                elif not is_valid:
+                    # Just append path for header mismatch
+                    files_with_issues.append(member_path)
+
+            if files_with_issues:
+                return {
+                    "valid": False,
+                    "files_checked": files_checked,
+                    "files_without_headers": files_with_issues,
+                    "error_message": None,
+                }
+            else:
+                return {
+                    "valid": True,
+                    "files_checked": files_checked,
+                    "files_without_headers": [],
+                    "error_message": None,
+                }
+
+    except zipfile.BadZipFile as e:
+        return {"valid": False, "error_message": f"Bad zip file: {e}"}
+    except FileNotFoundError:
+        return {"valid": False, "error_message": "File not found"}
+    except Exception as e:
+        return {"valid": False, "error_message": f"Unexpected error: {e}"}
+
+
+def _license_headers_check_should_check_zip(member_path: str, extension: str) -> bool:
+    """Determine whether a file in a zip should be checked for license headers."""
+    for pattern in license.INCLUDED_PATTERNS:
+        if license.re.match(pattern, f".{extension}"):
+            # Also check whether we have a comment style defined for it
+            if license.COMMENT_STYLES.get(extension):
+                return True
+            else:
+                _LOGGER.warning(f"No comment style defined for included extension '{extension}' in {member_path}")
+                return False
+    return False
+
+
+def _license_headers_check_single_file_zip(
+    zf: zipfile.ZipFile, member_info: zipfile.ZipInfo, extension: str
+) -> tuple[bool, str | None]:
+    """Check the license header of a single file within a zip. Returns (is_valid, error_message)."""
+    member_path = member_info.filename
+    try:
+        with zf.open(member_path) as file_in_zip:
+            content_bytes = file_in_zip.read(2048)
+            header_bytes = license.strip_comments(content_bytes, extension)
+            expected_header_bytes = license.APACHE_LICENSE_HEADER
+            if header_bytes == expected_header_bytes:
+                return True, None
+            else:
+                # Header mismatch
+                return False, None
+    except Exception as read_error:
+        return False, f"{member_path} (Read Error: {read_error})"
+
+
+def _structure_check_core_logic(artifact_path: str) -> dict[str, Any]:
+    """Verify the internal structure of the zip archive."""
+    try:
+        with zipfile.ZipFile(artifact_path, "r") as zf:
+            members = zf.namelist()
+            if not members:
+                return {"error": "Archive is empty"}
+
+            base_name = os.path.basename(artifact_path)
+            name_part = base_name.removesuffix(".zip")
+            # # TODO: Airavata has e.g. "-source-release"
+            # # It would be useful if there were a function in analysis.py for stripping these
+            # # But the root directory should probably always match the name of the file sans suffix
+            # # (This would also be easier to implement)
+            # if name_part.endswith(("-src", "-bin", "-dist")):
+            #     name_part = "-".join(name_part.split("-")[:-1])
+            expected_root = name_part
+
+            root_dirs, non_rooted_files = _structure_check_core_logic_find_roots(zf, members)
+            actual_root, error_msg = _structure_check_core_logic_validate_root(
+                members, root_dirs, non_rooted_files, expected_root
+            )
+
+            if error_msg:
+                return {"error": error_msg}
+            if actual_root:
+                return {"root_dir": actual_root}
+            return {"error": "Unknown structure validation error"}
+
+    except zipfile.BadZipFile as e:
+        return {"error": f"Bad zip file: {e}"}
+    except FileNotFoundError:
+        return {"error": "File not found"}
+    except Exception as e:
+        return {"error": f"Unexpected error: {e}"}
+
+
+def _structure_check_core_logic_find_roots(zf: zipfile.ZipFile, members: list[str]) -> tuple[set[str], list[str]]:
+    """Identify root directories and non-rooted files in a zip archive."""
+    root_dirs: set[str] = set()
+    non_rooted_files: list[str] = []
+    for member in members:
+        if "/" in member:
+            root_dirs.add(member.split("/", 1)[0])
+        elif not zipfile.Path(zf, member).is_dir():
+            non_rooted_files.append(member)
+    return root_dirs, non_rooted_files
+
+
+def _structure_check_core_logic_validate_root(
+    members: list[str], root_dirs: set[str], non_rooted_files: list[str], expected_root: str
+) -> tuple[str | None, str | None]:
+    """Validate the identified root structure against expectations."""
+    if non_rooted_files:
+        return None, f"Files found directly in root: {non_rooted_files}"
+    if not root_dirs:
+        return None, "No directories found in archive"
+    if len(root_dirs) > 1:
+        return None, f"Multiple root directories found: {sorted(list(root_dirs))}"
+
+    actual_root = next(iter(root_dirs))
+    if actual_root != expected_root:
+        return None, f"Root directory mismatch. Expected '{expected_root}', found '{actual_root}'"
+
+    # Check whether all members are under the correct root directory
+    for member in members:
+        if member == actual_root.rstrip("/"):
+            continue
+        if not member.startswith(expected_root):
+            return None, f"Member found outside expected root directory: {member}"
+
+    return actual_root, None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to