This is an automated email from the ASF dual-hosted git repository.

cmcfarlen pushed a commit to branch 10.2.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit c3ce46c671d1e58cfc807248b138a60ac9927df6
Author: Brian Neradt <[email protected]>
AuthorDate: Tue Jun 2 15:34:47 2026 -0500

    Fix flaky Fedora 44 AuTest helpers (#13220)
    
    Fedora 44 CI exposed several AuTests whose helper processes stayed
    alive after their assertions had finished, or whose asynchronous
    output could be checked before it was fully written. Those tests could
    report killed helpers, missing log markers, or worker-wide timeouts
    even when ATS itself had completed the intended checks.
    
    This change makes the affected helpers exit once the expected traffic
    has been handled, waits for the transaction data sink output before
    asserting on traffic.out, relaxes the connect test's curl output check
    to accept the wording used by newer curl releases, and replaces the
    dummy long-running ATS commands in the plugin verification tests with
    commands that exit immediately. As a result, test outcomes depend on
    test behavior rather than on cleanup races, and the Fedora 44 shards
    report real failures.
    
    (cherry picked from commit 1f6675a0d8aa1fed0988fbf31c18651c48e4188c)
---
 .../command_argument/verify_global_plugin.test.py  | 12 ++--
 .../command_argument/verify_remap_plugin.test.py   | 12 ++--
 tests/gold_tests/connect/connect.test.py           |  3 +-
 tests/gold_tests/h2/grpc/grpc_server.py            | 43 +++++++++---
 tests/gold_tests/h2/trickle_server.py              | 24 +++++--
 tests/gold_tests/pipeline/pipeline_server.py       |  7 +-
 .../transform/transaction_data_sink.test.py        | 17 +++++
 tests/gold_tests/tunnel/dumb_proxy.py              | 77 +++++++++++-----------
 8 files changed, 126 insertions(+), 69 deletions(-)

diff --git a/tests/gold_tests/command_argument/verify_global_plugin.test.py 
b/tests/gold_tests/command_argument/verify_global_plugin.test.py
index e9aa44ce52..05a80af7c8 100644
--- a/tests/gold_tests/command_argument/verify_global_plugin.test.py
+++ b/tests/gold_tests/command_argument/verify_global_plugin.test.py
@@ -39,12 +39,12 @@ def create_ts_process():
     # Ideally we would set the test run's Processes.Default to ts, but deep
     # copy of processes is not currently implemented in autest. Therefore we
     # replace the command which ts runs with a dummy command, and pull in
-    # piecemeal the values from ts that we want into the test run.
-    ts.Command = "sleep 100"
-    # sleep will return -2 when autest kills it. We set the expectation for the
-    # -2 return code here so the test doesn't fail because of this.
-    ts.ReturnCode = -2
-    # Clear the ready criteria because sleep is ready as soon as it is running.
+    # piecemeal the values from ts that we want into the test run. The helper
+    # only needs to create the ATS environment and runroot, so let it exit
+    # normally instead of depending on AuTest's cleanup signal behavior.
+    ts.Command = "true"
+    ts.ReturnCode = 0
+    # Clear the ready criteria because the helper is ready as soon as it runs.
     ts.Ready = None
     return ts
 
diff --git a/tests/gold_tests/command_argument/verify_remap_plugin.test.py 
b/tests/gold_tests/command_argument/verify_remap_plugin.test.py
index 3b68147962..554029f871 100644
--- a/tests/gold_tests/command_argument/verify_remap_plugin.test.py
+++ b/tests/gold_tests/command_argument/verify_remap_plugin.test.py
@@ -39,12 +39,12 @@ def create_ts_process():
     # Ideally we would set the test run's Processes.Default to ts, but deep
     # copy of processes is not currently implemented in autest. Therefore we
     # replace the command which ts runs with a dummy command, and pull in
-    # piecemeal the values from ts that we want into the test run.
-    ts.Command = "sleep 100"
-    # sleep will return -2 when autest kills it. We set the expectation for the
-    # -2 return code here so the test doesn't fail because of this.
-    ts.ReturnCode = -2
-    # Clear the ready criteria because sleep is ready as soon as it is running.
+    # piecemeal the values from ts that we want into the test run. The helper
+    # only needs to create the ATS environment and runroot, so let it exit
+    # normally instead of depending on AuTest's cleanup signal behavior.
+    ts.Command = "true"
+    ts.ReturnCode = 0
+    # Clear the ready criteria because the helper is ready as soon as it runs.
     ts.Ready = None
     return ts
 
diff --git a/tests/gold_tests/connect/connect.test.py 
b/tests/gold_tests/connect/connect.test.py
index a1674e4a34..37ea4db84f 100644
--- a/tests/gold_tests/connect/connect.test.py
+++ b/tests/gold_tests/connect/connect.test.py
@@ -88,7 +88,8 @@ logging:
         tr.MakeCurlCommand(f"-v --fail -s -p -x 
127.0.0.1:{self.ts.Variables.port} 'http://foo.com/get'", ts=self.ts)
         tr.Processes.Default.Streams.stderr = "gold/connect_0_stderr.gold"
         tr.Processes.Default.Streams.stderr = Testers.ContainsExpression(
-            f'Connected to 127.0.0.1.*{self.ts.Variables.port}', 'Curl should 
connect through the ATS proxy port.')
+            rf'(Connected to|Established connection to) 
127\.0\.0\.1.*{self.ts.Variables.port}',
+            'Curl should connect through the ATS proxy port.')
         tr.Processes.Default.ReturnCode = 0
         tr.Processes.Default.TimeOut = 3
         self.__checkProcessAfter(tr)
diff --git a/tests/gold_tests/h2/grpc/grpc_server.py 
b/tests/gold_tests/h2/grpc/grpc_server.py
index 63dbd4f564..2a435db65f 100644
--- a/tests/gold_tests/h2/grpc/grpc_server.py
+++ b/tests/gold_tests/h2/grpc/grpc_server.py
@@ -27,50 +27,73 @@ import simple_pb2
 import simple_pb2_grpc
 
 global_message_counter: int = 0
+MESSAGE_TIMEOUT_SECONDS = 60
 
 
 class Talker(simple_pb2_grpc.TalkerServicer):
     """A gRPC servicer."""
 
-    async def MakeRequest(self, request: simple_pb2.SimpleRequest, context: 
grpc.aio.ServicerContext):
-        """An example gRPC method."""
+    def __init__(self, num_expected_messages: int, done_event: asyncio.Event):
+        self._num_expected_messages = num_expected_messages
+        self._done_event = done_event
+
+    def _record_message(self) -> None:
         global global_message_counter
+
         global_message_counter += 1
+        if global_message_counter >= self._num_expected_messages:
+            asyncio.get_running_loop().call_soon(self._done_event.set)
+
+    async def MakeRequest(self, request: simple_pb2.SimpleRequest, context: 
grpc.aio.ServicerContext):
+        """An example gRPC method."""
+        self._record_message()
         print(f'Received request: {request.message}')
         response = simple_pb2.SimpleResponse(message=f"Echo: 
{request.message}")
         return response
 
     async def MakeAnotherRequest(self, request: simple_pb2.SimpleRequest, 
context: grpc.aio.ServicerContext):
         """An example gRPC method."""
-        global global_message_counter
-        global_message_counter += 1
+        self._record_message()
         print(f'Received another request: {request.message}')
         response = simple_pb2.SimpleResponse(message=f"Another echo: 
{request.message}")
         return response
 
 
-async def run_grpc_server(port: int, server_cert: str, server_key: str) -> int:
+async def run_grpc_server(port: int, server_cert: str, server_key: str, 
num_expected_messages: int) -> int:
     """Run the gRPC server.
 
     :param port: The port on which to listen.
     :param server_cert: The public TLS certificate to use.
     :param server_key: The private TLS key to use.
+    :param num_expected_messages: The number of messages expected from clients.
     :return: The exit code.
     """
     credentials = grpc.ssl_server_credentials([(server_key, server_cert)])
     server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
-    simple_pb2_grpc.add_TalkerServicer_to_server(Talker(), server)
+    done_event = asyncio.Event()
+    simple_pb2_grpc.add_TalkerServicer_to_server(Talker(num_expected_messages, 
done_event), server)
     server_endpoint = f'127.0.0.1:{port}'
     server.add_secure_port(server_endpoint, credentials)
     print(f'Listening on: {server_endpoint}')
+    timed_out = False
     try:
         await server.start()
-        await server.wait_for_termination()
+        try:
+            await asyncio.wait_for(done_event.wait(), 
timeout=MESSAGE_TIMEOUT_SECONDS)
+        except asyncio.TimeoutError:
+            timed_out = True
+            print(f'Timed out waiting for {num_expected_messages} messages 
after {MESSAGE_TIMEOUT_SECONDS} seconds.')
     except asyncio.exceptions.CancelledError:
         print('Shutting down the server.')
     finally:
-        await server.stop(0)
-    return 0
+        await server.stop(5)
+
+    if not timed_out and global_message_counter == num_expected_messages:
+        print(f'Received {num_expected_messages} messages as expected.')
+        return 0
+    else:
+        print(f'Expected {num_expected_messages} messages, but received 
{global_message_counter}.')
+        return 1
 
 
 def parse_args() -> argparse.Namespace:
@@ -91,7 +114,7 @@ def main() -> int:
     """
     args = parse_args()
     try:
-        return asyncio.run(run_grpc_server(args.port, args.server_crt.read(), 
args.server_key.read()))
+        return asyncio.run(run_grpc_server(args.port, args.server_crt.read(), 
args.server_key.read(), args.num_expected_messages))
     except KeyboardInterrupt:
         pass
 
diff --git a/tests/gold_tests/h2/trickle_server.py 
b/tests/gold_tests/h2/trickle_server.py
index 36f42bd907..2cbd36d69d 100644
--- a/tests/gold_tests/h2/trickle_server.py
+++ b/tests/gold_tests/h2/trickle_server.py
@@ -22,11 +22,10 @@ import logging
 import math
 import statistics
 import sys
+import threading
 import time
 from OpenSSL.SSL import Error as SSLError
 from OpenSSL.SSL import SysCallError as SSLSysCallError
-import threading
-
 import eventlet
 from eventlet.green.OpenSSL import SSL, crypto
 from h2.config import H2Configuration
@@ -85,6 +84,7 @@ class Http2ConnectionManager:
         self.listening_conn: H2Connection = 
H2Connection(config=listening_config)
         self.requests: Dict[int, RequestInfo] = {}
         self.completed_stream_ids: Set[int] = set()
+        self._sent_response = False
 
         # Delay times in ms between each data frame.
         self._data_delays: List[int] = []
@@ -137,13 +137,15 @@ class Http2ConnectionManager:
         # Clean up any responses we sent.
         for stream_id in responded_streams:
             del responses[stream_id]
+        if responded_streams:
+            self._sent_response = True
 
     def _receive_data(self, responses: Dict[int, ResponseInfo]) -> 
Optional[bytes]:
         """Receive data from the socket.
 
         :param responses: A dictionary of stream IDs to responses that have 
accumulated.
 
-        :return: The data received, or None if the connection for the socket 
has closed.
+        :return: The data received, or None if the socket closed or the 
response was sent.
         """
         data: Optional[bytes] = None
         while not data:
@@ -156,6 +158,8 @@ class Http2ConnectionManager:
             except TimeoutError:
                 # Take time to send any responses we've generated.
                 self._send_responses(responses)
+                if self._sent_response:
+                    return None
 
                 # Loop back around to receive more data.
                 logging.debug('Timeout, waiting again for more data.')
@@ -245,7 +249,7 @@ class Http2ConnectionManager:
         while True:
             data = self._receive_data(responses)
             if not data:
-                # Connection ended.
+                # Connection ended or the measured transaction completed.
                 break
 
             logging.debug(f'Giving {len(data)} bytes to the h2 connection')
@@ -260,6 +264,8 @@ class Http2ConnectionManager:
             except (SSLError, SSLSysCallError) as e:
                 logging.debug(f'Ignoring end-loop sock.sendall exception for 
now: {e}')
                 pass
+            if self._sent_response:
+                return
 
     def get_data_delays(self) -> List[int]:
         """Get the DATA frame timing list.
@@ -268,6 +274,10 @@ class Http2ConnectionManager:
         """
         return self._data_delays
 
+    def sent_response(self) -> bool:
+        """Return whether a response was sent on this connection."""
+        return self._sent_response
+
     def _process_request(self, request: RequestInfo) -> ResponseInfo:
         """Handle a request from a client.
 
@@ -356,7 +366,6 @@ def run_server(listen_port, https_pem, ca_pem) -> List[int]:
     listening_socket = eventlet.listen(('0.0.0.0', listen_port))
     listening_socket = SSL.Connection(context, listening_socket)
     logging.info(f"Serving HTTP/2 Proxy on 127.0.0.1:{listen_port} with pem 
'{https_pem}'")
-    pool = eventlet.GreenPool()
 
     manager = None
     while True:
@@ -365,7 +374,10 @@ def run_server(listen_port, https_pem, ca_pem) -> 
List[int]:
             manager = Http2ConnectionManager(new_connection_socket)
             manager.cert_file = https_pem
             manager.ca_file = ca_pem
-            pool.spawn_n(manager.run_forever)
+            manager.run_forever()
+            data_delays = manager.get_data_delays()
+            if data_delays or manager.sent_response():
+                return data_delays
         except KeyboardInterrupt as e:
             logging.debug("Handling KeyboardInterrupt")
             if manager is not None:
diff --git a/tests/gold_tests/pipeline/pipeline_server.py 
b/tests/gold_tests/pipeline/pipeline_server.py
index cf13fa5696..85214b8c08 100644
--- a/tests/gold_tests/pipeline/pipeline_server.py
+++ b/tests/gold_tests/pipeline/pipeline_server.py
@@ -83,7 +83,11 @@ def receive_requests(sock: socket.socket) -> None:
     end_of_second_request: bytes = b'12345'
     end_of_third_request: bytes = b'\r\n\r\n'
     while not received_third_request:
-        data = sock.recv(1024)
+        try:
+            data = sock.recv(1024)
+        except socket.timeout:
+            print("Timed out waiting for an unexpected third request.")
+            break
         if not data:
             print("Socket closed.")
             break
@@ -126,6 +130,7 @@ def receive_requests(sock: socket.socket) -> None:
                 print()
                 time.sleep(0.01)
                 sock.sendall(second_response_bytes)
+                sock.settimeout(1.0)
                 continue
 
             elif processing_third_request:
diff --git 
a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py 
b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
index 46d71b84d9..de008facbe 100644
--- a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
+++ b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
@@ -80,6 +80,23 @@ class TransactionDataSyncTest:
         tr.Processes.Default.StartBefore(self.ts)
         tr.AddVerifierClientProcess(
             "client", self.replay_file, http_ports=[self.ts.Variables.port], 
https_ports=[self.ts.Variables.ssl_port])
+        tr.StillRunningAfter = self.server
+        tr.StillRunningAfter = self.nameserver
+        tr.StillRunningAfter = self.ts
+
+        tr = Test.AddTestRun("Wait for transaction data sink output")
+        timeout = 30
+        watcher = tr.Processes.Process("watcher")
+        watcher.Command = f"sleep {timeout}"
+        watcher.Ready = When.FileContains(self.ts.Disk.traffic_out.Name, 
"http2_response_body_dumped")
+        watcher.TimeOut = timeout
+        tr.TimeOut = timeout
+        tr.StillRunningAfter = self.server
+        tr.StillRunningAfter = self.nameserver
+        tr.StillRunningAfter = self.ts
+        tr.Processes.Default.StartBefore(watcher)
+        tr.Processes.Default.Command = "echo 
await_transaction_data_sink_output"
+        tr.Processes.Default.ReturnCode = 0
 
 
 TransactionDataSyncTest().run()
diff --git a/tests/gold_tests/tunnel/dumb_proxy.py 
b/tests/gold_tests/tunnel/dumb_proxy.py
index 6e0d152a47..36ed60a03a 100644
--- a/tests/gold_tests/tunnel/dumb_proxy.py
+++ b/tests/gold_tests/tunnel/dumb_proxy.py
@@ -24,9 +24,6 @@ import threading
 import argparse
 
 LOCAL_HOST = '127.0.0.1'
-TIMEOUT = 0.5
-# Create a thread-local data object to store the number of bytes transferred.
-thread_local_data = threading.local()
 
 
 def parse_args() -> argparse.Namespace:
@@ -40,39 +37,38 @@ def parse_args() -> argparse.Namespace:
     return parser.parse_args()
 
 
-def initialize_thread_local_data():
-    thread_local_data.client_to_server_bytes = 0
-    thread_local_data.server_to_client_bytes = 0
-
-
-def forward(source, destination, is_client_to_server):
+def forward(source, destination, is_client_to_server, byte_counts):
     """Forward traffic from source to destination.
 
     :param source: socket to read from.
     :param destination: socket to write to.
     :param is_client_to_server: True if forwarding from client to server.
+    :param byte_counts: A dictionary to record bytes sent in each direction.
     """
-    # Initialize thread-local data.
-    initialize_thread_local_data()
+    bytes_transferred = 0
 
-    while True:
-        try:
+    try:
+        while True:
             data = source.recv(4096)
             if not data:
                 break
             destination.sendall(data)
-        except Exception as e:
-            # Catching all exceptions.
-            break
+            bytes_transferred += len(data)
+    except OSError:
+        pass
+    finally:
+        try:
+            destination.shutdown(socket.SHUT_WR)
+        except OSError:
+            pass
+    # Forwarding done. Print the number of bytes transferred in the direction.
+    if bytes_transferred > 0:
         if is_client_to_server:
-            thread_local_data.client_to_server_bytes += len(data)
+            byte_counts["client-to-server"] = bytes_transferred
+            print(f"client-to-server: {bytes_transferred}", flush=True)
         else:
-            thread_local_data.server_to_client_bytes += len(data)
-    # Forwarding done. Print the number of bytes transferred in the direction.
-    if thread_local_data.client_to_server_bytes > 0:
-        print(f"client-to-server: {thread_local_data.client_to_server_bytes}")
-    elif thread_local_data.server_to_client_bytes > 0:
-        print(f"server-to-client: {thread_local_data.server_to_client_bytes}")
+            byte_counts["server-to-client"] = bytes_transferred
+            print(f"server-to-client: {bytes_transferred}", flush=True)
 
 
 def start_bidirectional_forwarding(client_socket, forwarding_port):
@@ -80,44 +76,47 @@ def start_bidirectional_forwarding(client_socket, 
forwarding_port):
 
     :param client_socket: socket connected to the client.
     :param forwarding_port: server port to forward to.
+    :return: The number of bytes forwarded in both directions.
     """
     CLIENT_TO_SERVER = True
     SERVER_TO_CLIENT = False
+    byte_counts = {}
     with client_socket, socket.socket(socket.AF_INET, socket.SOCK_STREAM) as 
server_socket:
-        client_socket.settimeout(TIMEOUT)
-        server_socket.settimeout(TIMEOUT)
-        server_socket.connect((LOCAL_HOST, forwarding_port))
+        try:
+            server_socket.connect((LOCAL_HOST, forwarding_port))
+        except OSError:
+            return 0
+
         # Spawn a thread to forward traffic from client to server.
-        client_to_server = threading.Thread(target=forward, 
args=(client_socket, server_socket, CLIENT_TO_SERVER))
+        client_to_server = threading.Thread(target=forward, 
args=(client_socket, server_socket, CLIENT_TO_SERVER, byte_counts))
         client_to_server.start()
 
-        # Forward traffic from server to client in the current thread.
-        forward(server_socket, client_socket, SERVER_TO_CLIENT)
+        server_to_client = threading.Thread(target=forward, 
args=(server_socket, client_socket, SERVER_TO_CLIENT, byte_counts))
+        server_to_client.start()
         client_to_server.join()
+        server_to_client.join()
+    return sum(byte_counts.values())
 
 
 def main() -> int:
     """Run the proxy."""
-    print(f"Starting proxy...")
+    print("Starting proxy...", flush=True)
     args = parse_args()
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket:
         listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         listen_socket.bind((LOCAL_HOST, args.listening_port))
         listen_socket.listen()
-        print(f"Proxy listening on {LOCAL_HOST}:{args.listening_port}")
+        print(f"Proxy listening on {LOCAL_HOST}:{args.listening_port}", 
flush=True)
         try:
             while True:
                 client_sock, client_addr = listen_socket.accept()
-                print(f"Accepted connection from {client_addr}")
-                # Handle each client connection in a new thread.
-                client_thread = 
threading.Thread(target=start_bidirectional_forwarding, args=(client_sock, 
args.forwarding_port))
-                client_thread.start()
-        except Exception:
-            # Catching all exceptions.
-            pass
+                print(f"Accepted connection from {client_addr}", flush=True)
+                if start_bidirectional_forwarding(client_sock, 
args.forwarding_port) > 0:
+                    break
         except KeyboardInterrupt:
-            print("Caught KeyboardInterrupt, terminating the program")
+            print("Caught KeyboardInterrupt, terminating the program", 
flush=True)
             return 0
+    return 0
 
 
 if __name__ == "__main__":

Reply via email to