This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 3831f21 Split apart some RAT check functions
3831f21 is described below
commit 3831f21567ce023b4718b116ea8894d1befa85fb
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Jan 12 20:04:20 2026 +0000
Split apart some RAT check functions
---
atr/tasks/checks/rat.py | 482 +++++++++++++++++++++++++-----------------------
1 file changed, 255 insertions(+), 227 deletions(-)
diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py
index 88772d8..cf0ba87 100644
--- a/atr/tasks/checks/rat.py
+++ b/atr/tasks/checks/rat.py
@@ -70,6 +70,10 @@ _STD_EXCLUSIONS_EXTENDED: Final[list[str]] = [
]
+class RatError(RuntimeError):
+ pass
+
+
async def check(args: checks.FunctionArguments) -> results.Results | None:
"""Use Apache RAT to check the licenses of the files in the artifact."""
recorder = await args.recorder()
@@ -155,7 +159,7 @@ async def _check_core(
policy_excludes: list[str],
) -> None:
result = await asyncio.to_thread(
- _check_core_logic,
+ _synchronous,
artifact_path=str(artifact_abs_path),
policy_excludes=policy_excludes,
rat_jar_path=args.extra_args.get("rat_jar_path",
_CONFIG.APACHE_RAT_JAR_PATH),
@@ -166,7 +170,6 @@ async def _check_core(
# Record individual file failures before the overall result
for file in result.unknown_license_files:
await recorder.failure("Unknown license", None,
member_rel_path=file.name)
-
for file in result.unapproved_files:
await recorder.failure("Unapproved license", {"license":
file.license}, member_rel_path=file.name)
@@ -181,148 +184,6 @@ async def _check_core(
await recorder.success(result.message, result_data)
-def _check_core_logic( # noqa: C901
- artifact_path: str,
- policy_excludes: list[str],
- rat_jar_path: str = _CONFIG.APACHE_RAT_JAR_PATH,
- max_extract_size: int = _CONFIG.MAX_EXTRACT_SIZE,
- chunk_size: int = _CONFIG.EXTRACT_CHUNK_SIZE,
-) -> checkdata.Rat:
- """Verify license headers using Apache RAT."""
- log.info(f"Verifying licenses with Apache RAT for {artifact_path}")
- log.info(f"PATH environment variable: {os.environ.get('PATH', 'PATH not
found')}")
-
- java_check = _check_java_installed()
- if java_check is not None:
- return java_check
-
- # Verify RAT JAR exists and is accessible
- rat_jar_path, jar_error = _check_core_logic_jar_exists(rat_jar_path)
- if jar_error:
- return jar_error
-
- try:
- # Create a temporary directory for extraction
- # TODO: We could extract to somewhere in "state/" instead
- with tempfile.TemporaryDirectory(prefix="rat_verify_") as temp_dir:
- log.info(f"Created temporary directory: {temp_dir}")
-
- # Extract the archive to the temporary directory
- log.info(f"Extracting {artifact_path} to {temp_dir}")
- extracted_size, exclude_file_paths = archives.extract(
- artifact_path,
- temp_dir,
- max_size=max_extract_size,
- chunk_size=chunk_size,
- track_files={_RAT_EXCLUDES_FILENAME},
- )
- log.info(f"Extracted {extracted_size} bytes")
- log.info(f"Found {len(exclude_file_paths)}
{_RAT_EXCLUDES_FILENAME} file(s): {exclude_file_paths}")
-
- # Validate that we found at most one exclusion file
- if len(exclude_file_paths) > 1:
- log.error(f"Multiple {_RAT_EXCLUDES_FILENAME} files found:
{exclude_file_paths}")
- return checkdata.Rat(
- message=f"Multiple {_RAT_EXCLUDES_FILENAME} files not
allowed (found {len(exclude_file_paths)})",
- errors=[f"Found {len(exclude_file_paths)}
{_RAT_EXCLUDES_FILENAME} files"],
- )
-
- # Narrow to single path after validation
- archive_excludes_path: str | None = exclude_file_paths[0] if
exclude_file_paths else None
-
- # Determine excludes_source and effective excludes file
- excludes_source: str
- effective_excludes_path: str | None
-
- if archive_excludes_path is not None:
- excludes_source = "archive"
- effective_excludes_path = archive_excludes_path
- log.info(f"Using archive {_RAT_EXCLUDES_FILENAME}:
{archive_excludes_path}")
- elif policy_excludes:
- excludes_source = "policy"
- policy_excludes_file = os.path.join(temp_dir,
_POLICY_EXCLUDES_FILENAME)
- with open(policy_excludes_file, "w") as f:
- f.write("\n".join(policy_excludes))
- effective_excludes_path =
os.path.relpath(policy_excludes_file, temp_dir)
- log.info(f"Using policy excludes written to:
{policy_excludes_file}")
- else:
- excludes_source = "none"
- effective_excludes_path = None
- log.info("No excludes: using defaults only")
-
- # Determine scan root based on archive .rat-excludes location
- if archive_excludes_path is not None:
- scan_root = os.path.dirname(os.path.join(temp_dir,
archive_excludes_path))
-
- # Verify that scan_root is inside temp_dir
- abs_scan_root = os.path.abspath(scan_root)
- abs_temp_dir = os.path.abspath(temp_dir)
- scan_root_is_inside = (abs_scan_root == abs_temp_dir) or
abs_scan_root.startswith(abs_temp_dir + os.sep)
- if not scan_root_is_inside:
- log.error(f"Scan root {scan_root} is outside temp_dir
{temp_dir}")
- return checkdata.Rat(
- message="Invalid archive structure: exclusion file
path escapes extraction directory",
- errors=["Exclusion file path escapes extraction
directory"],
- excludes_source=excludes_source,
- )
-
- log.info(f"Using {_RAT_EXCLUDES_FILENAME} directory as scan
root: {scan_root}")
-
- untracked_count = _count_files_outside_directory(temp_dir,
scan_root)
- if untracked_count > 0:
- log.error(f"Found {untracked_count} file(s) outside
{_RAT_EXCLUDES_FILENAME} directory")
- return checkdata.Rat(
- message=f"Files exist outside {_RAT_EXCLUDES_FILENAME}
directory ({untracked_count} found)",
- errors=[f"{untracked_count} file(s) outside
{_RAT_EXCLUDES_FILENAME} directory"],
- excludes_source=excludes_source,
- )
- else:
- scan_root = temp_dir
- log.info(f"No archive {_RAT_EXCLUDES_FILENAME} found, using
temp_dir as scan root: {scan_root}")
-
- # Execute RAT and get results or error
- # Extended std exclusions apply when there's no archive
.rat-excludes
- apply_extended_std = excludes_source != "archive"
- error_result, xml_output_path = _check_core_logic_execute_rat(
- rat_jar_path, scan_root, temp_dir, effective_excludes_path,
apply_extended_std, excludes_source
- )
- if error_result is not None:
- error_result.excludes_source = excludes_source
- error_result.extended_std_applied = apply_extended_std
- return error_result
-
- # Parse the XML output
- log.info(f"Parsing RAT XML output: {xml_output_path}")
- # Make sure xml_output_path is not None before parsing
- if xml_output_path is None:
- raise ValueError("XML output path is None")
-
- result = _check_core_logic_parse_output(xml_output_path, scan_root)
- log.info(f"Successfully parsed RAT output with
{util.plural(result.total_files, 'file')}")
-
- # The unknown_license_files and unapproved_files contain FileEntry
objects
- # The path is relative to scan_root, so we prepend the scan_root
relative path
- scan_root_rel = os.path.relpath(scan_root, temp_dir)
- if scan_root_rel != ".":
- for file in result.unknown_license_files:
- file.name = os.path.join(scan_root_rel,
os.path.normpath(file.name))
- for file in result.unapproved_files:
- file.name = os.path.join(scan_root_rel,
os.path.normpath(file.name))
-
- result.excludes_source = excludes_source
- result.extended_std_applied = apply_extended_std
- return result
-
- except Exception as e:
- import traceback
-
- log.exception("Error running Apache RAT")
- return checkdata.Rat(
- message=f"Failed to run Apache RAT: {e!s}",
- errors=[str(e), traceback.format_exc()],
- )
-
-
def _check_core_logic_execute_rat(
rat_jar_path: str,
scan_root: str,
@@ -425,53 +286,10 @@ def _check_core_logic_execute_rat(
return None, xml_output_path
-def _check_core_logic_jar_exists(rat_jar_path: str) -> tuple[str,
checkdata.Rat | None]:
- """Verify that the Apache RAT JAR file exists and is accessible."""
- # Check that the RAT JAR exists
- if not os.path.exists(rat_jar_path):
- log.error(f"Apache RAT JAR not found at: {rat_jar_path}")
- # Try a few common locations:
- # ./rat.jar
- # ./state/rat.jar
- # ../rat.jar
- # ../state/rat.jar
- # NOTE: We're also doing something like this in task_verify_rat_license
- # Should probably decide one place to do it, and do it well
- alternative_paths = [
- os.path.join(os.getcwd(), os.path.basename(rat_jar_path)),
- os.path.join(os.getcwd(), "state", os.path.basename(rat_jar_path)),
- os.path.join(os.path.dirname(os.getcwd()),
os.path.basename(rat_jar_path)),
- os.path.join(os.path.dirname(os.getcwd()), "state",
os.path.basename(rat_jar_path)),
- ]
-
- for alt_path in alternative_paths:
- if os.path.exists(alt_path):
- log.info(f"Found alternative RAT JAR at: {alt_path}")
- rat_jar_path = alt_path
- break
-
- # Double check whether we found the JAR
- if not os.path.exists(rat_jar_path):
- log.error("Tried alternative paths but Apache RAT JAR still not
found")
- log.error(f"Current directory: {os.getcwd()}")
- log.error(f"Directory contents: {os.listdir(os.getcwd())}")
- if os.path.exists("state"):
- log.error(f"State directory contents: {os.listdir('state')}")
-
- return rat_jar_path, checkdata.Rat(
- message=f"Apache RAT JAR not found at: {rat_jar_path}",
- errors=[f"Missing JAR: {rat_jar_path}"],
- )
- else:
- log.info(f"Found Apache RAT JAR at: {rat_jar_path}")
-
- return rat_jar_path, None
-
-
-def _check_core_logic_parse_output(xml_file: str, base_dir: str) ->
checkdata.Rat:
+def _synchronous_extract_parse_output(xml_file: str, base_dir: str) ->
checkdata.Rat:
"""Parse the XML output from Apache RAT safely."""
try:
- return _check_core_logic_parse_output_core(xml_file, base_dir)
+ return _synchronous_extract_parse_output_core(xml_file, base_dir)
except Exception as e:
log.error(f"Error parsing RAT output: {e}")
return checkdata.Rat(
@@ -480,7 +298,7 @@ def _check_core_logic_parse_output(xml_file: str, base_dir:
str) -> checkdata.Ra
)
-def _check_core_logic_parse_output_core(xml_file: str, base_dir: str) ->
checkdata.Rat:
+def _synchronous_extract_parse_output_core(xml_file: str, base_dir: str) ->
checkdata.Rat:
"""Parse the XML output from Apache RAT."""
tree = ElementTree.parse(xml_file)
root = tree.getroot()
@@ -547,43 +365,6 @@ def _check_core_logic_parse_output_core(xml_file: str,
base_dir: str) -> checkda
)
-def _check_java_installed() -> checkdata.Rat | None:
- # Check that Java is installed
- # TODO: Run this only once, when the server starts
- try:
- java_version = subprocess.check_output(
- ["java", *_JAVA_MEMORY_ARGS, "-version"],
stderr=subprocess.STDOUT, text=True
- )
- log.info(f"Java version: {java_version.splitlines()[0]}")
- except (subprocess.SubprocessError, FileNotFoundError) as e:
- log.error(f"Java is not properly installed or not in PATH: {e}")
-
- # Try to get some output even if the command failed
- try:
- # Use run instead of check_output to avoid exceptions
- java_result = subprocess.run(
- ["java", *_JAVA_MEMORY_ARGS, "-version"],
- stderr=subprocess.STDOUT,
- stdout=subprocess.PIPE,
- text=True,
- check=False,
- )
- log.info(f"Java command return code: {java_result.returncode}")
- log.info(f"Java command output: {java_result.stdout or
java_result.stderr}")
-
- # Try to find where Java might be located
- which_java = subprocess.run(["which", "java"],
capture_output=True, text=True, check=False)
- which_java_result = which_java.stdout.strip() if
(which_java.returncode == 0) else "not found"
- log.info(f"Result for which java: {which_java_result}")
- except Exception as inner_e:
- log.error(f"Additional error while trying to debug java:
{inner_e}")
-
- return checkdata.Rat(
- message="Java is not properly installed or not in PATH",
- errors=[f"Java error: {e}"],
- )
-
-
def _count_files_outside_directory(temp_dir: str, scan_root: str) -> int:
"""Count regular files that exist outside the scan_root directory."""
count = 0
@@ -627,3 +408,250 @@ def _summary_message(valid: bool, unapproved_licenses:
int, unknown_licenses: in
if unknown_licenses > 0:
message += f"{util.plural(unknown_licenses, 'file')} with unknown
licenses"
return message
+
+
+def _synchronous(
+ artifact_path: str,
+ policy_excludes: list[str],
+ rat_jar_path: str = _CONFIG.APACHE_RAT_JAR_PATH,
+ max_extract_size: int = _CONFIG.MAX_EXTRACT_SIZE,
+ chunk_size: int = _CONFIG.EXTRACT_CHUNK_SIZE,
+) -> checkdata.Rat:
+ """Verify license headers using Apache RAT."""
+ log.info(f"Verifying licenses with Apache RAT for {artifact_path}")
+ log.info(f"PATH environment variable: {os.environ.get('PATH', 'PATH not
found')}")
+
+ java_check = _synchronous_check_java_installed()
+ if java_check is not None:
+ return java_check
+
+ # Verify RAT JAR exists and is accessible
+ rat_jar_path, jar_error = _synchronous_check_jar_exists(rat_jar_path)
+ if jar_error:
+ return jar_error
+
+ try:
+ # Create a temporary directory for extraction
+ # TODO: We could extract to somewhere in "state/" instead
+ with tempfile.TemporaryDirectory(prefix="rat_verify_") as temp_dir:
+ log.info(f"Created temporary directory: {temp_dir}")
+ return _synchronous_extract(
+ artifact_path, temp_dir, max_extract_size, chunk_size,
policy_excludes, rat_jar_path
+ )
+ except Exception as e:
+ import traceback
+
+ log.exception("Error running Apache RAT")
+ return checkdata.Rat(
+ message=f"Failed to run Apache RAT: {e!s}",
+ errors=[str(e), traceback.format_exc()],
+ )
+
+
+def _synchronous_check_jar_exists(rat_jar_path: str) -> tuple[str,
checkdata.Rat | None]:
+ """Verify that the Apache RAT JAR file exists and is accessible."""
+ # Check that the RAT JAR exists
+ if not os.path.exists(rat_jar_path):
+ log.error(f"Apache RAT JAR not found at: {rat_jar_path}")
+ # Try a few common locations:
+ # ./rat.jar
+ # ./state/rat.jar
+ # ../rat.jar
+ # ../state/rat.jar
+ # NOTE: We're also doing something like this in task_verify_rat_license
+ # Should probably decide one place to do it, and do it well
+ alternative_paths = [
+ os.path.join(os.getcwd(), os.path.basename(rat_jar_path)),
+ os.path.join(os.getcwd(), "state", os.path.basename(rat_jar_path)),
+ os.path.join(os.path.dirname(os.getcwd()),
os.path.basename(rat_jar_path)),
+ os.path.join(os.path.dirname(os.getcwd()), "state",
os.path.basename(rat_jar_path)),
+ ]
+
+ for alt_path in alternative_paths:
+ if os.path.exists(alt_path):
+ log.info(f"Found alternative RAT JAR at: {alt_path}")
+ rat_jar_path = alt_path
+ break
+
+ # Double check whether we found the JAR
+ if not os.path.exists(rat_jar_path):
+ log.error("Tried alternative paths but Apache RAT JAR still not
found")
+ log.error(f"Current directory: {os.getcwd()}")
+ log.error(f"Directory contents: {os.listdir(os.getcwd())}")
+ if os.path.exists("state"):
+ log.error(f"State directory contents: {os.listdir('state')}")
+
+ return rat_jar_path, checkdata.Rat(
+ message=f"Apache RAT JAR not found at: {rat_jar_path}",
+ errors=[f"Missing JAR: {rat_jar_path}"],
+ )
+ else:
+ log.info(f"Found Apache RAT JAR at: {rat_jar_path}")
+
+ return rat_jar_path, None
+
+
+def _synchronous_check_java_installed() -> checkdata.Rat | None:
+ # Check that Java is installed
+ # TODO: Run this only once, when the server starts
+ try:
+ java_version = subprocess.check_output(
+ ["java", *_JAVA_MEMORY_ARGS, "-version"],
stderr=subprocess.STDOUT, text=True
+ )
+ log.info(f"Java version: {java_version.splitlines()[0]}")
+ except (subprocess.SubprocessError, FileNotFoundError) as e:
+ log.error(f"Java is not properly installed or not in PATH: {e}")
+
+ # Try to get some output even if the command failed
+ try:
+ # Use run instead of check_output to avoid exceptions
+ java_result = subprocess.run(
+ ["java", *_JAVA_MEMORY_ARGS, "-version"],
+ stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE,
+ text=True,
+ check=False,
+ )
+ log.info(f"Java command return code: {java_result.returncode}")
+ log.info(f"Java command output: {java_result.stdout or
java_result.stderr}")
+
+ # Try to find where Java might be located
+ which_java = subprocess.run(["which", "java"],
capture_output=True, text=True, check=False)
+ which_java_result = which_java.stdout.strip() if
(which_java.returncode == 0) else "not found"
+ log.info(f"Result for which java: {which_java_result}")
+ except Exception as inner_e:
+ log.error(f"Additional error while trying to debug java:
{inner_e}")
+
+ return checkdata.Rat(
+ message="Java is not properly installed or not in PATH",
+ errors=[f"Java error: {e}"],
+ )
+
+
+def _synchronous_extract(
+ artifact_path: str,
+ temp_dir: str,
+ max_extract_size: int,
+ chunk_size: int,
+ policy_excludes: list[str],
+ rat_jar_path: str,
+) -> checkdata.Rat:
+ # Extract the archive to the temporary directory
+ log.info(f"Extracting {artifact_path} to {temp_dir}")
+ extracted_size, exclude_file_paths = archives.extract(
+ artifact_path,
+ temp_dir,
+ max_size=max_extract_size,
+ chunk_size=chunk_size,
+ track_files={_RAT_EXCLUDES_FILENAME},
+ )
+ log.info(f"Extracted {extracted_size} bytes")
+ log.info(f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME}
file(s): {exclude_file_paths}")
+
+ # Validate that we found at most one exclusion file
+ if len(exclude_file_paths) > 1:
+ log.error(f"Multiple {_RAT_EXCLUDES_FILENAME} files found:
{exclude_file_paths}")
+ return checkdata.Rat(
+ message=f"Multiple {_RAT_EXCLUDES_FILENAME} files not allowed
(found {len(exclude_file_paths)})",
+ errors=[f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME}
files"],
+ )
+
+ # Narrow to single path after validation
+ archive_excludes_path: str | None = exclude_file_paths[0] if
exclude_file_paths else None
+
+ excludes_source, effective_excludes_path =
_synchronous_extract_excludes_source(
+ archive_excludes_path, policy_excludes, temp_dir
+ )
+
+ try:
+ scan_root = _synchronous_extract_scan_root(archive_excludes_path,
temp_dir)
+ except RatError as e:
+ return checkdata.Rat(
+ message=f"Failed to determine scan root: {e}",
+ errors=[str(e)],
+ excludes_source=excludes_source,
+ )
+
+ # Execute RAT and get results or error
+ # Extended std exclusions apply when there's no archive .rat-excludes
+ apply_extended_std = excludes_source != "archive"
+ error_result, xml_output_path = _check_core_logic_execute_rat(
+ rat_jar_path, scan_root, temp_dir, effective_excludes_path,
apply_extended_std, excludes_source
+ )
+ if error_result is not None:
+ error_result.excludes_source = excludes_source
+ error_result.extended_std_applied = apply_extended_std
+ return error_result
+
+ # Parse the XML output
+ log.info(f"Parsing RAT XML output: {xml_output_path}")
+ # Make sure xml_output_path is not None before parsing
+ if xml_output_path is None:
+ raise ValueError("XML output path is None")
+
+ result = _synchronous_extract_parse_output(xml_output_path, scan_root)
+ log.info(f"Successfully parsed RAT output with
{util.plural(result.total_files, 'file')}")
+
+ # The unknown_license_files and unapproved_files contain FileEntry objects
+ # The path is relative to scan_root, so we prepend the scan_root relative
path
+ scan_root_rel = os.path.relpath(scan_root, temp_dir)
+ if scan_root_rel != ".":
+ for file in result.unknown_license_files:
+ file.name = os.path.join(scan_root_rel,
os.path.normpath(file.name))
+ for file in result.unapproved_files:
+ file.name = os.path.join(scan_root_rel,
os.path.normpath(file.name))
+
+ result.excludes_source = excludes_source
+ result.extended_std_applied = apply_extended_std
+ return result
+
+
+def _synchronous_extract_excludes_source(
+ archive_excludes_path: str | None, policy_excludes: list[str], temp_dir:
str
+) -> tuple[str, str | None]:
+ # Determine excludes_source and effective excludes file
+ excludes_source: str
+ effective_excludes_path: str | None
+
+ if archive_excludes_path is not None:
+ excludes_source = "archive"
+ effective_excludes_path = archive_excludes_path
+ log.info(f"Using archive {_RAT_EXCLUDES_FILENAME}:
{archive_excludes_path}")
+ elif policy_excludes:
+ excludes_source = "policy"
+ policy_excludes_file = os.path.join(temp_dir,
_POLICY_EXCLUDES_FILENAME)
+ with open(policy_excludes_file, "w") as f:
+ f.write("\n".join(policy_excludes))
+ effective_excludes_path = os.path.relpath(policy_excludes_file,
temp_dir)
+ log.info(f"Using policy excludes written to: {policy_excludes_file}")
+ else:
+ excludes_source = "none"
+ effective_excludes_path = None
+ log.info("No excludes: using defaults only")
+ return excludes_source, effective_excludes_path
+
+
+def _synchronous_extract_scan_root(archive_excludes_path: str | None,
temp_dir: str) -> str:
+ # Determine scan root based on archive .rat-excludes location
+ if archive_excludes_path is not None:
+ scan_root = os.path.dirname(os.path.join(temp_dir,
archive_excludes_path))
+
+ # Verify that scan_root is inside temp_dir
+ abs_scan_root = os.path.abspath(scan_root)
+ abs_temp_dir = os.path.abspath(temp_dir)
+ scan_root_is_inside = (abs_scan_root == abs_temp_dir) or
abs_scan_root.startswith(abs_temp_dir + os.sep)
+ if not scan_root_is_inside:
+ log.error(f"Scan root {scan_root} is outside temp_dir {temp_dir}")
+ raise RatError("Invalid archive structure: exclusion file path
escapes extraction directory")
+
+ log.info(f"Using {_RAT_EXCLUDES_FILENAME} directory as scan root:
{scan_root}")
+
+ untracked_count = _count_files_outside_directory(temp_dir, scan_root)
+ if untracked_count > 0:
+ log.error(f"Found {untracked_count} file(s) outside
{_RAT_EXCLUDES_FILENAME} directory")
+ raise RatError(f"Files exist outside {_RAT_EXCLUDES_FILENAME}
directory ({untracked_count} found)")
+ else:
+ scan_root = temp_dir
+ log.info(f"No archive {_RAT_EXCLUDES_FILENAME} found, using temp_dir
as scan root: {scan_root}")
+
+ return scan_root
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]