This is an automated email from the ASF dual-hosted git repository.
bneradt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 1f6675a0d8 Fix flaky Fedora 44 AuTest helpers (#13220)
1f6675a0d8 is described below
commit 1f6675a0d8aa1fed0988fbf31c18651c48e4188c
Author: Brian Neradt <[email protected]>
AuthorDate: Tue Jun 2 15:34:47 2026 -0500
Fix flaky Fedora 44 AuTest helpers (#13220)
Fedora 44 CI exposed several AuTests whose helper processes stayed
alive after useful assertions finished, or whose asynchronous output
could be checked before it was fully produced. Those tests could report
killed helpers, missing log markers, or worker-wide timeouts even when
ATS itself completed the intended checks.
This change makes the affected helpers exit once they have handled the
expected traffic, waits for transaction data sink output before asserting
on traffic.out, relaxes the connect test's expected curl output to also
accept the "Established connection to" wording of newer curl, and replaces
the dummy long-running "sleep 100" command in the plugin verification
tests with "true". Test results then depend on the behavior under test
rather than on cleanup races, which lets the Fedora 44 shards report real
failures.
---
.../command_argument/verify_global_plugin.test.py | 12 ++--
.../command_argument/verify_remap_plugin.test.py | 12 ++--
tests/gold_tests/connect/connect.test.py | 3 +-
tests/gold_tests/h2/grpc/grpc_server.py | 43 +++++++++---
tests/gold_tests/h2/trickle_server.py | 24 +++++--
tests/gold_tests/pipeline/pipeline_server.py | 7 +-
.../transform/transaction_data_sink.test.py | 17 +++++
tests/gold_tests/tunnel/dumb_proxy.py | 77 +++++++++++-----------
8 files changed, 126 insertions(+), 69 deletions(-)
diff --git a/tests/gold_tests/command_argument/verify_global_plugin.test.py
b/tests/gold_tests/command_argument/verify_global_plugin.test.py
index e9aa44ce52..05a80af7c8 100644
--- a/tests/gold_tests/command_argument/verify_global_plugin.test.py
+++ b/tests/gold_tests/command_argument/verify_global_plugin.test.py
@@ -39,12 +39,12 @@ def create_ts_process():
# Ideally we would set the test run's Processes.Default to ts, but deep
# copy of processes is not currently implemented in autest. Therefore we
# replace the command which ts runs with a dummy command, and pull in
- # piecemeal the values from ts that we want into the test run.
- ts.Command = "sleep 100"
- # sleep will return -2 when autest kills it. We set the expectation for the
- # -2 return code here so the test doesn't fail because of this.
- ts.ReturnCode = -2
- # Clear the ready criteria because sleep is ready as soon as it is running.
+ # piecemeal the values from ts that we want into the test run. The helper
+ # only needs to create the ATS environment and runroot, so let it exit
+ # normally instead of depending on AuTest's cleanup signal behavior.
+ ts.Command = "true"
+ ts.ReturnCode = 0
+ # Clear the ready criteria because the helper is ready as soon as it runs.
ts.Ready = None
return ts
diff --git a/tests/gold_tests/command_argument/verify_remap_plugin.test.py
b/tests/gold_tests/command_argument/verify_remap_plugin.test.py
index 3b68147962..554029f871 100644
--- a/tests/gold_tests/command_argument/verify_remap_plugin.test.py
+++ b/tests/gold_tests/command_argument/verify_remap_plugin.test.py
@@ -39,12 +39,12 @@ def create_ts_process():
# Ideally we would set the test run's Processes.Default to ts, but deep
# copy of processes is not currently implemented in autest. Therefore we
# replace the command which ts runs with a dummy command, and pull in
- # piecemeal the values from ts that we want into the test run.
- ts.Command = "sleep 100"
- # sleep will return -2 when autest kills it. We set the expectation for the
- # -2 return code here so the test doesn't fail because of this.
- ts.ReturnCode = -2
- # Clear the ready criteria because sleep is ready as soon as it is running.
+ # piecemeal the values from ts that we want into the test run. The helper
+ # only needs to create the ATS environment and runroot, so let it exit
+ # normally instead of depending on AuTest's cleanup signal behavior.
+ ts.Command = "true"
+ ts.ReturnCode = 0
+ # Clear the ready criteria because the helper is ready as soon as it runs.
ts.Ready = None
return ts
diff --git a/tests/gold_tests/connect/connect.test.py
b/tests/gold_tests/connect/connect.test.py
index 838ba3543f..371addc978 100644
--- a/tests/gold_tests/connect/connect.test.py
+++ b/tests/gold_tests/connect/connect.test.py
@@ -88,7 +88,8 @@ logging:
tr.MakeCurlCommand(f"-v --fail -s -p -x
127.0.0.1:{self.ts.Variables.port} 'http://foo.com/get'", ts=self.ts)
tr.Processes.Default.Streams.stderr = "gold/connect_0_stderr.gold"
tr.Processes.Default.Streams.stderr = Testers.ContainsExpression(
- f'Connected to 127.0.0.1.*{self.ts.Variables.port}', 'Curl should
connect through the ATS proxy port.')
+ rf'(Connected to|Established connection to)
127\.0\.0\.1.*{self.ts.Variables.port}',
+ 'Curl should connect through the ATS proxy port.')
tr.Processes.Default.ReturnCode = 0
tr.Processes.Default.TimeOut = 3
self.__checkProcessAfter(tr)
diff --git a/tests/gold_tests/h2/grpc/grpc_server.py
b/tests/gold_tests/h2/grpc/grpc_server.py
index 63dbd4f564..2a435db65f 100644
--- a/tests/gold_tests/h2/grpc/grpc_server.py
+++ b/tests/gold_tests/h2/grpc/grpc_server.py
@@ -27,50 +27,73 @@ import simple_pb2
import simple_pb2_grpc
global_message_counter: int = 0
+MESSAGE_TIMEOUT_SECONDS = 60
class Talker(simple_pb2_grpc.TalkerServicer):
"""A gRPC servicer."""
- async def MakeRequest(self, request: simple_pb2.SimpleRequest, context:
grpc.aio.ServicerContext):
- """An example gRPC method."""
+ def __init__(self, num_expected_messages: int, done_event: asyncio.Event):
+ self._num_expected_messages = num_expected_messages
+ self._done_event = done_event
+
+ def _record_message(self) -> None:
global global_message_counter
+
global_message_counter += 1
+ if global_message_counter >= self._num_expected_messages:
+ asyncio.get_running_loop().call_soon(self._done_event.set)
+
+ async def MakeRequest(self, request: simple_pb2.SimpleRequest, context:
grpc.aio.ServicerContext):
+ """An example gRPC method."""
+ self._record_message()
print(f'Received request: {request.message}')
response = simple_pb2.SimpleResponse(message=f"Echo:
{request.message}")
return response
async def MakeAnotherRequest(self, request: simple_pb2.SimpleRequest,
context: grpc.aio.ServicerContext):
"""An example gRPC method."""
- global global_message_counter
- global_message_counter += 1
+ self._record_message()
print(f'Received another request: {request.message}')
response = simple_pb2.SimpleResponse(message=f"Another echo:
{request.message}")
return response
-async def run_grpc_server(port: int, server_cert: str, server_key: str) -> int:
+async def run_grpc_server(port: int, server_cert: str, server_key: str,
num_expected_messages: int) -> int:
"""Run the gRPC server.
:param port: The port on which to listen.
:param server_cert: The public TLS certificate to use.
:param server_key: The private TLS key to use.
+ :param num_expected_messages: The number of messages expected from clients.
:return: The exit code.
"""
credentials = grpc.ssl_server_credentials([(server_key, server_cert)])
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
- simple_pb2_grpc.add_TalkerServicer_to_server(Talker(), server)
+ done_event = asyncio.Event()
+ simple_pb2_grpc.add_TalkerServicer_to_server(Talker(num_expected_messages,
done_event), server)
server_endpoint = f'127.0.0.1:{port}'
server.add_secure_port(server_endpoint, credentials)
print(f'Listening on: {server_endpoint}')
+ timed_out = False
try:
await server.start()
- await server.wait_for_termination()
+ try:
+ await asyncio.wait_for(done_event.wait(),
timeout=MESSAGE_TIMEOUT_SECONDS)
+ except asyncio.TimeoutError:
+ timed_out = True
+ print(f'Timed out waiting for {num_expected_messages} messages
after {MESSAGE_TIMEOUT_SECONDS} seconds.')
except asyncio.exceptions.CancelledError:
print('Shutting down the server.')
finally:
- await server.stop(0)
- return 0
+ await server.stop(5)
+
+ if not timed_out and global_message_counter == num_expected_messages:
+ print(f'Received {num_expected_messages} messages as expected.')
+ return 0
+ else:
+ print(f'Expected {num_expected_messages} messages, but received
{global_message_counter}.')
+ return 1
def parse_args() -> argparse.Namespace:
@@ -91,7 +114,7 @@ def main() -> int:
"""
args = parse_args()
try:
- return asyncio.run(run_grpc_server(args.port, args.server_crt.read(),
args.server_key.read()))
+ return asyncio.run(run_grpc_server(args.port, args.server_crt.read(),
args.server_key.read(), args.num_expected_messages))
except KeyboardInterrupt:
pass
diff --git a/tests/gold_tests/h2/trickle_server.py
b/tests/gold_tests/h2/trickle_server.py
index 36f42bd907..2cbd36d69d 100644
--- a/tests/gold_tests/h2/trickle_server.py
+++ b/tests/gold_tests/h2/trickle_server.py
@@ -22,11 +22,10 @@ import logging
import math
import statistics
import sys
+import threading
import time
from OpenSSL.SSL import Error as SSLError
from OpenSSL.SSL import SysCallError as SSLSysCallError
-import threading
-
import eventlet
from eventlet.green.OpenSSL import SSL, crypto
from h2.config import H2Configuration
@@ -85,6 +84,7 @@ class Http2ConnectionManager:
self.listening_conn: H2Connection =
H2Connection(config=listening_config)
self.requests: Dict[int, RequestInfo] = {}
self.completed_stream_ids: Set[int] = set()
+ self._sent_response = False
# Delay times in ms between each data frame.
self._data_delays: List[int] = []
@@ -137,13 +137,15 @@ class Http2ConnectionManager:
# Clean up any responses we sent.
for stream_id in responded_streams:
del responses[stream_id]
+ if responded_streams:
+ self._sent_response = True
def _receive_data(self, responses: Dict[int, ResponseInfo]) ->
Optional[bytes]:
"""Receive data from the socket.
:param responses: A dictionary of stream IDs to responses that have
accumulated.
- :return: The data received, or None if the connection for the socket
has closed.
+ :return: The data received, or None if the socket closed or the
response was sent.
"""
data: Optional[bytes] = None
while not data:
@@ -156,6 +158,8 @@ class Http2ConnectionManager:
except TimeoutError:
# Take time to send any responses we've generated.
self._send_responses(responses)
+ if self._sent_response:
+ return None
# Loop back around to receive more data.
logging.debug('Timeout, waiting again for more data.')
@@ -245,7 +249,7 @@ class Http2ConnectionManager:
while True:
data = self._receive_data(responses)
if not data:
- # Connection ended.
+ # Connection ended or the measured transaction completed.
break
logging.debug(f'Giving {len(data)} bytes to the h2 connection')
@@ -260,6 +264,8 @@ class Http2ConnectionManager:
except (SSLError, SSLSysCallError) as e:
logging.debug(f'Ignoring end-loop sock.sendall exception for
now: {e}')
pass
+ if self._sent_response:
+ return
def get_data_delays(self) -> List[int]:
"""Get the DATA frame timing list.
@@ -268,6 +274,10 @@ class Http2ConnectionManager:
"""
return self._data_delays
+ def sent_response(self) -> bool:
+ """Return whether a response was sent on this connection."""
+ return self._sent_response
+
def _process_request(self, request: RequestInfo) -> ResponseInfo:
"""Handle a request from a client.
@@ -356,7 +366,6 @@ def run_server(listen_port, https_pem, ca_pem) -> List[int]:
listening_socket = eventlet.listen(('0.0.0.0', listen_port))
listening_socket = SSL.Connection(context, listening_socket)
logging.info(f"Serving HTTP/2 Proxy on 127.0.0.1:{listen_port} with pem
'{https_pem}'")
- pool = eventlet.GreenPool()
manager = None
while True:
@@ -365,7 +374,10 @@ def run_server(listen_port, https_pem, ca_pem) ->
List[int]:
manager = Http2ConnectionManager(new_connection_socket)
manager.cert_file = https_pem
manager.ca_file = ca_pem
- pool.spawn_n(manager.run_forever)
+ manager.run_forever()
+ data_delays = manager.get_data_delays()
+ if data_delays or manager.sent_response():
+ return data_delays
except KeyboardInterrupt as e:
logging.debug("Handling KeyboardInterrupt")
if manager is not None:
diff --git a/tests/gold_tests/pipeline/pipeline_server.py
b/tests/gold_tests/pipeline/pipeline_server.py
index cf13fa5696..85214b8c08 100644
--- a/tests/gold_tests/pipeline/pipeline_server.py
+++ b/tests/gold_tests/pipeline/pipeline_server.py
@@ -83,7 +83,11 @@ def receive_requests(sock: socket.socket) -> None:
end_of_second_request: bytes = b'12345'
end_of_third_request: bytes = b'\r\n\r\n'
while not received_third_request:
- data = sock.recv(1024)
+ try:
+ data = sock.recv(1024)
+ except socket.timeout:
+ print("Timed out waiting for an unexpected third request.")
+ break
if not data:
print("Socket closed.")
break
@@ -126,6 +130,7 @@ def receive_requests(sock: socket.socket) -> None:
print()
time.sleep(0.01)
sock.sendall(second_response_bytes)
+ sock.settimeout(1.0)
continue
elif processing_third_request:
diff --git
a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
index a632521da6..b6260f33a6 100644
--- a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
+++ b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
@@ -86,6 +86,23 @@ ssl_multicert:
tr.Processes.Default.StartBefore(self.ts)
tr.AddVerifierClientProcess(
"client", self.replay_file, http_ports=[self.ts.Variables.port],
https_ports=[self.ts.Variables.ssl_port])
+ tr.StillRunningAfter = self.server
+ tr.StillRunningAfter = self.nameserver
+ tr.StillRunningAfter = self.ts
+
+ tr = Test.AddTestRun("Wait for transaction data sink output")
+ timeout = 30
+ watcher = tr.Processes.Process("watcher")
+ watcher.Command = f"sleep {timeout}"
+ watcher.Ready = When.FileContains(self.ts.Disk.traffic_out.Name,
"http2_response_body_dumped")
+ watcher.TimeOut = timeout
+ tr.TimeOut = timeout
+ tr.StillRunningAfter = self.server
+ tr.StillRunningAfter = self.nameserver
+ tr.StillRunningAfter = self.ts
+ tr.Processes.Default.StartBefore(watcher)
+ tr.Processes.Default.Command = "echo
await_transaction_data_sink_output"
+ tr.Processes.Default.ReturnCode = 0
TransactionDataSyncTest().run()
diff --git a/tests/gold_tests/tunnel/dumb_proxy.py
b/tests/gold_tests/tunnel/dumb_proxy.py
index 6e0d152a47..36ed60a03a 100644
--- a/tests/gold_tests/tunnel/dumb_proxy.py
+++ b/tests/gold_tests/tunnel/dumb_proxy.py
@@ -24,9 +24,6 @@ import threading
import argparse
LOCAL_HOST = '127.0.0.1'
-TIMEOUT = 0.5
-# Create a thread-local data object to store the number of bytes transferred.
-thread_local_data = threading.local()
def parse_args() -> argparse.Namespace:
@@ -40,39 +37,38 @@ def parse_args() -> argparse.Namespace:
return parser.parse_args()
-def initialize_thread_local_data():
- thread_local_data.client_to_server_bytes = 0
- thread_local_data.server_to_client_bytes = 0
-
-
-def forward(source, destination, is_client_to_server):
+def forward(source, destination, is_client_to_server, byte_counts):
"""Forward traffic from source to destination.
:param source: socket to read from.
:param destination: socket to write to.
:param is_client_to_server: True if forwarding from client to server.
+ :param byte_counts: A dictionary to record bytes sent in each direction.
"""
- # Initialize thread-local data.
- initialize_thread_local_data()
+ bytes_transferred = 0
- while True:
- try:
+ try:
+ while True:
data = source.recv(4096)
if not data:
break
destination.sendall(data)
- except Exception as e:
- # Catching all exceptions.
- break
+ bytes_transferred += len(data)
+ except OSError:
+ pass
+ finally:
+ try:
+ destination.shutdown(socket.SHUT_WR)
+ except OSError:
+ pass
+ # Forwarding done. Print the number of bytes transferred in the direction.
+ if bytes_transferred > 0:
if is_client_to_server:
- thread_local_data.client_to_server_bytes += len(data)
+ byte_counts["client-to-server"] = bytes_transferred
+ print(f"client-to-server: {bytes_transferred}", flush=True)
else:
- thread_local_data.server_to_client_bytes += len(data)
- # Forwarding done. Print the number of bytes transferred in the direction.
- if thread_local_data.client_to_server_bytes > 0:
- print(f"client-to-server: {thread_local_data.client_to_server_bytes}")
- elif thread_local_data.server_to_client_bytes > 0:
- print(f"server-to-client: {thread_local_data.server_to_client_bytes}")
+ byte_counts["server-to-client"] = bytes_transferred
+ print(f"server-to-client: {bytes_transferred}", flush=True)
def start_bidirectional_forwarding(client_socket, forwarding_port):
@@ -80,44 +76,47 @@ def start_bidirectional_forwarding(client_socket,
forwarding_port):
:param client_socket: socket connected to the client.
:param forwarding_port: server port to forward to.
+ :return: The number of bytes forwarded in both directions.
"""
CLIENT_TO_SERVER = True
SERVER_TO_CLIENT = False
+ byte_counts = {}
with client_socket, socket.socket(socket.AF_INET, socket.SOCK_STREAM) as
server_socket:
- client_socket.settimeout(TIMEOUT)
- server_socket.settimeout(TIMEOUT)
- server_socket.connect((LOCAL_HOST, forwarding_port))
+ try:
+ server_socket.connect((LOCAL_HOST, forwarding_port))
+ except OSError:
+ return 0
+
# Spawn a thread to forward traffic from client to server.
- client_to_server = threading.Thread(target=forward,
args=(client_socket, server_socket, CLIENT_TO_SERVER))
+ client_to_server = threading.Thread(target=forward,
args=(client_socket, server_socket, CLIENT_TO_SERVER, byte_counts))
client_to_server.start()
- # Forward traffic from server to client in the current thread.
- forward(server_socket, client_socket, SERVER_TO_CLIENT)
+ server_to_client = threading.Thread(target=forward,
args=(server_socket, client_socket, SERVER_TO_CLIENT, byte_counts))
+ server_to_client.start()
client_to_server.join()
+ server_to_client.join()
+ return sum(byte_counts.values())
def main() -> int:
"""Run the proxy."""
- print(f"Starting proxy...")
+ print("Starting proxy...", flush=True)
args = parse_args()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket:
listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_socket.bind((LOCAL_HOST, args.listening_port))
listen_socket.listen()
- print(f"Proxy listening on {LOCAL_HOST}:{args.listening_port}")
+ print(f"Proxy listening on {LOCAL_HOST}:{args.listening_port}",
flush=True)
try:
while True:
client_sock, client_addr = listen_socket.accept()
- print(f"Accepted connection from {client_addr}")
- # Handle each client connection in a new thread.
- client_thread =
threading.Thread(target=start_bidirectional_forwarding, args=(client_sock,
args.forwarding_port))
- client_thread.start()
- except Exception:
- # Catching all exceptions.
- pass
+ print(f"Accepted connection from {client_addr}", flush=True)
+ if start_bidirectional_forwarding(client_sock,
args.forwarding_port) > 0:
+ break
except KeyboardInterrupt:
- print("Caught KeyboardInterrupt, terminating the program")
+ print("Caught KeyboardInterrupt, terminating the program",
flush=True)
return 0
+ return 0
if __name__ == "__main__":