Copilot commented on code in PR #12867:
URL: https://github.com/apache/trafficserver/pull/12867#discussion_r2779503778


##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # Track test start times to calculate duration
+    # Autest output format:
+    #   Running Test: test_name
+    #   ... test output ...
+    #   Test: test_name: Passed/Failed
+    current_test = None
+    test_start_line = None

Review Comment:
   Variable test_start_line is not used.
   ```suggestion
   
   ```



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running

Review Comment:
   The usage header says `./autest-parallel.py --list` works standalone, but 
`--ats-bin` is marked required, so `--list` will currently error unless callers 
also pass `--ats-bin`. Consider making `--ats-bin` optional when `--list` is 
set, or update the usage text/docs to reflect the requirement.
   ```suggestion
       ./autest-parallel.py --list --ats-bin /path/to/ats/bin  # Just list tests without running
   ```
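
   If the first option (making `--ats-bin` optional when `--list` is set) is preferred instead, a minimal sketch follows. It assumes the script builds its CLI with argparse and currently marks `--ats-bin` as required; the actual parser code is not shown in this hunk.
   ```python
   import argparse

   parser = argparse.ArgumentParser(description='Parallel autest runner')
   parser.add_argument('--list', action='store_true', help='List tests without running them')
   # Drop required=True here and enforce the requirement only when tests will actually run.
   parser.add_argument('--ats-bin', help='Path to the ATS bin directory (required unless --list)')
   args = parser.parse_args()

   if not args.list and not args.ats_bin:
       parser.error('--ats-bin is required unless --list is given')
   ```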



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
       except IOError:
           # If the serial tests file cannot be read, treat it as if no serial tests were specified.
   ```



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # Track test start times to calculate duration
+    # Autest output format:
+    #   Running Test: test_name
+    #   ... test output ...
+    #   Test: test_name: Passed/Failed
+    current_test = None
+    test_start_line = None
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # If the passed count is malformed, ignore it and leave the default value.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the failed count is malformed, ignore it and leave the default value.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the skipped count is malformed, ignore it and leave the default value.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the warnings count is malformed, ignore it and leave the default value.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the exceptions count is malformed, ignore it and leave the default value.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the unknown count is malformed, ignore it and leave the default value.
   ```



##########
tests/gold_tests/autest-site/ports.py:
##########
@@ -154,6 +159,12 @@ def _setup_port_queue(amount=1000):
         # The queue has already been populated.
         host.WriteDebug('_setup_port_queue', f"Queue was previously populated. Queue size: {g_ports.qsize()}")
         return
+
+    # Get port offset for parallel execution support
+    port_offset = int(os.environ.get('AUTEST_PORT_OFFSET', 0))
+    if port_offset > 0:
+        host.WriteVerbose('_setup_port_queue', f"Using port offset: {port_offset}")

Review Comment:
   `AUTEST_PORT_OFFSET` is parsed with `int(...)` without validation; a 
non-integer value will raise `ValueError` and abort port selection. Also, large 
offsets can cause both fill loops to add zero ports (starting beyond 65535 / 
beyond `dmin`), leading to an empty queue and more bind-collisions. Consider 
handling parse errors (default to 0 with warning) and validating/clamping 
offsets to a safe range.
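
   A possible shape for that handling, sketched under assumptions (the 32000 ceiling is an arbitrary safety margin, not a derived limit, and the messages reuse `host.WriteVerbose` since that is what the surrounding code already calls):
   ```python
   # Parse AUTEST_PORT_OFFSET defensively: fall back to 0 on non-integer input and
   # clamp the offset so the fill loops below can still enqueue usable ports.
   raw_offset = os.environ.get('AUTEST_PORT_OFFSET', '0')
   try:
       port_offset = int(raw_offset)
   except ValueError:
       host.WriteVerbose('_setup_port_queue', f"Ignoring invalid AUTEST_PORT_OFFSET {raw_offset!r}; using 0")
       port_offset = 0
   if not 0 <= port_offset <= 32000:
       host.WriteVerbose('_setup_port_queue', f"Clamping out-of-range AUTEST_PORT_OFFSET {port_offset}")
       port_offset = min(max(port_offset, 0), 32000)
   ```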



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)

Review Comment:
   The docstring says serial test entries can be simple names that “match any 
test containing this” and can be full paths, but the implementation normalizes 
everything to a single basename and then does exact set membership. Either 
update the docs to reflect exact-basename matching, or implement the documented 
behavior (substring/path-aware matching) so `serial_tests.txt` entries are 
unambiguous.



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # Track test start times to calculate duration
+    # Autest output format:
+    #   Running Test: test_name
+    #   ... test output ...
+    #   Test: test_name: Passed/Failed
+    current_test = None

Review Comment:
   Variable current_test is not used.
   ```suggestion
   
   ```



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # Track test start times to calculate duration
+    # Autest output format:
+    #   Running Test: test_name
+    #   ... test output ...
+    #   Test: test_name: Passed/Failed
+    current_test = None
+    test_start_line = None
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+
+    # Extract failed test names
+    failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE)
+    for match in failed_pattern.finditer(clean_output):
+        result['failed_tests'].append(match.group(1))
+
+    return result
+
+
+def run_single_test(test: str, script_dir: Path, sandbox: Path, ats_bin: str, build_root: str, extra_args: List[str],
+                    env: dict) -> Tuple[str, float, str, str]:
+    """
+    Run a single test and return its timing.
+
+    Returns:
+        Tuple of (test_name, duration, status, output)
+        status is one of: "PASS", "FAIL", "SKIP"
+    """
+    cmd = [
+        'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', '--ats-bin', ats_bin, '--build-root', build_root, '--sandbox',
+        str(sandbox / test), '--filters', test
+    ]
+    cmd.extend(extra_args)
+
+    start = time.time()
+    try:
+        proc = subprocess.run(
+            cmd,
+            cwd=script_dir,
+            capture_output=True,
+            text=True,
+            env=env,
+            timeout=600  # 10 minute timeout per test
+        )
+        duration = time.time() - start
+        output = proc.stdout + proc.stderr
+        parsed = parse_autest_output(output)
+        # Determine status:
+        # - SKIP: test was skipped (missing dependency, unsupported feature)
+        # - PASS: test ran and passed
+        # - FAIL: test failed, had exceptions, or nothing ran at all
+        if parsed['skipped'] > 0 and parsed['passed'] == 0 and parsed['failed'] == 0:
+            status = "SKIP"
+        elif (parsed['failed'] == 0 and parsed['exceptions'] == 0 and proc.returncode == 0 and
+              (parsed['passed'] > 0 or parsed['skipped'] > 0)):
+            status = "PASS"
+        else:
+            status = "FAIL"
+        return (test, duration, status, output)
+    except subprocess.TimeoutExpired:
+        return (test, 600.0, "FAIL", "TIMEOUT")
+    except Exception as e:
+        return (test, time.time() - start, "FAIL", str(e))
+
+
+def run_worker(
+        worker_id: int,
+        tests: List[str],
+        script_dir: Path,
+        sandbox_base: Path,
+        ats_bin: str,
+        build_root: str,
+        extra_args: List[str],
+        port_offset_step: int = 1000,
+        verbose: bool = False,
+        collect_timings: bool = False) -> TestResult:
+    """
+    Run autest on a subset of tests with isolated sandbox and port range.
+
+    Args:
+        worker_id: Worker identifier (0, 1, 2, ...)
+        tests: List of test names to run
+        script_dir: Directory containing autest.sh
+        sandbox_base: Base sandbox directory
+        ats_bin: Path to ATS bin directory
+        build_root: Path to the build directory (for test plugins etc.)
+        extra_args: Additional arguments to pass to autest
+        port_offset_step: Port offset between workers
+        verbose: Whether to print verbose output
+        collect_timings: If True, run tests one at a time to collect accurate timing
+
+    Returns:
+        TestResult with pass/fail counts and per-test timings
+    """
+    start_time = time.time()
+    result = TestResult(worker_id=worker_id, tests=tests)
+
+    # Create worker-specific sandbox
+    sandbox = sandbox_base / f"worker-{worker_id}"
+    sandbox.mkdir(parents=True, exist_ok=True)
+
+    # Calculate port offset for this worker
+    port_offset = worker_id * port_offset_step
+
+    # Set up environment with port offset
+    env = os.environ.copy()
+    env['AUTEST_PORT_OFFSET'] = str(port_offset)
+
+    if collect_timings:
+        # Run tests one at a time to collect accurate timing
+        all_output = []
+        total_tests = len(tests)
+        for idx, test in enumerate(tests, 1):
+            test_name, duration, status, output = run_single_test(test, script_dir, sandbox, ats_bin, build_root, extra_args, env)
+            result.test_timings[test_name] = duration
+            all_output.append(output)
+
+            if status == "PASS":
+                result.passed += 1
+            elif status == "SKIP":
+                result.skipped += 1
+            else:
+                result.failed += 1
+                result.failed_tests.append(test_name)
+
+            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            # Fixed-width format: date time status duration worker progress test_name
+            print(f"{timestamp} {status:4s} {duration:6.1f}s Worker:{worker_id:2d} {idx:2d}/{total_tests:2d} {test}", flush=True)
+
+        result.output = "\n".join(all_output)
+        result.return_code = 0 if result.failed == 0 else 1
+    else:
+        # Run all tests in batch (faster but no per-test timing)
+        cmd = [
+            'uv',
+            'run',
+            'autest',
+            'run',
+            '--directory',
+            'gold_tests',
+            '--ats-bin',
+            ats_bin,
+            '--build-root',
+            build_root,
+            '--sandbox',
+            str(sandbox),
+        ]
+
+        # Add test filters
+        cmd.append('--filters')
+        cmd.extend(tests)
+
+        # Add any extra arguments
+        cmd.extend(extra_args)
+
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        print(f"{timestamp} Worker:{worker_id:2d} Starting batch of 
{len(tests)} tests (port offset {port_offset})", flush=True)
+        if verbose:
+            print(
+                f"             Worker:{worker_id:2d} Tests: {', 
'.join(tests[:5])}"
+                f"{'...' if len(tests) > 5 else ''}",
+                flush=True)
+
+        try:
+            if verbose:
+                # Stream output in real-time so the user sees test progress
+                proc = subprocess.Popen(
+                    cmd,
+                    cwd=script_dir,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT,
+                    text=True,
+                    env=env,
+                )
+                output_lines = []
+                for line in proc.stdout:
+                    output_lines.append(line)
+                    # Print lines that show test progress
+                    clean = strip_ansi(line).strip()
+                    if clean.startswith('Running Test') or 'Passed' in clean or 'Failed' in clean:
+                        if clean.startswith('Running Test'):
+                            ts = datetime.now().strftime("%H:%M:%S")
+                            print(f"  [{ts}] Worker:{worker_id:2d} {clean}", 
flush=True)
+                proc.wait(timeout=3600)

Review Comment:
   In verbose mode, the read loop `for line in proc.stdout:` can block 
indefinitely if the child hangs without producing more output; the subsequent 
`proc.wait(timeout=3600)` won’t be reached, so the timeout is ineffective. Use 
a non-blocking read strategy and enforce the timeout (e.g., 
`communicate(timeout=...)` + terminate/kill on timeout) to prevent the parallel 
runner from hanging forever.
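
   A minimal sketch of one way to enforce the deadline here (reusing the `cmd`, `script_dir`, and `env` variables from the diff above; the per-line progress printing would need a separate reader thread, which is omitted from this sketch):
   ```python
   # Sketch only: communicate() enforces the timeout even if the child stops
   # producing output, unlike iterating over proc.stdout line by line.
   proc = subprocess.Popen(
       cmd,
       cwd=script_dir,
       stdout=subprocess.PIPE,
       stderr=subprocess.STDOUT,
       text=True,
       env=env,
   )
   try:
       output, _ = proc.communicate(timeout=3600)
   except subprocess.TimeoutExpired:
       proc.kill()                     # make sure a hung child cannot stall the worker
       output, _ = proc.communicate()  # reap the child and collect any partial output
   ```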



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # Track test start times to calculate duration
+    # Autest output format:
+    #   Running Test: test_name
+    #   ... test output ...
+    #   Test: test_name: Passed/Failed
+    current_test = None
+    test_start_line = None
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # If the summary line does not end with an integer count, ignore it
                   # and leave the default value in 'result' unchanged.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the summary line does not end with an integer count, ignore it
                   # and leave the default value in 'result' unchanged.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the summary line does not end with an integer count, ignore it
                   # and leave the default value in 'result' unchanged.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the summary line does not end with an integer count, ignore it
                   # and leave the default value in 'result' unchanged.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the summary line does not end with an integer count, ignore it
                   # and leave the default value in 'result' unchanged.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the summary line does not end with an integer count, ignore it
                   # and leave the default value in 'result' unchanged.
   ```
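
   As a possible follow-up (a sketch only, not part of the suggestion above), the six near-identical try/except blocks could also be collapsed into one helper that maps summary labels to keys in `result`, keeping the ignore-on-parse-failure behavior in a single place:
   ```python
   # Sketch: parse "<Label>: <count>" summary lines in one place; counts that
   # fail to parse are ignored and the defaults in 'result' stay unchanged.
   SUMMARY_KEYS = {
       'Passed:': 'passed',
       'Failed:': 'failed',
       'Skipped:': 'skipped',
       'Warning:': 'warnings',
       'Exception:': 'exceptions',
       'Unknown:': 'unknown',
   }

   def apply_summary_line(line: str, result: dict) -> None:
       for label, key in SUMMARY_KEYS.items():
           if label not in line:
               continue
           if label == 'Failed:' and 'test' in line.lower():
               return  # skip per-test "Failed" result lines, mirroring the original guard
           try:
               result[key] = int(line.split(':')[-1].strip())
           except ValueError:
               pass  # not an integer count; keep the default
           return
   ```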



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Ignore lines where the passed count is not a valid integer; leave default value.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the failed count is not a valid integer; leave default value.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the skipped count is not a valid integer; leave default value.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the warning count is not a valid integer; leave default value.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the exception count is not a valid integer; leave default value.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the unknown count is not a valid integer; leave default value.
   ```



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+    # Extract failed test names
+    failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE)
+    for match in failed_pattern.finditer(clean_output):
+        result['failed_tests'].append(match.group(1))
+
+    return result
+
+
+def run_single_test(test: str, script_dir: Path, sandbox: Path, ats_bin: str, build_root: str, extra_args: List[str],
+                    env: dict) -> Tuple[str, float, str, str]:
+    """
+    Run a single test and return its timing.
+
+    Returns:
+        Tuple of (test_name, duration, status, output)
+        status is one of: "PASS", "FAIL", "SKIP"
+    """
+    cmd = [
+        'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', '--ats-bin', ats_bin, '--build-root', build_root, '--sandbox',
+        str(sandbox / test), '--filters', test

Review Comment:
   This constructs the autest command as `uv run autest run --directory ... 
--filters ...`. In this repo the supported entrypoint is `tests/autest.sh`, 
which executes `uv run env ... autest -D gold_tests ...` and documents 
`--filter=...` (singular). Using a different CLI shape/flags here is likely to 
break with the pinned `autest==1.10.4`. Reuse the same invocation as 
`autest.sh` (or exec `./autest.sh` directly) so arguments/env behavior stays 
compatible.
   ```suggestion
           './autest.sh',
           f'--filter={test}',
           '--sandbox', str(sandbox / test),
           '--ats-bin', ats_bin,
           '--build-root', build_root,
   ```
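
   For context, this is roughly how the surrounding `cmd` construction in `run_single_test` would look with that suggestion applied (assuming `./autest.sh` forwards the same pass-through arguments; a sketch, not a verified invocation):
   ```python
   # Sketch: drive the per-test run through the repo's autest.sh wrapper
   # instead of invoking autest directly via uv.
   cmd = [
       './autest.sh',
       f'--filter={test}',
       '--sandbox', str(sandbox / test),
       '--ats-bin', ats_bin,
       '--build-root', build_root,
   ]
   cmd.extend(extra_args)  # forward any extra autest arguments, as before
   ```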



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # If the "Passed" count cannot be parsed, leave the default 
value.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the "Failed" count cannot be parsed, leave the default 
value.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the "Skipped" count cannot be parsed, leave the default 
value.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the "Warning" count cannot be parsed, leave the default 
value.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the "Exception" count cannot be parsed, leave the 
default value.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If the "Unknown" count cannot be parsed, leave the default 
value.
   ```



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be:
+    - Simple names: test_name (matches any test containing this)
+    - Full paths: subdir/test_name.test.py
+
+    Returns:
+        Set of test names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # Track test start times to calculate duration
+    # Autest output format:
+    #   Running Test: test_name
+    #   ... test output ...
+    #   Test: test_name: Passed/Failed
+    current_test = None
+    test_start_line = None
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Ignore malformed "Passed" summary lines; keep default 
count.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Failed" summary lines; keep default 
count.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Skipped" summary lines; keep default 
count.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Warning" summary lines; keep default 
count.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Exception" summary lines; keep default 
count.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Unknown" summary lines; keep default 
count.
   ```
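
A related observation on the repeated branches above: the same counts could also be collected through a single label-to-key table, so the explanatory comment (and the except clause) lives in one place. The sketch below is only illustrative and is not part of the PR; the helper name and the table are hypothetical.

```python
# Hypothetical sketch, not part of the PR: one table of summary labels and one
# commented except clause instead of six near-identical branches.
SUMMARY_LABELS = {
    'Passed:': 'passed',
    'Failed:': 'failed',
    'Skipped:': 'skipped',
    'Warning:': 'warnings',
    'Exception:': 'exceptions',
    'Unknown:': 'unknown',
}


def apply_summary_line(line: str, result: dict) -> None:
    """Update result in place when the line carries one of the summary counts."""
    for label, key in SUMMARY_LABELS.items():
        if label not in line:
            continue
        # Mirror the original guard: per-test "Failed:" lines are skipped.
        if key == 'failed' and 'test' in line.lower():
            continue
        try:
            result[key] = int(line.split(':')[-1].strip())
        except ValueError:
            # Malformed count; keep the default value.
            pass
        return
```

Called as apply_summary_line(line, result) inside the summary loop, this would keep the behavior shown in the hunk while addressing the missing-comment warning once.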



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,947 @@

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Ignore malformed "Passed" count; keep default value.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Failed" count; keep default value.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Skipped" count; keep default value.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Warning" count; keep default value.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Exception" count; keep default value.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Unknown" count; keep default value.
   ```
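
For a quick check of what the parser extracts, a minimal usage sketch follows. It assumes parse_autest_output from the quoted autest-parallel.py is available in scope, and the sample summary text is fabricated purely to mirror the labels the function searches for.

```python
# Assumes parse_autest_output (from the quoted autest-parallel.py) is in scope.
sample_output = """\
Test: cache-basic: Passed
Passed: 12
Failed: 1
Skipped: 2
Warning: 0
Exception: 0
Unknown: 0
"""

counts = parse_autest_output(sample_output)
print(counts['passed'], counts['failed'], counts['skipped'])
# Under the parsing shown in the hunk above, this prints: 12 1 2
```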



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

