Copilot commented on code in PR #12867:
URL: https://github.com/apache/trafficserver/pull/12867#discussion_r2779554040


##########
tests/gold_tests/autest-site/ports.py:
##########
@@ -154,6 +159,18 @@ def _setup_port_queue(amount=1000):
         # The queue has already been populated.
         host.WriteDebug('_setup_port_queue', f"Queue was previously populated. Queue size: {g_ports.qsize()}")
         return
+
+    # Get port offset for parallel execution support
+    try:
+        port_offset = int(os.environ.get('AUTEST_PORT_OFFSET', 0))
+    except ValueError:
+        host.WriteWarning("AUTEST_PORT_OFFSET is not a valid integer, defaulting to 0")
+        port_offset = 0
+    # Clamp to a safe range to avoid exceeding the valid port space
+    port_offset = max(0, min(port_offset, 60000))
+    if port_offset > 0:
+        host.WriteVerbose('_setup_port_queue', f"Using port offset: {port_offset}")

Review Comment:
   Clamping `AUTEST_PORT_OFFSET` to 60000 doesn’t ensure the port queue can 
still be populated. For common `ip_local_port_range` values (e.g. dmin≈32768, 
dmax≈60999), offsets >= ~30767 make `port = 2001 + offset` start above `dmin`, 
and offsets >= ~4536 make `port = dmax+1+offset` start above 65535; in both 
cases the queue stays empty and `get_port()` falls back to bind(), which 
defeats the intended collision avoidance for parallel runs. Consider validating 
the offset against the computed `dmin/dmax` and `amount` (and warning/erroring 
or adjusting) so each worker is guaranteed a non-overlapping, fillable range.
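   A minimal sketch of that kind of check (not part of the PR; `dmin`/`dmax` stand in for the ephemeral-range bounds already computed in `_setup_port_queue`, and in ports.py the message would go through `host.WriteWarning` rather than stderr):
   ```python
   import sys


   def validate_port_offset(offset: int, dmin: int, dmax: int, amount: int = 1000) -> int:
       """Return the offset if the shifted ranges [2001+offset, dmin-1] and
       [dmax+1+offset, 65535] can still hold `amount` ports; otherwise fall back to 0."""
       low_room = max(0, dmin - 2001 - offset)    # ports left below the ephemeral range
       high_room = max(0, 65535 - dmax - offset)  # ports left above the ephemeral range
       if low_room + high_room < amount:
           print(f"AUTEST_PORT_OFFSET={offset} leaves only {low_room + high_room} usable ports "
                 f"(need {amount}); falling back to 0", file=sys.stderr)
           return 0
       return offset


   # With the values quoted above (dmin=32768, dmax=60999):
   print(validate_port_offset(30767, dmin=32768, dmax=60999))  # both ranges exhausted -> 0
   print(validate_port_offset(2000, dmin=32768, dmax=60999))   # still fillable -> 2000
   ```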



##########
tests/README.md:
##########
@@ -40,6 +40,41 @@ The corresponding `autest.sh` command is:
 
     $ ./autest.sh --filter=something_descriptive
 
+# Running tests in parallel
+
+For faster test execution, a parallel test runner is available that distributes
+tests across multiple workers. This is especially useful on machines with many
+CPU cores.
+
+    $ python3 autest-parallel.py -j 16 --ats-bin <install>/bin --build-root <build-dir> --sandbox /tmp/autest-parallel
+
+Key options:
+
+* `-j N` - Number of parallel workers (default: number of CPU cores)
+* `--ats-bin` - Path to the ATS install bin directory
+* `--build-root` - Path to the build directory (for test plugins)
+* `--sandbox` - Directory for test sandboxes (default: `/tmp/autest-parallel`)
+* `-v` - Verbose output with real-time test progress per worker
+* `--collect-timings` - Run tests individually to collect per-test timing data
+* `--list` - List all tests and exit (useful for checking test discovery)
+
+The parallel runner uses port offsets to ensure each worker gets a unique port
+range, preventing conflicts between concurrent test instances. Tests known to
+require serial execution (listed in `serial_tests.txt`) are run sequentially
+after the parallel phase completes.
+
+## Timing-based load balancing
+
+If a `test-timings.json` file exists (generated by a previous run with
+`--collect-timings`), the runner uses the Longest Processing Time (LPT)
+algorithm to distribute tests across workers for balanced execution times.
+Without timing data, tests are distributed round-robin.
+
+## Adding serial tests
+
+If a test cannot run in parallel (e.g., it uses hardcoded global resources),
+add its path relative to `gold_tests/` to `serial_tests.txt`.

Review Comment:
   The new docs say to add a test’s path relative to `gold_tests/` into 
`serial_tests.txt`, but the current implementation in `autest-parallel.py` 
strips entries down to a basename when matching. Either update the 
implementation to honor relative paths (preferred, avoids ambiguity), or adjust 
this documentation to reflect the actual matching behavior.
   ```suggestion
   add its test file name (basename, e.g., `something_descriptive.test.py`) to `serial_tests.txt` (matching is done by basename, not by path).
   ```
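   For illustration, with basename matching a `serial_tests.txt` could look like this (entries other than `something_descriptive.test.py` are hypothetical; the `#` comment syntax follows `load_serial_tests()` below):
   ```
   # Tests that must run after the parallel phase, one per line.
   # A leading path and the .test.py suffix are accepted, but only the basename is matched.
   something_descriptive.test.py
   some_subdir/another_serial_test.test.py
   ```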



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:

Review Comment:
   `discover_tests()` reduces each test to only its basename (e.g. 
`thread_config`), and `load_serial_tests()` strips paths down to the same 
basename. This contradicts `serial_tests.txt` / README which specify paths 
relative to `gold_tests/`, and it makes it impossible to disambiguate tests 
with the same filename in different subdirs (also causes sandbox name 
collisions). Consider representing tests as their relative path from 
`gold_tests/` without the `.test.py` suffix, and match serial tests using that 
same canonical identifier.
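   A rough sketch of that canonical-identifier approach (not the PR's implementation; filter handling is omitted, and the example path in the comment is hypothetical):
   ```python
   from pathlib import Path
   from typing import List


   def discover_tests(test_dir: Path) -> List[str]:
       """Return tests as paths relative to gold_tests/, without the .test.py suffix."""
       tests = []
       for test_file in test_dir.rglob("*.test.py"):
           rel = test_file.relative_to(test_dir)
           # e.g. a hypothetical some_subdir/thread_config.test.py -> "some_subdir/thread_config"
           tests.append(str(rel.with_name(rel.name[:-len(".test.py")])))
       return sorted(tests)


   def load_serial_tests(serial_file: Path) -> set:
       """Read serial_tests.txt entries as the same relative-path identifiers."""
       serial = set()
       if not serial_file.exists():
           return serial
       for raw in serial_file.read_text().splitlines():
           line = raw.strip()
           if not line or line.startswith('#'):
               continue
           if line.endswith('.test.py'):
               line = line[:-len('.test.py')]
           serial.add(line)
       return serial
   ```
   With both sides using `subdir/name`, two tests that share a filename no longer collide, and the per-test sandbox path (`sandbox / test`) stays unique as well.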



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Ignore malformed "Passed" count; keep default value of 0.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Failed" count; keep default value of 0.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Skipped" count; keep default value of 0.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Warning" count; keep default value of 0.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Exception" count; keep default value of 
0.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Unknown" count; keep default value of 0.
   ```
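   Rather than annotating six identical `except` blocks, one alternative is a table-driven parse that folds the label-to-key mapping and the error handling into one place. A rough sketch (the exact summary line format, e.g. `Passed: 12`, is an assumption based on the parsing above):
   ```python
   import re
   from typing import Dict, Iterable

   # Summary labels from the elif chain above, mapped to the result dictionary keys.
   SUMMARY_FIELDS = {
       'Passed': 'passed',
       'Failed': 'failed',
       'Skipped': 'skipped',
       'Warning': 'warnings',
       'Exception': 'exceptions',
       'Unknown': 'unknown',
   }

   _SUMMARY_RE = re.compile(r'^(Passed|Failed|Skipped|Warning|Exception|Unknown):\s*(\d+)$')


   def parse_summary_counts(lines: Iterable[str]) -> Dict[str, int]:
       """Pull integer counts out of summary lines; anything malformed is simply skipped."""
       counts = {key: 0 for key in SUMMARY_FIELDS.values()}
       for line in lines:
           match = _SUMMARY_RE.match(line.strip())
           if match:
               counts[SUMMARY_FIELDS[match.group(1)]] = int(match.group(2))
       return counts
   ```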



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Ignore malformed "Passed" count; retain the default value.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Failed" count; retain the default value.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Skipped" count; retain the default value.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Warning" count; retain the default value.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Exception" count; retain the default 
value.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore malformed "Unknown" count; retain the default value.
   ```



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Ignore lines where the summary count value is not a valid integer.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the summary count value is not a valid integer.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the summary count value is not a valid integer.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the summary count value is not a valid integer.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the summary count value is not a valid integer.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore lines where the summary count value is not a valid integer.
   ```



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+
+    # Extract failed test names
+    failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE)
+    for match in failed_pattern.finditer(clean_output):
+        result['failed_tests'].append(match.group(1))
+
+    return result
+
+
+def run_single_test(test: str, script_dir: Path, sandbox: Path, ats_bin: str, 
build_root: str, extra_args: List[str],
+                    env: dict) -> Tuple[str, float, str, str]:
+    """
+    Run a single test and return its timing.
+
+    Returns:
+        Tuple of (test_name, duration, status, output)
+        status is one of: "PASS", "FAIL", "SKIP"
+    """
+    cmd = [
+        'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', 
'--ats-bin', ats_bin, '--build-root', build_root, '--sandbox',
+        str(sandbox / test), '--filters', test

Review Comment:
   The autest invocation constructed here doesn’t match how this repo runs 
AuTest elsewhere. The CMake autest target runs `uv run autest --directory 
<gold_tests> ...` (no `autest run` subcommand), and the docs refer to 
`--filter`, not `--filters`. As written, `uv run autest run ... --filters ...` 
is very likely to fail under the pinned `autest==1.10.4`. Update the command 
construction to mirror `tests/CMakeLists.txt` / `tests/autest.sh` (and pass 
per-test filters using the supported `--filter` flag).
   ```suggestion
        'uv', 'run', 'autest', '--directory', 'gold_tests', '--ats-bin', ats_bin, '--build-root', build_root, '--sandbox',
        str(sandbox / test), '--filter', test
   ```
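   If the same reasoning applies to the batch invocation in `run_worker` (shown in a later hunk), a corrected construction could look like the sketch below. Whether `--filter` accepts multiple test names under the pinned autest version is an assumption that should be verified.
   ```python
   # Hedged sketch for the batch case in run_worker, mirroring the suggestion
   # above: no 'run' subcommand and the '--filter' flag. Passing several test
   # names to one '--filter' is an assumption to check against autest==1.10.4.
   cmd = [
       'uv', 'run', 'autest',
       '--directory', 'gold_tests',
       '--ats-bin', ats_bin,
       '--build-root', build_root,
       '--sandbox', str(sandbox),
       '--filter', *tests,
   ]
   cmd.extend(extra_args)
   ```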



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest 
processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = 
None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in 
filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: 
Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and 
test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', 
line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), 
result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if 
available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the 
worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+
+    # Extract failed test names
+    failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE)
+    for match in failed_pattern.finditer(clean_output):
+        result['failed_tests'].append(match.group(1))
+
+    return result
+
+
+def run_single_test(test: str, script_dir: Path, sandbox: Path, ats_bin: str, 
build_root: str, extra_args: List[str],
+                    env: dict) -> Tuple[str, float, str, str]:
+    """
+    Run a single test and return its timing.
+
+    Returns:
+        Tuple of (test_name, duration, status, output)
+        status is one of: "PASS", "FAIL", "SKIP"
+    """
+    cmd = [
+        'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', 
'--ats-bin', ats_bin, '--build-root', build_root, '--sandbox',
+        str(sandbox / test), '--filters', test
+    ]
+    cmd.extend(extra_args)
+
+    start = time.time()
+    try:
+        proc = subprocess.run(
+            cmd,
+            cwd=script_dir,
+            capture_output=True,
+            text=True,
+            env=env,
+            timeout=600  # 10 minute timeout per test
+        )
+        duration = time.time() - start
+        output = proc.stdout + proc.stderr
+        parsed = parse_autest_output(output)
+        # Determine status:
+        # - SKIP: test was skipped (missing dependency, unsupported feature)
+        # - PASS: test ran and passed
+        # - FAIL: test failed, had exceptions, or nothing ran at all
+        if parsed['skipped'] > 0 and parsed['passed'] == 0 and 
parsed['failed'] == 0:
+            status = "SKIP"
+        elif (parsed['failed'] == 0 and parsed['exceptions'] == 0 and 
proc.returncode == 0 and
+              (parsed['passed'] > 0 or parsed['skipped'] > 0)):
+            status = "PASS"
+        else:
+            status = "FAIL"
+        return (test, duration, status, output)
+    except subprocess.TimeoutExpired:
+        return (test, 600.0, "FAIL", "TIMEOUT")
+    except Exception as e:
+        return (test, time.time() - start, "FAIL", str(e))
+
+
+def run_worker(
+        worker_id: int,
+        tests: List[str],
+        script_dir: Path,
+        sandbox_base: Path,
+        ats_bin: str,
+        build_root: str,
+        extra_args: List[str],
+        port_offset_step: int = 1000,
+        verbose: bool = False,
+        collect_timings: bool = False) -> TestResult:
+    """
+    Run autest on a subset of tests with isolated sandbox and port range.
+
+    Args:
+        worker_id: Worker identifier (0, 1, 2, ...)
+        tests: List of test names to run
+        script_dir: Directory containing autest.sh
+        sandbox_base: Base sandbox directory
+        ats_bin: Path to ATS bin directory
+        build_root: Path to the build directory (for test plugins etc.)
+        extra_args: Additional arguments to pass to autest
+        port_offset_step: Port offset between workers
+        verbose: Whether to print verbose output
+        collect_timings: If True, run tests one at a time to collect accurate 
timing
+
+    Returns:
+        TestResult with pass/fail counts and per-test timings
+    """
+    start_time = time.time()
+    result = TestResult(worker_id=worker_id, tests=tests)
+
+    # Create worker-specific sandbox
+    sandbox = sandbox_base / f"worker-{worker_id}"
+    sandbox.mkdir(parents=True, exist_ok=True)
+
+    # Calculate port offset for this worker
+    port_offset = worker_id * port_offset_step
+
+    # Set up environment with port offset
+    env = os.environ.copy()
+    env['AUTEST_PORT_OFFSET'] = str(port_offset)
+

Review Comment:
   This runner doesn’t replicate the environment setup done by 
`tests/autest.sh` / the CMake `autest` target (e.g., setting `PYTHONPATH` to 
include `gold_tests/remap`, clearing proxy env vars, and ensuring Proxy 
Verifier is installed). Without that, parallel runs can behave differently or 
fail even when `./autest.sh` works. Consider invoking the generated `autest.sh` 
script per worker (or duplicating the same env setup in Python) so the parallel 
runner stays consistent with the supported workflow.
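   A hedged sketch of what that per-worker environment setup could look like in Python is below; the specific variables (the `gold_tests/remap` `PYTHONPATH` entry and the proxy variables) are taken from this comment and should be checked against `tests/autest.sh` before relying on them.
   ```python
   # Sketch only: duplicate autest.sh's environment setup for each worker.
   # The variable names here are assumptions drawn from the review comment above.
   import os
   from pathlib import Path


   def make_worker_env(script_dir: Path, port_offset: int) -> dict:
       env = os.environ.copy()
       # Make helper modules under gold_tests/remap importable, as autest.sh does.
       remap_dir = script_dir / 'gold_tests' / 'remap'
       env['PYTHONPATH'] = os.pathsep.join(
           p for p in (str(remap_dir), env.get('PYTHONPATH', '')) if p)
       # Clear proxy settings so test traffic is not routed through a local proxy.
       for var in ('http_proxy', 'https_proxy', 'no_proxy',
                   'HTTP_PROXY', 'HTTPS_PROXY', 'NO_PROXY'):
           env.pop(var, None)
       # Keep the per-worker port offset consumed by ports.py.
       env['AUTEST_PORT_OFFSET'] = str(port_offset)
       return env
   ```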



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest 
processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = 
None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in 
filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: 
Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and 
test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', 
line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), 
result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if 
available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the 
worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   This 'except' clause does nothing but 'pass', and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Malformed 'Passed' summary line; ignore and keep default count.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Malformed 'Failed' summary line; ignore and keep default count.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Malformed 'Skipped' summary line; ignore and keep default count.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Malformed 'Warning' summary line; ignore and keep default count.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Malformed 'Exception' summary line; ignore and keep default count.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Malformed 'Unknown' summary line; ignore and keep default count.
   ```
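   A table-driven alternative would also remove the repetition entirely, so the explanatory comment only has to live in one place. This is a sketch rather than the PR's code; `SUMMARY_KEYS` and `parse_summary` are hypothetical names.
   ```python
   # Hedged sketch: fold the repeated try/except branches into one loop.
   from typing import Dict, Iterable

   SUMMARY_KEYS = {
       'Passed:': 'passed',
       'Failed:': 'failed',
       'Skipped:': 'skipped',
       'Warning:': 'warnings',
       'Exception:': 'exceptions',
       'Unknown:': 'unknown',
   }


   def parse_summary(lines: Iterable[str], result: Dict[str, int]) -> None:
       """Update summary counts in result, in place, from autest output lines."""
       for line in lines:
           line = line.strip()
           for label, key in SUMMARY_KEYS.items():
               # Preserve the original special case: skip per-test 'Failed' lines.
               if label in line and (label != 'Failed:' or 'test' not in line.lower()):
                   try:
                       result[key] = int(line.split(':')[-1].strip())
                   except ValueError:
                       # Malformed summary line; keep the existing count.
                       pass
                   break
   ```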



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest 
processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = 
None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in 
filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: 
Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and 
test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', 
line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), 
result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if 
available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the 
worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   This 'except' clause does nothing but 'pass', and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # If parsing fails, keep the default count value.
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If parsing fails, keep the default count value.
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If parsing fails, keep the default count value.
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If parsing fails, keep the default count value.
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If parsing fails, keep the default count value.
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # If parsing fails, keep the default count value.
   ```
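   For either variant, a small self-check against a synthetic summary makes the expected input format explicit. The sample text below is illustrative only; real autest output may differ, and it assumes `parse_autest_output` from the hunk above is in scope.
   ```python
   # Illustrative usage of parse_autest_output with made-up summary lines that
   # follow the labels the parser keys on ('Passed:', 'Failed:', ...).
   sample = """
   Running Test: cache_basic
   Test: cache_basic: Passed
   Passed: 1
   Failed: 0
   Skipped: 0
   Warning: 0
   Exception: 0
   Unknown: 0
   """
   counts = parse_autest_output(sample)
   assert counts['passed'] == 1 and counts['failed'] == 0
   assert counts['failed_tests'] == []
   ```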



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest 
processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = 
None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in 
filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: 
Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests evenly across workers with least load
+    # Sort unknown tests and distribute them one at a time to balance
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and 
test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', 
line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), 
result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if 
available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the 
worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+
+    # Extract failed test names
+    failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE)
+    for match in failed_pattern.finditer(clean_output):
+        result['failed_tests'].append(match.group(1))
+
+    return result
+
+
+def run_single_test(test: str, script_dir: Path, sandbox: Path, ats_bin: str, 
build_root: str, extra_args: List[str],
+                    env: dict) -> Tuple[str, float, str, str]:
+    """
+    Run a single test and return its timing.
+
+    Returns:
+        Tuple of (test_name, duration, status, output)
+        status is one of: "PASS", "FAIL", "SKIP"
+    """
+    cmd = [
+        'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', 
'--ats-bin', ats_bin, '--build-root', build_root, '--sandbox',
+        str(sandbox / test), '--filters', test
+    ]
+    cmd.extend(extra_args)
+
+    start = time.time()
+    try:
+        proc = subprocess.run(
+            cmd,
+            cwd=script_dir,
+            capture_output=True,
+            text=True,
+            env=env,
+            timeout=600  # 10 minute timeout per test
+        )
+        duration = time.time() - start
+        output = proc.stdout + proc.stderr
+        parsed = parse_autest_output(output)
+        # Determine status:
+        # - SKIP: test was skipped (missing dependency, unsupported feature)
+        # - PASS: test ran and passed
+        # - FAIL: test failed, had exceptions, or nothing ran at all
+        if parsed['skipped'] > 0 and parsed['passed'] == 0 and 
parsed['failed'] == 0:
+            status = "SKIP"
+        elif (parsed['failed'] == 0 and parsed['exceptions'] == 0 and 
proc.returncode == 0 and
+              (parsed['passed'] > 0 or parsed['skipped'] > 0)):
+            status = "PASS"
+        else:
+            status = "FAIL"
+        return (test, duration, status, output)
+    except subprocess.TimeoutExpired:
+        return (test, 600.0, "FAIL", "TIMEOUT")
+    except Exception as e:
+        return (test, time.time() - start, "FAIL", str(e))
+
+
+def run_worker(
+        worker_id: int,
+        tests: List[str],
+        script_dir: Path,
+        sandbox_base: Path,
+        ats_bin: str,
+        build_root: str,
+        extra_args: List[str],
+        port_offset_step: int = 1000,
+        verbose: bool = False,
+        collect_timings: bool = False) -> TestResult:
+    """
+    Run autest on a subset of tests with isolated sandbox and port range.
+
+    Args:
+        worker_id: Worker identifier (0, 1, 2, ...)
+        tests: List of test names to run
+        script_dir: Directory containing autest.sh
+        sandbox_base: Base sandbox directory
+        ats_bin: Path to ATS bin directory
+        build_root: Path to the build directory (for test plugins etc.)
+        extra_args: Additional arguments to pass to autest
+        port_offset_step: Port offset between workers
+        verbose: Whether to print verbose output
+        collect_timings: If True, run tests one at a time to collect accurate timing
+
+    Returns:
+        TestResult with pass/fail counts and per-test timings
+    """
+    start_time = time.time()
+    result = TestResult(worker_id=worker_id, tests=tests)
+
+    # Create worker-specific sandbox
+    sandbox = sandbox_base / f"worker-{worker_id}"
+    sandbox.mkdir(parents=True, exist_ok=True)
+
+    # Calculate port offset for this worker
+    port_offset = worker_id * port_offset_step
+
+    # Set up environment with port offset
+    env = os.environ.copy()
+    env['AUTEST_PORT_OFFSET'] = str(port_offset)
+
+    if collect_timings:
+        # Run tests one at a time to collect accurate timing
+        all_output = []
+        total_tests = len(tests)
+        for idx, test in enumerate(tests, 1):
+            test_name, duration, status, output = run_single_test(test, script_dir, sandbox, ats_bin, build_root, extra_args, env)
+            result.test_timings[test_name] = duration
+            all_output.append(output)
+
+            if status == "PASS":
+                result.passed += 1
+            elif status == "SKIP":
+                result.skipped += 1
+            else:
+                result.failed += 1
+                result.failed_tests.append(test_name)
+
+            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            # Fixed-width format: date time status duration worker progress test_name
+            print(f"{timestamp} {status:4s} {duration:6.1f}s Worker:{worker_id:2d} {idx:2d}/{total_tests:2d} {test}", flush=True)
+
+        result.output = "\n".join(all_output)
+        result.return_code = 0 if result.failed == 0 else 1
+    else:
+        # Run all tests in batch (faster but no per-test timing)
+        cmd = [
+            'uv',
+            'run',
+            'autest',
+            'run',
+            '--directory',
+            'gold_tests',
+            '--ats-bin',
+            ats_bin,
+            '--build-root',
+            build_root,
+            '--sandbox',
+            str(sandbox),
+        ]
+
+        # Add test filters
+        cmd.append('--filters')
+        cmd.extend(tests)
+
+        # Add any extra arguments
+        cmd.extend(extra_args)
+
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        print(f"{timestamp} Worker:{worker_id:2d} Starting batch of 
{len(tests)} tests (port offset {port_offset})", flush=True)
+        if verbose:
+            print(
+                f"             Worker:{worker_id:2d} Tests: {', 
'.join(tests[:5])}"
+                f"{'...' if len(tests) > 5 else ''}",
+                flush=True)
+
+        try:
+            if verbose:
+                # Stream output in real-time so the user sees test progress.
+                # We use Popen + line-by-line read so partial results are visible
+                # even if the overall run takes a long time.
+                proc = subprocess.Popen(
+                    cmd,
+                    cwd=script_dir,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT,
+                    text=True,
+                    env=env,
+                )

Review Comment:
   In `-v/--verbose` mode the worker uses `subprocess.Popen` and then blocks 
reading `proc.stdout` without any overall timeout. Unlike the non-verbose path 
(which uses `subprocess.run(..., timeout=3600)`), a hung autest process can 
stall forever. Add a timeout/poll loop (and terminate the process on timeout) 
so verbose mode has the same 1-hour safety bound.
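   One possible shape for that bound, offered only as a sketch (the `stream_with_deadline` helper name is illustrative, and the 3600-second default mirrors the non-verbose path's `subprocess.run(..., timeout=3600)`). It waits on the pipe with `selectors` so the deadline is re-checked even when autest produces no output:
   ```python
   import selectors
   import subprocess
   import time


   def stream_with_deadline(cmd, cwd, env, deadline_seconds=3600.0):
       """Run cmd, echoing its combined stdout/stderr line by line, and
       terminate it if it runs longer than deadline_seconds."""
       proc = subprocess.Popen(
           cmd, cwd=cwd, env=env, text=True,
           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
       sel = selectors.DefaultSelector()
       sel.register(proc.stdout, selectors.EVENT_READ)
       deadline = time.time() + deadline_seconds
       chunks = []
       try:
           while True:
               remaining = deadline - time.time()
               if remaining <= 0:
                   # Deadline hit: try a graceful terminate, then kill.
                   proc.terminate()
                   try:
                       proc.wait(timeout=10)
                   except subprocess.TimeoutExpired:
                       proc.kill()
                       proc.wait()
                   raise subprocess.TimeoutExpired(cmd, deadline_seconds)
               # Wait at most one second for output so the deadline is
               # re-checked periodically even if the child stays silent.
               if sel.select(timeout=min(remaining, 1.0)):
                   line = proc.stdout.readline()
                   if line == '':
                       break  # EOF: the child closed its stdout
                   chunks.append(line)
                   print(line, end='', flush=True)
               elif proc.poll() is not None:
                   break  # Child exited without producing further output
       finally:
           sel.unregister(proc.stdout)
           proc.stdout.close()
       return ''.join(chunks), proc.wait()
   ```
   The verbose branch could then call such a helper with the existing `cmd`, `script_dir`, and `env`, catching `subprocess.TimeoutExpired` the same way the non-verbose path already does.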



##########
tests/autest-parallel.py:
##########
@@ -0,0 +1,950 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list  # Just list tests without running
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import fnmatch
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Default timing file location
+DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json"
+# Default serial tests file location
+DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt"
+# Default estimate for unknown tests (seconds)
+DEFAULT_TEST_TIME = 15.0
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    test_timings: Dict[str, float] = field(default_factory=dict)
+    output: str = ""
+    return_code: int = 0
+    is_serial: bool = False
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def load_serial_tests(serial_file: Path) -> set:
+    """
+    Load list of tests that must run serially from a file.
+
+    The file format is one test name per line, with # for comments.
+    Test names can be full paths like ``subdir/test_name.test.py``.
+    The .test.py extension is stripped, and only the basename (stem) is
+    used for matching against discovered test names.
+
+    Returns:
+        Set of test base names that must run serially
+    """
+    serial_tests = set()
+    if not serial_file.exists():
+        return serial_tests
+
+    try:
+        with open(serial_file) as f:
+            for line in f:
+                line = line.strip()
+                # Skip empty lines and comments
+                if not line or line.startswith('#'):
+                    continue
+                # Remove .test.py extension if present
+                if line.endswith('.test.py'):
+                    line = line[:-8]  # Remove .test.py
+                # Extract just the test name from path
+                test_name = Path(line).stem.replace('.test', '')
+                serial_tests.add(test_name)
+    except IOError:
+        pass  # File is optional; missing file means no serial tests
+
+    return serial_tests
+
+
+def load_timings(timing_file: Path) -> Dict[str, float]:
+    """Load test timing data from JSON file."""
+    if timing_file.exists():
+        try:
+            with open(timing_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            pass  # Timing data is optional; fall back to equal partitioning
+    return {}
+
+
+def save_timings(timing_file: Path, timings: Dict[str, float]):
+    """Save test timing data to JSON file."""
+    try:
+        with open(timing_file, 'w') as f:
+            json.dump(timings, f, indent=2, sort_keys=True)
+    except IOError as e:
+        print(f"Warning: Could not save timings: {e}", file=sys.stderr)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+    Simple round-robin partitioning (used when no timing data available).
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
+    """
+    Partition tests using LPT (Longest Processing Time first) algorithm.
+    This balances the load across workers based on expected test duration.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+        timings: Dictionary of test name -> expected duration in seconds
+
+    Returns:
+        Tuple of (partitions, expected_durations) where:
+        - partitions: List of test lists, one per worker
+        - expected_durations: Expected total duration for each worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    num_workers = min(num_jobs, len(tests))
+
+    # Get timing for each test, use default for unknown
+    test_times = []
+    unknown_tests = []
+    for test in tests:
+        if test in timings:
+            test_times.append((test, timings[test]))
+        else:
+            unknown_tests.append(test)
+
+    # Sort known tests by time (longest first) for LPT algorithm
+    test_times.sort(key=lambda x: x[1], reverse=True)
+
+    # Initialize workers
+    partitions = [[] for _ in range(num_workers)]
+    worker_loads = [0.0] * num_workers
+
+    # Assign known tests using LPT: assign to worker with least load
+    for test, duration in test_times:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += duration
+
+    # Distribute unknown tests one at a time, each to the currently
+    # least-loaded worker, charging the default estimate per test
+    for test in unknown_tests:
+        min_worker = min(range(num_workers), key=lambda w: worker_loads[w])
+        partitions[min_worker].append(test)
+        worker_loads[min_worker] += DEFAULT_TEST_TIME
+
+    return partitions, worker_loads
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts and per-test timings.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc. and test_timings
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': [],
+        'test_timings': {}
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+    lines = clean_output.split('\n')
+
+    # First pass: find test results and their line positions
+    test_results = []  # (line_num, test_name, result)
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Match "Running Test: test_name" or "Test: test_name: Passed/Failed"
+        running_match = re.match(r'Running Test:\s+(\S+)', line_stripped)
+        result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE)
+
+        if running_match:
+            test_results.append((i, running_match.group(1), 'start'))
+        elif result_match:
+            test_results.append((i, result_match.group(1), result_match.group(2).lower()))
+
+    # Calculate per-test timing based on line positions
+    # (rough approximation - actual timing would be better from autest if available)
+    for i, (line_num, test_name, status) in enumerate(test_results):
+        if status == 'start':
+            # Find the corresponding end
+            for j in range(i + 1, len(test_results)):
+                end_line, end_name, end_status = test_results[j]
+                if end_name == test_name and end_status != 'start':
+                    # We don't have actual time, but we'll track it from the worker
+                    break
+
+    # Parse the summary section
+    for line in lines:
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.
   ```suggestion
               except ValueError:
                   # Ignore parse errors; keep default count of 0
                   pass
           elif 'Failed:' in line and 'test' not in line.lower():
               try:
                   result['failed'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore parse errors; keep default count of 0
                   pass
           elif 'Skipped:' in line:
               try:
                   result['skipped'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore parse errors; keep default count of 0
                   pass
           elif 'Warning:' in line:
               try:
                   result['warnings'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore parse errors; keep default count of 0
                   pass
           elif 'Exception:' in line:
               try:
                   result['exceptions'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore parse errors; keep default count of 0
                   pass
           elif 'Unknown:' in line:
               try:
                   result['unknown'] = int(line.split(':')[-1].strip())
               except ValueError:
                   # Ignore parse errors; keep default count of 0
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
