This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new d8bc263 Extract hard and soft link member types in archive files
d8bc263 is described below
commit d8bc263cdc1ea431cef09b33d62d27e59f42193f
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Jun 23 16:16:35 2025 +0100
Extract hard and soft link member types in archive files
---
atr/tasks/checks/rat.py | 3 +-
atr/tasks/sbom.py | 140 +++++++++++++++++++++++++++++++++++++-----------
2 files changed, 110 insertions(+), 33 deletions(-)
diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py
index 169fcb0..ec6d125 100644
--- a/atr/tasks/checks/rat.py
+++ b/atr/tasks/checks/rat.py
@@ -93,6 +93,7 @@ def _check_core_logic(
_LOGGER.info(f"PATH environment variable: {os.environ.get('PATH', 'PATH
not found')}")
# Check that Java is installed
+ # TODO: Run this only once, when the server starts
try:
java_version = subprocess.check_output(
["java", *_JAVA_MEMORY_ARGS, "-version"],
stderr=subprocess.STDOUT, text=True
@@ -116,7 +117,7 @@ def _check_core_logic(
# Try to find where Java might be located
which_java = subprocess.run(["which", "java"],
capture_output=True, text=True, check=False)
- which_java_result = which_java.stdout.strip() if
which_java.returncode == 0 else "not found"
+ which_java_result = which_java.stdout.strip() if
(which_java.returncode == 0) else "not found"
_LOGGER.info(f"Result for which java: {which_java_result}")
except Exception as inner_e:
_LOGGER.error(f"Additional error while trying to debug java:
{inner_e}")
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 530db6c..c4967f5 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -61,38 +61,11 @@ def archive_extract_safe(
try:
with tarfile.open(archive_path, mode="r|gz") as tf:
for member in tf:
- if member.name and member.name.split("/")[-1].startswith("._"):
- # Metadata convention
- continue
-
- # Skip anything that's not a file or directory
- if not (member.isreg() or member.isdir()):
- continue
-
- # Check whether extraction would exceed the size limit
- if member.isreg() and ((total_extracted + member.size) >
max_size):
- raise SBOMGenerationError(
- f"Extraction would exceed maximum size limit of
{max_size} bytes",
- {"max_size": max_size, "current_size":
total_extracted, "file_size": member.size},
- )
-
- # Extract directories directly
- if member.isdir():
- # Ensure the path is safe before extracting
- target_path = os.path.join(extract_dir, member.name)
- if not
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
- _LOGGER.warning(f"Skipping potentially unsafe path:
{member.name}")
- continue
- tf.extract(member, extract_dir, numeric_owner=True)
- continue
-
- if member.isreg():
- extracted_size = _archive_extract_safe_process_file(
- tf, member, extract_dir, total_extracted, max_size,
chunk_size
- )
- total_extracted += extracted_size
-
- # TODO: Add other types here
+ keep_going, total_extracted = archive_extract_member(
+ tf, member, extract_dir, total_extracted, max_size,
chunk_size
+ )
+ if not keep_going:
+ break
except tarfile.ReadError as e:
raise SBOMGenerationError(f"Failed to read archive: {e}",
{"archive_path": archive_path}) from e
@@ -159,6 +132,101 @@ def _archive_extract_safe_process_file(
return extracted_file_size
def archive_extract_member(
    tf: tarfile.TarFile, member: tarfile.TarInfo, extract_dir: str, total_extracted: int, max_size: int, chunk_size: int
) -> tuple[bool, int]:
    """Safely extract a single tar member into extract_dir.

    Returns (keep_going, total_extracted): keep_going tells the caller whether
    to continue iterating the archive, and total_extracted is the running byte
    count of extracted regular-file data, including this member.

    Raises SBOMGenerationError if extracting a regular file would push the
    total past max_size.
    """
    if member.name and member.name.split("/")[-1].startswith("._"):
        # Metadata convention (e.g. AppleDouble "._" files). Skip just this
        # member; returning False here would abort the whole extraction loop
        # and (worse) reset the caller's running total to zero.
        return True, total_extracted

    # Skip any character device, block device, or FIFO — again, skip only
    # this member rather than stopping the extraction.
    if member.isdev():
        return True, total_extracted

    # Check whether extraction would exceed the size limit
    if member.isreg() and ((total_extracted + member.size) > max_size):
        raise SBOMGenerationError(
            f"Extraction would exceed maximum size limit of {max_size} bytes",
            {"max_size": max_size, "current_size": total_extracted, "file_size": member.size},
        )

    # Extract directories directly
    if member.isdir():
        # Ensure the path is safe before extracting
        target_path = os.path.join(extract_dir, member.name)
        if not os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
            _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
            # Unsafe path: skip this member, keep processing the archive
            return True, total_extracted
        tf.extract(member, extract_dir, numeric_owner=True)

    elif member.isreg():
        extracted_size = _archive_extract_safe_process_file(
            tf, member, extract_dir, total_extracted, max_size, chunk_size
        )
        total_extracted += extracted_size

    elif member.issym():
        _archive_extract_safe_process_symlink(member, extract_dir)

    elif member.islnk():
        _archive_extract_safe_process_hardlink(member, extract_dir)

    return True, total_extracted
+
+
def _archive_extract_safe_process_hardlink(member: tarfile.TarInfo, extract_dir: str) -> None:
    """Safely create a hard link from the TarInfo entry."""
    destination = _safe_path(extract_dir, member.name)
    if destination is None:
        _LOGGER.warning(f"Skipping potentially unsafe hard link path: {member.name}")
        return

    link_target = member.linkname or ""
    origin = _safe_path(extract_dir, link_target)
    # The link source must both resolve inside the tree and already exist
    if (origin is None) or (not os.path.exists(origin)):
        _LOGGER.warning(f"Skipping hard link with invalid target: {member.name} -> {link_target}")
        return

    os.makedirs(os.path.dirname(destination), exist_ok=True)

    try:
        if not os.path.lexists(destination):
            os.link(origin, destination)
    except (OSError, NotImplementedError) as e:
        _LOGGER.warning(f"Failed to create hard link {destination} -> {origin}: {e}")
+
+
def _archive_extract_safe_process_symlink(member: tarfile.TarInfo, extract_dir: str) -> None:
    """Safely create a symbolic link from the TarInfo entry."""
    destination = _safe_path(extract_dir, member.name)
    if destination is None:
        _LOGGER.warning(f"Skipping potentially unsafe symlink path: {member.name}")
        return

    link_target = member.linkname or ""

    # Reject absolute targets to avoid links outside the tree
    if os.path.isabs(link_target):
        _LOGGER.warning(f"Skipping symlink with absolute target: {member.name} -> {link_target}")
        return

    # A symlink is resolved relative to its own directory, so check that the
    # target stays within the extraction directory from there
    if _safe_path(os.path.dirname(destination), link_target) is None:
        _LOGGER.warning(f"Skipping symlink pointing outside tree: {member.name} -> {link_target}")
        return

    os.makedirs(os.path.dirname(destination), exist_ok=True)

    try:
        if not os.path.lexists(destination):
            os.symlink(link_target, destination)
    except (OSError, NotImplementedError) as e:
        _LOGGER.warning("Failed to create symlink %s -> %s: %s", destination, link_target, e)
+
+
async def _generate_cyclonedx_core(artifact_path: str, output_path: str) ->
dict[str, Any]:
"""Core logic to generate CycloneDX SBOM, raising SBOMGenerationError on
failure."""
_LOGGER.info(f"Generating CycloneDX SBOM for {artifact_path} ->
{output_path}")
@@ -247,3 +315,11 @@ async def _generate_cyclonedx_core(artifact_path: str,
output_path: str) -> dict
except FileNotFoundError:
_LOGGER.error("syft command not found. Is it installed and in
PATH?")
raise SBOMGenerationError("syft command not found")
+
+
+def _safe_path(base_dir: str, *paths: str) -> str | None:
+ """Return an absolute path within the base_dir built from the given paths,
or None if it escapes."""
+ target = os.path.abspath(os.path.join(base_dir, *paths))
+ if target.startswith(os.path.abspath(base_dir)):
+ return target
+ return None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]