This is an automated email from the ASF dual-hosted git repository.

cmcfarlen pushed a commit to branch 10.2.x in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit c3ce46c671d1e58cfc807248b138a60ac9927df6 Author: Brian Neradt <[email protected]> AuthorDate: Tue Jun 2 15:34:47 2026 -0500 Fix flaky Fedora 44 AuTest helpers (#13220) Fedora 44 CI exposed several AuTests whose helper processes stayed alive after useful assertions finished, or whose asynchronous output could be checked before it was fully produced. Those tests could report killed helpers, missing log markers, or worker-wide timeouts even when ATS itself completed the intended checks. This makes affected helpers finish after expected traffic, waits for transaction data sink output before asserting on traffic.out, relaxes the connect curl wording for newer curl, and removes dummy long-running ATS commands from plugin verification tests. This keeps AuTest responsible for test behavior instead of cleanup races and lets the Fedora 44 shards report real failures. (cherry picked from commit 1f6675a0d8aa1fed0988fbf31c18651c48e4188c) --- .../command_argument/verify_global_plugin.test.py | 12 ++-- .../command_argument/verify_remap_plugin.test.py | 12 ++-- tests/gold_tests/connect/connect.test.py | 3 +- tests/gold_tests/h2/grpc/grpc_server.py | 43 +++++++++--- tests/gold_tests/h2/trickle_server.py | 24 +++++-- tests/gold_tests/pipeline/pipeline_server.py | 7 +- .../transform/transaction_data_sink.test.py | 17 +++++ tests/gold_tests/tunnel/dumb_proxy.py | 77 +++++++++++----------- 8 files changed, 126 insertions(+), 69 deletions(-) diff --git a/tests/gold_tests/command_argument/verify_global_plugin.test.py b/tests/gold_tests/command_argument/verify_global_plugin.test.py index e9aa44ce52..05a80af7c8 100644 --- a/tests/gold_tests/command_argument/verify_global_plugin.test.py +++ b/tests/gold_tests/command_argument/verify_global_plugin.test.py @@ -39,12 +39,12 @@ def create_ts_process(): # Ideally we would set the test run's Processes.Default to ts, but deep # copy of processes is not currently implemented in autest. 
Therefore we # replace the command which ts runs with a dummy command, and pull in - # piecemeal the values from ts that we want into the test run. - ts.Command = "sleep 100" - # sleep will return -2 when autest kills it. We set the expectation for the - # -2 return code here so the test doesn't fail because of this. - ts.ReturnCode = -2 - # Clear the ready criteria because sleep is ready as soon as it is running. + # piecemeal the values from ts that we want into the test run. The helper + # only needs to create the ATS environment and runroot, so let it exit + # normally instead of depending on AuTest's cleanup signal behavior. + ts.Command = "true" + ts.ReturnCode = 0 + # Clear the ready criteria because the helper is ready as soon as it runs. ts.Ready = None return ts diff --git a/tests/gold_tests/command_argument/verify_remap_plugin.test.py b/tests/gold_tests/command_argument/verify_remap_plugin.test.py index 3b68147962..554029f871 100644 --- a/tests/gold_tests/command_argument/verify_remap_plugin.test.py +++ b/tests/gold_tests/command_argument/verify_remap_plugin.test.py @@ -39,12 +39,12 @@ def create_ts_process(): # Ideally we would set the test run's Processes.Default to ts, but deep # copy of processes is not currently implemented in autest. Therefore we # replace the command which ts runs with a dummy command, and pull in - # piecemeal the values from ts that we want into the test run. - ts.Command = "sleep 100" - # sleep will return -2 when autest kills it. We set the expectation for the - # -2 return code here so the test doesn't fail because of this. - ts.ReturnCode = -2 - # Clear the ready criteria because sleep is ready as soon as it is running. + # piecemeal the values from ts that we want into the test run. The helper + # only needs to create the ATS environment and runroot, so let it exit + # normally instead of depending on AuTest's cleanup signal behavior. 
+ ts.Command = "true" + ts.ReturnCode = 0 + # Clear the ready criteria because the helper is ready as soon as it runs. ts.Ready = None return ts diff --git a/tests/gold_tests/connect/connect.test.py b/tests/gold_tests/connect/connect.test.py index a1674e4a34..37ea4db84f 100644 --- a/tests/gold_tests/connect/connect.test.py +++ b/tests/gold_tests/connect/connect.test.py @@ -88,7 +88,8 @@ logging: tr.MakeCurlCommand(f"-v --fail -s -p -x 127.0.0.1:{self.ts.Variables.port} 'http://foo.com/get'", ts=self.ts) tr.Processes.Default.Streams.stderr = "gold/connect_0_stderr.gold" tr.Processes.Default.Streams.stderr = Testers.ContainsExpression( - f'Connected to 127.0.0.1.*{self.ts.Variables.port}', 'Curl should connect through the ATS proxy port.') + rf'(Connected to|Established connection to) 127\.0\.0\.1.*{self.ts.Variables.port}', + 'Curl should connect through the ATS proxy port.') tr.Processes.Default.ReturnCode = 0 tr.Processes.Default.TimeOut = 3 self.__checkProcessAfter(tr) diff --git a/tests/gold_tests/h2/grpc/grpc_server.py b/tests/gold_tests/h2/grpc/grpc_server.py index 63dbd4f564..2a435db65f 100644 --- a/tests/gold_tests/h2/grpc/grpc_server.py +++ b/tests/gold_tests/h2/grpc/grpc_server.py @@ -27,50 +27,73 @@ import simple_pb2 import simple_pb2_grpc global_message_counter: int = 0 +MESSAGE_TIMEOUT_SECONDS = 60 class Talker(simple_pb2_grpc.TalkerServicer): """A gRPC servicer.""" - async def MakeRequest(self, request: simple_pb2.SimpleRequest, context: grpc.aio.ServicerContext): - """An example gRPC method.""" + def __init__(self, num_expected_messages: int, done_event: asyncio.Event): + self._num_expected_messages = num_expected_messages + self._done_event = done_event + + def _record_message(self) -> None: global global_message_counter + global_message_counter += 1 + if global_message_counter >= self._num_expected_messages: + asyncio.get_running_loop().call_soon(self._done_event.set) + + async def MakeRequest(self, request: simple_pb2.SimpleRequest, context: 
grpc.aio.ServicerContext): + """An example gRPC method.""" + self._record_message() print(f'Received request: {request.message}') response = simple_pb2.SimpleResponse(message=f"Echo: {request.message}") return response async def MakeAnotherRequest(self, request: simple_pb2.SimpleRequest, context: grpc.aio.ServicerContext): """An example gRPC method.""" - global global_message_counter - global_message_counter += 1 + self._record_message() print(f'Received another request: {request.message}') response = simple_pb2.SimpleResponse(message=f"Another echo: {request.message}") return response -async def run_grpc_server(port: int, server_cert: str, server_key: str) -> int: +async def run_grpc_server(port: int, server_cert: str, server_key: str, num_expected_messages: int) -> int: """Run the gRPC server. :param port: The port on which to listen. :param server_cert: The public TLS certificate to use. :param server_key: The private TLS key to use. + :param num_expected_messages: The number of messages expected from clients. :return: The exit code. 
""" credentials = grpc.ssl_server_credentials([(server_key, server_cert)]) server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10)) - simple_pb2_grpc.add_TalkerServicer_to_server(Talker(), server) + done_event = asyncio.Event() + simple_pb2_grpc.add_TalkerServicer_to_server(Talker(num_expected_messages, done_event), server) server_endpoint = f'127.0.0.1:{port}' server.add_secure_port(server_endpoint, credentials) print(f'Listening on: {server_endpoint}') + timed_out = False try: await server.start() - await server.wait_for_termination() + try: + await asyncio.wait_for(done_event.wait(), timeout=MESSAGE_TIMEOUT_SECONDS) + except asyncio.TimeoutError: + timed_out = True + print(f'Timed out waiting for {num_expected_messages} messages after {MESSAGE_TIMEOUT_SECONDS} seconds.') except asyncio.exceptions.CancelledError: print('Shutting down the server.') finally: - await server.stop(0) - return 0 + await server.stop(5) + + if not timed_out and global_message_counter == num_expected_messages: + print(f'Received {num_expected_messages} messages as expected.') + return 0 + else: + print(f'Expected {num_expected_messages} messages, but received {global_message_counter}.') + return 1 def parse_args() -> argparse.Namespace: @@ -91,7 +114,7 @@ def main() -> int: """ args = parse_args() try: - return asyncio.run(run_grpc_server(args.port, args.server_crt.read(), args.server_key.read())) + return asyncio.run(run_grpc_server(args.port, args.server_crt.read(), args.server_key.read(), args.num_expected_messages)) except KeyboardInterrupt: pass diff --git a/tests/gold_tests/h2/trickle_server.py b/tests/gold_tests/h2/trickle_server.py index 36f42bd907..2cbd36d69d 100644 --- a/tests/gold_tests/h2/trickle_server.py +++ b/tests/gold_tests/h2/trickle_server.py @@ -22,11 +22,10 @@ import logging import math import statistics import sys +import threading import time from OpenSSL.SSL import Error as SSLError from OpenSSL.SSL import SysCallError as SSLSysCallError -import 
threading - import eventlet from eventlet.green.OpenSSL import SSL, crypto from h2.config import H2Configuration @@ -85,6 +84,7 @@ class Http2ConnectionManager: self.listening_conn: H2Connection = H2Connection(config=listening_config) self.requests: Dict[int, RequestInfo] = {} self.completed_stream_ids: Set[int] = set() + self._sent_response = False # Delay times in ms between each data frame. self._data_delays: List[int] = [] @@ -137,13 +137,15 @@ class Http2ConnectionManager: # Clean up any responses we sent. for stream_id in responded_streams: del responses[stream_id] + if responded_streams: + self._sent_response = True def _receive_data(self, responses: Dict[int, ResponseInfo]) -> Optional[bytes]: """Receive data from the socket. :param responses: A dictionary of stream IDs to responses that have accumulated. - :return: The data received, or None if the connection for the socket has closed. + :return: The data received, or None if the socket closed or the response was sent. """ data: Optional[bytes] = None while not data: @@ -156,6 +158,8 @@ class Http2ConnectionManager: except TimeoutError: # Take time to send any responses we've generated. self._send_responses(responses) + if self._sent_response: + return None # Loop back around to receive more data. logging.debug('Timeout, waiting again for more data.') @@ -245,7 +249,7 @@ class Http2ConnectionManager: while True: data = self._receive_data(responses) if not data: - # Connection ended. + # Connection ended or the measured transaction completed. break logging.debug(f'Giving {len(data)} bytes to the h2 connection') @@ -260,6 +264,8 @@ class Http2ConnectionManager: except (SSLError, SSLSysCallError) as e: logging.debug(f'Ignoring end-loop sock.sendall exception for now: {e}') pass + if self._sent_response: + return def get_data_delays(self) -> List[int]: """Get the DATA frame timing list. 
@@ -268,6 +274,10 @@ class Http2ConnectionManager: """ return self._data_delays + def sent_response(self) -> bool: + """Return whether a response was sent on this connection.""" + return self._sent_response + def _process_request(self, request: RequestInfo) -> ResponseInfo: """Handle a request from a client. @@ -356,7 +366,6 @@ def run_server(listen_port, https_pem, ca_pem) -> List[int]: listening_socket = eventlet.listen(('0.0.0.0', listen_port)) listening_socket = SSL.Connection(context, listening_socket) logging.info(f"Serving HTTP/2 Proxy on 127.0.0.1:{listen_port} with pem '{https_pem}'") - pool = eventlet.GreenPool() manager = None while True: @@ -365,7 +374,10 @@ def run_server(listen_port, https_pem, ca_pem) -> List[int]: manager = Http2ConnectionManager(new_connection_socket) manager.cert_file = https_pem manager.ca_file = ca_pem - pool.spawn_n(manager.run_forever) + manager.run_forever() + data_delays = manager.get_data_delays() + if data_delays or manager.sent_response(): + return data_delays except KeyboardInterrupt as e: logging.debug("Handling KeyboardInterrupt") if manager is not None: diff --git a/tests/gold_tests/pipeline/pipeline_server.py b/tests/gold_tests/pipeline/pipeline_server.py index cf13fa5696..85214b8c08 100644 --- a/tests/gold_tests/pipeline/pipeline_server.py +++ b/tests/gold_tests/pipeline/pipeline_server.py @@ -83,7 +83,11 @@ def receive_requests(sock: socket.socket) -> None: end_of_second_request: bytes = b'12345' end_of_third_request: bytes = b'\r\n\r\n' while not received_third_request: - data = sock.recv(1024) + try: + data = sock.recv(1024) + except socket.timeout: + print("Timed out waiting for an unexpected third request.") + break if not data: print("Socket closed.") break @@ -126,6 +130,7 @@ def receive_requests(sock: socket.socket) -> None: print() time.sleep(0.01) sock.sendall(second_response_bytes) + sock.settimeout(1.0) continue elif processing_third_request: diff --git 
a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py index 46d71b84d9..de008facbe 100644 --- a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py +++ b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py @@ -80,6 +80,23 @@ class TransactionDataSyncTest: tr.Processes.Default.StartBefore(self.ts) tr.AddVerifierClientProcess( "client", self.replay_file, http_ports=[self.ts.Variables.port], https_ports=[self.ts.Variables.ssl_port]) + tr.StillRunningAfter = self.server + tr.StillRunningAfter = self.nameserver + tr.StillRunningAfter = self.ts + + tr = Test.AddTestRun("Wait for transaction data sink output") + timeout = 30 + watcher = tr.Processes.Process("watcher") + watcher.Command = f"sleep {timeout}" + watcher.Ready = When.FileContains(self.ts.Disk.traffic_out.Name, "http2_response_body_dumped") + watcher.TimeOut = timeout + tr.TimeOut = timeout + tr.StillRunningAfter = self.server + tr.StillRunningAfter = self.nameserver + tr.StillRunningAfter = self.ts + tr.Processes.Default.StartBefore(watcher) + tr.Processes.Default.Command = "echo await_transaction_data_sink_output" + tr.Processes.Default.ReturnCode = 0 TransactionDataSyncTest().run() diff --git a/tests/gold_tests/tunnel/dumb_proxy.py b/tests/gold_tests/tunnel/dumb_proxy.py index 6e0d152a47..36ed60a03a 100644 --- a/tests/gold_tests/tunnel/dumb_proxy.py +++ b/tests/gold_tests/tunnel/dumb_proxy.py @@ -24,9 +24,6 @@ import threading import argparse LOCAL_HOST = '127.0.0.1' -TIMEOUT = 0.5 -# Create a thread-local data object to store the number of bytes transferred. 
-thread_local_data = threading.local() def parse_args() -> argparse.Namespace: @@ -40,39 +37,38 @@ def parse_args() -> argparse.Namespace: return parser.parse_args() -def initialize_thread_local_data(): - thread_local_data.client_to_server_bytes = 0 - thread_local_data.server_to_client_bytes = 0 - - -def forward(source, destination, is_client_to_server): +def forward(source, destination, is_client_to_server, byte_counts): """Forward traffic from source to destination. :param source: socket to read from. :param destination: socket to write to. :param is_client_to_server: True if forwarding from client to server. + :param byte_counts: A dictionary to record bytes sent in each direction. """ - # Initialize thread-local data. - initialize_thread_local_data() + bytes_transferred = 0 - while True: - try: + try: + while True: data = source.recv(4096) if not data: break destination.sendall(data) - except Exception as e: - # Catching all exceptions. - break + bytes_transferred += len(data) + except OSError: + pass + finally: + try: + destination.shutdown(socket.SHUT_WR) + except OSError: + pass + # Forwarding done. Print the number of bytes transferred in the direction. + if bytes_transferred > 0: if is_client_to_server: - thread_local_data.client_to_server_bytes += len(data) + byte_counts["client-to-server"] = bytes_transferred + print(f"client-to-server: {bytes_transferred}", flush=True) else: - thread_local_data.server_to_client_bytes += len(data) - # Forwarding done. Print the number of bytes transferred in the direction. 
- if thread_local_data.client_to_server_bytes > 0: - print(f"client-to-server: {thread_local_data.client_to_server_bytes}") - elif thread_local_data.server_to_client_bytes > 0: - print(f"server-to-client: {thread_local_data.server_to_client_bytes}") + byte_counts["server-to-client"] = bytes_transferred + print(f"server-to-client: {bytes_transferred}", flush=True) def start_bidirectional_forwarding(client_socket, forwarding_port): @@ -80,44 +76,47 @@ def start_bidirectional_forwarding(client_socket, forwarding_port): :param client_socket: socket connected to the client. :param forwarding_port: server port to forward to. + :return: The number of bytes forwarded in both directions. """ CLIENT_TO_SERVER = True SERVER_TO_CLIENT = False + byte_counts = {} with client_socket, socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: - client_socket.settimeout(TIMEOUT) - server_socket.settimeout(TIMEOUT) - server_socket.connect((LOCAL_HOST, forwarding_port)) + try: + server_socket.connect((LOCAL_HOST, forwarding_port)) + except OSError: + return 0 + # Spawn a thread to forward traffic from client to server. - client_to_server = threading.Thread(target=forward, args=(client_socket, server_socket, CLIENT_TO_SERVER)) + client_to_server = threading.Thread(target=forward, args=(client_socket, server_socket, CLIENT_TO_SERVER, byte_counts)) client_to_server.start() - # Forward traffic from server to client in the current thread. 
- forward(server_socket, client_socket, SERVER_TO_CLIENT) + server_to_client = threading.Thread(target=forward, args=(server_socket, client_socket, SERVER_TO_CLIENT, byte_counts)) + server_to_client.start() client_to_server.join() + server_to_client.join() + return sum(byte_counts.values()) def main() -> int: """Run the proxy.""" - print(f"Starting proxy...") + print("Starting proxy...", flush=True) args = parse_args() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket: listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listen_socket.bind((LOCAL_HOST, args.listening_port)) listen_socket.listen() - print(f"Proxy listening on {LOCAL_HOST}:{args.listening_port}") + print(f"Proxy listening on {LOCAL_HOST}:{args.listening_port}", flush=True) try: while True: client_sock, client_addr = listen_socket.accept() - print(f"Accepted connection from {client_addr}") - # Handle each client connection in a new thread. - client_thread = threading.Thread(target=start_bidirectional_forwarding, args=(client_sock, args.forwarding_port)) - client_thread.start() - except Exception: - # Catching all exceptions. - pass + print(f"Accepted connection from {client_addr}", flush=True) + if start_bidirectional_forwarding(client_sock, args.forwarding_port) > 0: + break except KeyboardInterrupt: - print("Caught KeyboardInterrupt, terminating the program") + print("Caught KeyboardInterrupt, terminating the program", flush=True) return 0 + return 0 if __name__ == "__main__":
