This is an automated email from the ASF dual-hosted git repository. sbp pushed a commit to branch sbp in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 4743bd24f5d33d0c77ecb8dcf0c581c5b85906cc Author: Sean B. Palmer <[email protected]> AuthorDate: Mon Jan 5 14:50:51 2026 +0000 Change how RAT checks are applied --- atr/archives.py | 31 +++-- atr/tasks/checks/__init__.py | 4 + atr/tasks/checks/rat.py | 296 +++++++++++++++++++++++++++++-------------- 3 files changed, 223 insertions(+), 108 deletions(-) diff --git a/atr/archives.py b/atr/archives.py index b3bcbf4..2560c95 100644 --- a/atr/archives.py +++ b/atr/archives.py @@ -82,7 +82,7 @@ def total_size(tgz_path: str, chunk_size: int = 4096) -> int: return total_size -def _archive_extract_member( +def _archive_extract_member( # noqa: C901 tf: tarfile.TarFile, member: tarfile.TarInfo, extract_dir: str, @@ -101,8 +101,10 @@ def _archive_extract_member( if member.isdev(): return 0, extracted_paths + # Only track files that pass the path safety check if track_files and isinstance(track_files, set) and (member_basename in track_files): - extracted_paths.append(member.name) + if _safe_path(extract_dir, member.name) is not None: + extracted_paths.append(member.name) # Check whether extraction would exceed the size limit if member.isreg() and ((total_extracted + member.size) > max_size): @@ -114,8 +116,7 @@ def _archive_extract_member( # Extract directories directly if member.isdir(): # Ensure the path is safe before extracting - target_path = os.path.join(extract_dir, member.name) - if not os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)): + if _safe_path(extract_dir, member.name) is None: log.warning(f"Skipping potentially unsafe path: {member.name}") return 0, extracted_paths tf.extract(member, extract_dir, numeric_owner=True) @@ -144,8 +145,8 @@ def _archive_extract_safe_process_file( chunk_size: int, ) -> int: """Process a single file member during safe archive extraction.""" - target_path = os.path.join(extract_dir, member.name) - if not os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)): + target_path = _safe_path(extract_dir, member.name) + if target_path is None: log.warning(f"Skipping potentially unsafe path: {member.name}") return 0 @@ -235,7 +236,8 @@ def _archive_extract_safe_process_symlink(member: tarfile.TarInfo, extract_dir: def _safe_path(base_dir: str, *paths: str) -> str | None: """Return an absolute path within the base_dir built from the given paths, or None if it escapes.""" target = os.path.abspath(os.path.join(base_dir, *paths)) - if target.startswith(os.path.abspath(base_dir)): + abs_base = os.path.abspath(base_dir) + if (target == abs_base) or target.startswith(abs_base + os.sep): return target return None @@ -277,8 +279,11 @@ def _zip_archive_extract_member( extracted_paths: list[str] = [], ) -> tuple[int, list[str]]: member_basename = os.path.basename(member.name) - if track_files and (isinstance(track_files, set) and (member_basename in track_files)): - extracted_paths.append(member.name) + + # Only track files that pass the path safety check + if track_files and isinstance(track_files, set) and (member_basename in track_files): + if _safe_path(extract_dir, member.name) is not None: + extracted_paths.append(member.name) if member_basename.startswith("._"): return 0, extracted_paths @@ -290,8 +295,8 @@ def _zip_archive_extract_member( ) if member.isdir(): - target_path = os.path.join(extract_dir, member.name) - if not os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)): + target_path = _safe_path(extract_dir, member.name) + if target_path is None: log.warning(f"Skipping potentially unsafe path: {member.name}") return 0, extracted_paths os.makedirs(target_path, exist_ok=True) @@ -314,8 +319,8 @@ def _zip_extract_safe_process_file( max_size: int, chunk_size: int, ) -> int: - target_path = os.path.join(extract_dir, member.name) - if not os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)): + target_path = _safe_path(extract_dir, member.name) + if target_path is None: log.warning(f"Skipping potentially unsafe path: {member.name}") return 0 diff --git a/atr/tasks/checks/__init__.py b/atr/tasks/checks/__init__.py index 09a15d3..6bcfb36 100644 --- a/atr/tasks/checks/__init__.py +++ b/atr/tasks/checks/__init__.py @@ -204,6 +204,10 @@ class Recorder: if not await aiofiles.os.path.isfile(path): return False + no_cache_file = self.abs_path_base() / ".atr-no-cache" + if await aiofiles.os.path.exists(no_cache_file): + return False + self.__input_hash = await _compute_file_hash(path) async with db.session() as data: diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py index 65840c8..0ca3d04 100644 --- a/atr/tasks/checks/rat.py +++ b/atr/tasks/checks/rat.py @@ -42,7 +42,35 @@ _JAVA_MEMORY_ARGS: Final[list[str]] = [] # "-XX:MaxRAM=256m", # "-XX:CompressedClassSpaceSize=16m" # ] -_RAT_EXCLUDES_FILENAMES: Final[set[str]] = {".rat-excludes", "rat-excludes.txt"} + +# Generated file patterns, always excluded +_GENERATED_FILE_PATTERNS: Final[list[str]] = [ + "**/*.bundle.js", + "**/*.chunk.js", + "**/*.css.map", + "**/*.js.map", + "**/*.min.css", + "**/*.min.js", + "**/*.min.map", +] + +# The name of the file that contains the exclusions for the specified archive +_RAT_EXCLUDES_FILENAME: Final[str] = ".rat-excludes" + +# The name of the RAT report file +_RAT_REPORT_FILENAME: Final[str] = "rat-report.xml" + +# Standard exclusions, always applied explicitly +_STD_EXCLUSIONS_ALWAYS: Final[list[str]] = ["MISC", "HIDDEN_DIR", "MAC"] + +# Additional exclusions when no project exclusion file is found +_STD_EXCLUSIONS_EXTENDED: Final[list[str]] = [ + "MAVEN", + "ECLIPSE", + "IDEA", + "GIT", + "STANDARD_SCMS", +] async def check(args: checks.FunctionArguments) -> results.Results | None: @@ -74,6 +102,50 @@ async def check(args: checks.FunctionArguments) -> results.Results | None: return None +def _build_rat_command( + rat_jar_path: str, + xml_output_path: str, + exclude_file: str | None, +) -> list[str]: + """Build the RAT command with appropriate exclusions.""" + command = [ + "java", + *_JAVA_MEMORY_ARGS, + "-jar", + rat_jar_path, + "--output-style", + "xml", + "--output-file", + xml_output_path, + "--counter-max", + "UNAPPROVED:-1", + "--counter-min", + "LICENSE_CATEGORIES:0", + "LICENSE_NAMES:0", + "STANDARDS:0", + ] + + for std in _STD_EXCLUSIONS_ALWAYS: + command.extend(["--input-exclude-std", std]) + + if exclude_file is None: + for std in _STD_EXCLUSIONS_EXTENDED: + command.extend(["--input-exclude-std", std]) + + for pattern in _GENERATED_FILE_PATTERNS: + command.extend(["--input-exclude", pattern]) + + command.extend(["--input-exclude", _RAT_REPORT_FILENAME]) + + if exclude_file is not None: + command.extend(["--input-exclude", _RAT_EXCLUDES_FILENAME]) + command.extend(["--input-exclude-file", exclude_file]) + + command.extend(["--", "."]) + + return command + + async def _check_core( args: checks.FunctionArguments, recorder: checks.Recorder, artifact_abs_path: pathlib.Path ) -> None: @@ -120,7 +192,7 @@ async def _check_core( await recorder.success(result_data["message"], result_data) -def _check_core_logic( +def _check_core_logic( # noqa: C901 artifact_path: str, rat_jar_path: str = _CONFIG.APACHE_RAT_JAR_PATH, max_extract_size: int = _CONFIG.MAX_EXTRACT_SIZE, @@ -168,29 +240,79 @@ def _check_core_logic( # Extract the archive to the temporary directory log.info(f"Extracting {artifact_path} to {temp_dir}") - extracted_size, extracted_paths = archives.extract( + extracted_size, exclude_file_paths = archives.extract( artifact_path, temp_dir, max_size=max_extract_size, chunk_size=chunk_size, - track_files=_RAT_EXCLUDES_FILENAMES, + track_files={_RAT_EXCLUDES_FILENAME}, ) log.info(f"Extracted {extracted_size} bytes") + log.info(f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME} file(s): {exclude_file_paths}") - # Find the root directory - if (extract_dir := _extracted_dir(temp_dir)) is None: - log.error("No root directory found in archive") + # Validate that we found at most one exclusion file + if len(exclude_file_paths) > 1: + log.error(f"Multiple {_RAT_EXCLUDES_FILENAME} files found: {exclude_file_paths}") return { "valid": False, - "message": "No root directory found in archive", - "errors": [], + "message": f"Multiple {_RAT_EXCLUDES_FILENAME} files not allowed (found {len(exclude_file_paths)})", + "total_files": 0, + "approved_licenses": 0, + "unapproved_licenses": 0, + "unknown_licenses": 0, + "unapproved_files": [], + "unknown_license_files": [], + "errors": [f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME} files"], } - log.info(f"Using root directory: {extract_dir}") + # Narrow to single path after validation + exclude_file_path: str | None = exclude_file_paths[0] if exclude_file_paths else None + + # Determine scan root + if exclude_file_path is not None: + scan_root = os.path.dirname(os.path.join(temp_dir, exclude_file_path)) + + # Verify that scan_root is inside temp_dir + abs_scan_root = os.path.abspath(scan_root) + abs_temp_dir = os.path.abspath(temp_dir) + scan_root_is_inside = (abs_scan_root == abs_temp_dir) or abs_scan_root.startswith(abs_temp_dir + os.sep) + if not scan_root_is_inside: + log.error(f"Scan root {scan_root} is outside temp_dir {temp_dir}") + return { + "valid": False, + "message": "Invalid archive structure: exclusion file path escapes extraction directory", + "total_files": 0, + "approved_licenses": 0, + "unapproved_licenses": 0, + "unknown_licenses": 0, + "unapproved_files": [], + "unknown_license_files": [], + "errors": ["Exclusion file path escapes extraction directory"], + } + + log.info(f"Using {_RAT_EXCLUDES_FILENAME} directory as scan root: {scan_root}") + + untracked_count = _count_files_outside_directory(temp_dir, scan_root) + if untracked_count > 0: + log.error(f"Found {untracked_count} file(s) outside {_RAT_EXCLUDES_FILENAME} directory") + return { + "valid": False, + "message": f"Files exist outside {_RAT_EXCLUDES_FILENAME} directory ({untracked_count} found)", + "total_files": 0, + "approved_licenses": 0, + "unapproved_licenses": 0, + "unknown_licenses": 0, + "unapproved_files": [], + "unknown_license_files": [], + "errors": [f"{untracked_count} file(s) outside {_RAT_EXCLUDES_FILENAME} directory"], + } + else: + scan_root = temp_dir + log.info(f"No {_RAT_EXCLUDES_FILENAME} found, using temp_dir as scan root: {scan_root}") # Execute RAT and get results or error error_result, xml_output_path = _check_core_logic_execute_rat( - rat_jar_path, extract_dir, temp_dir, extracted_paths + rat_jar_path, scan_root, temp_dir, exclude_file_path ) if error_result: return error_result @@ -201,18 +323,24 @@ def _check_core_logic( if xml_output_path is None: raise ValueError("XML output path is None") - results = _check_core_logic_parse_output(xml_output_path, extract_dir) + results = _check_core_logic_parse_output(xml_output_path, scan_root) log.info(f"Successfully parsed RAT output with {util.plural(results.get('total_files', 0), 'file')}") - # The unknown_license_files key may contain a list of dicts + # The unknown_license_files and unapproved_files keys contain lists of dicts # {"name": "./README.md", "license": "Unknown license"} - # The path is missing the root of the archive, so we add it - extract_dir_basename = os.path.basename(extract_dir) - for file in results["unknown_license_files"]: - file["name"] = os.path.join( - extract_dir_basename, - os.path.normpath(file["name"]), - ) + # The path is relative to scan_root, so we prepend the scan_root relative path + scan_root_rel = os.path.relpath(scan_root, temp_dir) + if scan_root_rel != ".": + for file in results["unknown_license_files"]: + file["name"] = os.path.join( + scan_root_rel, + os.path.normpath(file["name"]), + ) + for file in results["unapproved_files"]: + file["name"] = os.path.join( + scan_root_rel, + os.path.normpath(file["name"]), + ) return results @@ -234,42 +362,37 @@ def _check_core_logic( def _check_core_logic_execute_rat( - rat_jar_path: str, extract_dir: str, temp_dir: str, excluded_paths: list[str] + rat_jar_path: str, scan_root: str, temp_dir: str, exclude_file_path: str | None ) -> tuple[dict[str, Any] | None, str | None]: """Execute Apache RAT and process its output.""" - # Define output file path - xml_output_path = os.path.join(temp_dir, "rat-report.xml") + xml_output_path = os.path.join(temp_dir, _RAT_REPORT_FILENAME) log.info(f"XML output will be written to: {xml_output_path}") - # Run Apache RAT on the extracted directory - # TODO: From RAT 0.17, --exclude will become --input-exclude - # TODO: Check whether --exclude NAME works on inner files - # (Note that we presently use _rat_apply_exclusions to apply exclusions instead) - command = [ - "java", - *_JAVA_MEMORY_ARGS, - "-jar", - rat_jar_path, - "--output-style", - "xml", - "--output-file", - xml_output_path, - "--counter-max", - "UNAPPROVED:-1", - "--counter-min", - "LICENSE_CATEGORIES:0", - "LICENSE_NAMES:0", - "STANDARDS:0", - "--", - ".", - ] - if excluded_paths: - _rat_apply_exclusions(extract_dir, excluded_paths, temp_dir) + # Convert exclusion file path from temp_dir relative to scan_root relative + exclude_file: str | None = None + if exclude_file_path is not None: + abs_path = os.path.join(temp_dir, exclude_file_path) + if not (os.path.exists(abs_path) and os.path.isfile(abs_path)): + log.error(f"Exclusion file not found or not a regular file: {abs_path}") + return { + "valid": False, + "message": f"Exclusion file is not a regular file: {exclude_file_path}", + "total_files": 0, + "approved_licenses": 0, + "unapproved_licenses": 0, + "unknown_licenses": 0, + "unapproved_files": [], + "unknown_license_files": [], + "errors": [f"Expected exclusion file but found: {abs_path}"], + }, None + exclude_file = os.path.relpath(abs_path, scan_root) + log.info(f"Using exclusion file: {exclude_file}") + command = _build_rat_command(rat_jar_path, xml_output_path, exclude_file) log.info(f"Running Apache RAT: {' '.join(command)}") - # Change working directory to extract_dir when running the process + # Change working directory to scan_root when running the process current_dir = os.getcwd() - os.chdir(extract_dir) + os.chdir(scan_root) log.info(f"Executing Apache RAT from directory: {os.getcwd()}") @@ -549,53 +672,36 @@ def _check_java_installed() -> dict[str, Any] | None: } -def _extracted_dir(temp_dir: str) -> str | None: - # Loop through all the dirs in temp_dir - extract_dir = None - log.info(f"Checking directories in {temp_dir}: {os.listdir(temp_dir)}") - for dir_name in os.listdir(temp_dir): - if dir_name.startswith("."): - continue - dir_path = os.path.join(temp_dir, dir_name) - if not os.path.isdir(dir_path): - raise ValueError(f"Unknown file type found in temporary directory: {dir_path}") - if extract_dir is None: - extract_dir = dir_path - else: - raise ValueError(f"Multiple root directories found: {extract_dir}, {dir_path}") - return extract_dir - - -def _rat_apply_exclusions(extract_dir: str, excluded_paths: list[str], temp_dir: str) -> None: - """Apply exclusions to the extracted directory.""" - # Exclusions are difficult using the command line version of RAT - # Each line is interpreted as a literal AND a glob AND a regex - # Then, if ANY of those three match a filename, the file is excluded - # You cannot specify which syntax to use; all three are always tried - # You cannot specify that you want to match against the whole path - # Therefore, we take a different approach - # We interpret the exclusion file as a glob file in .gitignore format - # Then, we simply remove any files that match the glob - exclusion_lines = [] - for excluded_path in excluded_paths: - abs_excluded_path = os.path.join(temp_dir, excluded_path) - if not os.path.exists(abs_excluded_path): - log.error(f"Exclusion file not found: {abs_excluded_path}") - continue - if not os.path.isfile(abs_excluded_path): - log.error(f"Exclusion file is not a file: {abs_excluded_path}") +def _count_files_outside_directory(temp_dir: str, scan_root: str) -> int: + """Count regular files that exist outside the scan_root directory.""" + count = 0 + scan_root_rel = os.path.relpath(scan_root, temp_dir) + if scan_root_rel == ".": + scan_root_rel = "" + + for root, _dirs, files in os.walk(temp_dir): + rel_root = os.path.relpath(root, temp_dir) + if rel_root == ".": + rel_root = "" + + if _is_inside_directory(rel_root, scan_root_rel): continue - with open(abs_excluded_path, encoding="utf-8") as f: - exclusion_lines.extend(f.readlines()) - matcher = util.create_path_matcher( - exclusion_lines, pathlib.Path(extract_dir) / ".ignore", pathlib.Path(extract_dir) - ) - for root, _dirs, files in os.walk(extract_dir): - for file in files: - abs_path = os.path.join(root, file) - if matcher(abs_path): - log.info(f"Removing {abs_path} because it matches the exclusion") - os.remove(abs_path) + + # for filename in files: + # if not filename.startswith("."): + # count += 1 + count += len(files) + + return count + + +def _is_inside_directory(path: str, directory: str) -> bool: + """Check whether path is inside directory, or is the directory itself.""" + if directory == "": + return True + if path == directory: + return True + return path.startswith(directory + os.sep) def _summary_message(valid: bool, unapproved_licenses: int, unknown_licenses: int) -> str: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
