This is an automated email from the ASF dual-hosted git repository. cmcfarlen pushed a commit to branch 10.0.x in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit 71787ff1b0b23cdf58ff9dfdc267ba29ea869af1 Author: Brian Neradt <[email protected]> AuthorDate: Tue Mar 5 11:27:54 2024 -0500 NOT_WRITE_AVAIL: schedule event to resend data frames (#11094) When sending DATA frames, we check that we are not exceeding our write buffer's high water mark. If so, we don't write the frame. Unfortunately, before this patch, no event was scheduled to come back and try to resend any unwritten frames. This led to dropped frames at times. This patch adds a scheduled event to later retry sending the unsent frames. This patch also updates grpc.test.py to send 100 messages instead of 1, which, at least on my local system, reproduces this issue. (cherry picked from commit df078fb14745be946b4599fc0d64a397c761763e) --- include/proxy/http2/Http2CommonSession.h | 10 +++-- include/proxy/http2/Http2ConnectionState.h | 6 ++- include/proxy/http2/Http2Stream.h | 4 +- src/proxy/http2/Http2ConnectionState.cc | 42 ++++++++++++++++---- src/proxy/http2/Http2Stream.cc | 2 +- tests/gold_tests/h2/grpc/grpc.test.py | 25 +++++++++--- tests/gold_tests/h2/grpc/grpc_client.py | 61 ++++++++++++++++++++++++------ tests/gold_tests/h2/grpc/grpc_server.py | 54 +++++++++++++++++++------- tests/gold_tests/h2/grpc/simple.proto | 6 ++- 9 files changed, 162 insertions(+), 48 deletions(-) diff --git a/include/proxy/http2/Http2CommonSession.h b/include/proxy/http2/Http2CommonSession.h index 9f9b7e53b3..f24ee8b094 100644 --- a/include/proxy/http2/Http2CommonSession.h +++ b/include/proxy/http2/Http2CommonSession.h @@ -34,15 +34,17 @@ // HTTP2_SESSION_EVENT_INIT Http2CommonSession * HTTP/2 session is born // HTTP2_SESSION_EVENT_FINI Http2CommonSession * HTTP/2 session is ended // HTTP2_SESSION_EVENT_RECV Http2Frame * Received a frame -// HTTP2_SESSION_EVENT_XMIT Http2Frame * Send this frame +// HTTP2_SESSION_EVENT_XMIT Http2Frame * Send this priority frame +// HTTP2_SESSION_EVENT_DATA Http2Frame * Send the data frames in the stream #define HTTP2_SESSION_EVENT_INIT 
(HTTP2_SESSION_EVENTS_START + 1) #define HTTP2_SESSION_EVENT_FINI (HTTP2_SESSION_EVENTS_START + 2) #define HTTP2_SESSION_EVENT_RECV (HTTP2_SESSION_EVENTS_START + 3) #define HTTP2_SESSION_EVENT_XMIT (HTTP2_SESSION_EVENTS_START + 4) -#define HTTP2_SESSION_EVENT_SHUTDOWN_INIT (HTTP2_SESSION_EVENTS_START + 5) -#define HTTP2_SESSION_EVENT_SHUTDOWN_CONT (HTTP2_SESSION_EVENTS_START + 6) -#define HTTP2_SESSION_EVENT_REENABLE (HTTP2_SESSION_EVENTS_START + 7) +#define HTTP2_SESSION_EVENT_DATA (HTTP2_SESSION_EVENTS_START + 5) +#define HTTP2_SESSION_EVENT_SHUTDOWN_INIT (HTTP2_SESSION_EVENTS_START + 6) +#define HTTP2_SESSION_EVENT_SHUTDOWN_CONT (HTTP2_SESSION_EVENTS_START + 7) +#define HTTP2_SESSION_EVENT_REENABLE (HTTP2_SESSION_EVENTS_START + 8) enum class Http2SessionCod : int { NOT_PROVIDED, diff --git a/include/proxy/http2/Http2ConnectionState.h b/include/proxy/http2/Http2ConnectionState.h index a455105164..83ff405649 100644 --- a/include/proxy/http2/Http2ConnectionState.h +++ b/include/proxy/http2/Http2ConnectionState.h @@ -155,8 +155,9 @@ public: Http2ErrorCode get_shutdown_reason() const; // HTTP/2 frame sender - void schedule_stream(Http2Stream *stream); + void schedule_stream_to_send_priority_frames(Http2Stream *stream); void send_data_frames_depends_on_priority(); + void schedule_stream_to_send_data_frames(Http2Stream *stream); void send_data_frames(Http2Stream *stream); Http2SendDataFrameResult send_a_data_frame(Http2Stream *stream, size_t &payload_length); void send_headers_frame(Http2Stream *stream); @@ -387,7 +388,8 @@ private: // "If the END_HEADERS bit is not set, this frame MUST be followed by // another CONTINUATION frame." 
Http2StreamId continued_stream_id = 0; - bool _scheduled = false; + bool _priority_scheduled = false; + bool _data_scheduled = false; bool fini_received = false; bool in_destroy = false; int recursion = 0; diff --git a/include/proxy/http2/Http2Stream.h b/include/proxy/http2/Http2Stream.h index 72515f1fb6..677cf6d087 100644 --- a/include/proxy/http2/Http2Stream.h +++ b/include/proxy/http2/Http2Stream.h @@ -204,8 +204,8 @@ private: NetTimeout _timeout{}; HTTPParser http_parser; - EThread *_thread = nullptr; - Http2StreamId _id; + EThread *_thread = nullptr; + Http2StreamId _id = -1; Http2StreamState _state = Http2StreamState::HTTP2_STREAM_STATE_IDLE; int64_t _http_sm_id = -1; diff --git a/src/proxy/http2/Http2ConnectionState.cc b/src/proxy/http2/Http2ConnectionState.cc index 232c4db825..703586c383 100644 --- a/src/proxy/http2/Http2ConnectionState.cc +++ b/src/proxy/http2/Http2ConnectionState.cc @@ -22,6 +22,7 @@ */ #include "../../iocore/net/P_Net.h" +#include "iocore/eventsystem/Lock.h" #include "proxy/http2/HTTP2.h" #include "proxy/http2/Http2ConnectionState.h" #include "proxy/http2/Http2ClientSession.h" @@ -35,6 +36,7 @@ #include "iocore/net/TLSSNISupport.h" #include "tscore/ink_assert.h" +#include "tscore/ink_memory.h" #include "tsutil/PostScript.h" #include "tsutil/LocalBuffer.h" @@ -1447,7 +1449,14 @@ Http2ConnectionState::main_event_handler(int event, void *edata) REMEMBER(event, this->recursion); SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); send_data_frames_depends_on_priority(); - _scheduled = false; + _priority_scheduled = false; + } break; + + case HTTP2_SESSION_EVENT_DATA: { + REMEMBER(event, this->recursion); + SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); + this->restart_streams(); + _data_scheduled = false; } break; // Initiate a graceful shutdown @@ -2014,9 +2023,9 @@ Http2ConnectionState::update_initial_local_rwnd(Http2WindowSize new_size) } void -Http2ConnectionState::schedule_stream(Http2Stream *stream) 
+Http2ConnectionState::schedule_stream_to_send_priority_frames(Http2Stream *stream) { - Http2StreamDebug(session, stream->get_id(), "Scheduled"); + Http2StreamDebug(session, stream->get_id(), "Scheduling sending priority frames"); Http2DependencyTree::Node *node = stream->priority_node; ink_release_assert(node != nullptr); @@ -2024,14 +2033,29 @@ Http2ConnectionState::schedule_stream(Http2Stream *stream) SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); dependency_tree->activate(node); - if (!_scheduled) { - _scheduled = true; + if (!_priority_scheduled) { + _priority_scheduled = true; SET_HANDLER(&Http2ConnectionState::main_event_handler); this_ethread()->schedule_imm_local((Continuation *)this, HTTP2_SESSION_EVENT_XMIT); } } +void +Http2ConnectionState::schedule_stream_to_send_data_frames(Http2Stream *stream) +{ + Http2StreamDebug(session, stream->get_id(), "Scheduling sending data frames"); + + SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); + + if (!_data_scheduled) { + _data_scheduled = true; + + SET_HANDLER(&Http2ConnectionState::main_event_handler); + this_ethread()->schedule_in((Continuation *)this, HRTIME_MSECOND, HTTP2_SESSION_EVENT_DATA); + } +} + void Http2ConnectionState::send_data_frames_depends_on_priority() { @@ -2214,6 +2238,9 @@ Http2ConnectionState::send_data_frames(Http2Stream *stream) } else { ink_release_assert(!"What case is this?"); } + } else if (result == Http2SendDataFrameResult::NOT_WRITE_AVAIL) { + // Schedule an event to wake up and try to resend the stream. 
+ schedule_stream_to_send_data_frames(stream); } } if (!more_data && result != Http2SendDataFrameResult::DONE) { @@ -2230,14 +2257,15 @@ Http2ConnectionState::send_headers_frame(Http2Stream *stream) int payload_length = 0; uint8_t flags = 0x00; - Http2StreamDebug(session, stream->get_id(), "Send HEADERS frame"); - // For outbound streams, set the ID if it has not yet already been set // Need to defer setting the stream ID to avoid another later created stream // sending out first. This may cause the peer to issue a stream or connection // error (new stream less that the greatest we have seen so far) this->set_stream_id(stream); + // Keep this debug below set_stream_id so that the id is correct. + Http2StreamDebug(session, stream->get_id(), "Send HEADERS frame"); + HTTPHdr *send_hdr = stream->get_send_header(); if (stream->expect_send_trailer()) { // Which is a no-op conversion diff --git a/src/proxy/http2/Http2Stream.cc b/src/proxy/http2/Http2Stream.cc index d958151fc4..0ee5cd36d9 100644 --- a/src/proxy/http2/Http2Stream.cc +++ b/src/proxy/http2/Http2Stream.cc @@ -938,7 +938,7 @@ Http2Stream::send_body(bool call_update) SCOPED_MUTEX_LOCK(lock, _proxy_ssn->mutex, this_ethread()); if (Http2::stream_priority_enabled) { - connection_state.schedule_stream(this); + connection_state.schedule_stream_to_send_priority_frames(this); // signal_write_event() will be called from `Http2ConnectionState::send_data_frames_depends_on_priority()` // when write_vio is consumed } else { diff --git a/tests/gold_tests/h2/grpc/grpc.test.py b/tests/gold_tests/h2/grpc/grpc.test.py index b7fc962aa3..72c5249810 100644 --- a/tests/gold_tests/h2/grpc/grpc.test.py +++ b/tests/gold_tests/h2/grpc/grpc.test.py @@ -24,6 +24,8 @@ import sys class TestGrpc(): """Test basic gRPC traffic.""" + num_client_connections = 50 + def __init__(self, description: str): """Configure a TestRun for gRPC traffic. 
@@ -64,8 +66,16 @@ class TestGrpc(): 'proxy.config.ssl.client.verify.server.policy': 'PERMISSIVE', 'proxy.config.dns.nameservers': f"127.0.0.1:{dns_port}", 'proxy.config.dns.resolv_conf': "NULL", - "proxy.config.diags.debug.enabled": 1, + + # Disable debug logging to avoid excessive log file size. I keep + # it here for convenience of use during manual debugging. + "proxy.config.diags.debug.enabled": 0, "proxy.config.diags.debug.tags": "http", + + # The Python gRPC module uses many WINDOW_UPDATE frames of small + # sizes, so we have to disable the min_avg_window_update to + # avoid ATS generating ERROR logs and GOAWAY frames for them. + "proxy.config.http2.min_avg_window_update": 0, }) return self._ts @@ -84,8 +94,11 @@ class TestGrpc(): self._server.Setup.Copy(server_key) port = get_port(self._server, 'port') - command = (f'{sys.executable} {tr.RunDirectory}/grpc_server.py {port} ' - 'server.pem server.key') + # Each connection performs two requests, so multiply the number of + # connections by 2 to get the expected number of transactions. + command = ( + f'{sys.executable} {tr.RunDirectory}/grpc_server.py {port} ' + f'server.pem server.key {TestGrpc.num_client_connections * 2}') self._server.Command = command self._server.ReturnCode = 0 return self._server @@ -100,10 +113,12 @@ class TestGrpc(): ts_cert = os.path.join(self._ts.Variables.SSLDir, 'server.pem') # The cert is for example.com, so we must use that domain. 
hostname = 'example.com' - command = (f'{sys.executable} {tr.RunDirectory}/grpc_client.py ' - f'{hostname} {proxy_port} {ts_cert}') + command = ( + f'{sys.executable} {tr.RunDirectory}/grpc_client.py ' + f'{hostname} {proxy_port} {ts_cert} {TestGrpc.num_client_connections}') tr.Processes.Default.Command = command tr.Processes.Default.ReturnCode = 0 + tr.TimeOut = 10 def _compile_protobuf_files(self) -> None: """Compile the protobuf files.""" diff --git a/tests/gold_tests/h2/grpc/grpc_client.py b/tests/gold_tests/h2/grpc/grpc_client.py index db5990bee6..ba8818a0c7 100644 --- a/tests/gold_tests/h2/grpc/grpc_client.py +++ b/tests/gold_tests/h2/grpc/grpc_client.py @@ -17,6 +17,7 @@ # limitations under the License. import argparse +import asyncio import grpc import os import sys @@ -24,8 +25,11 @@ import sys import simple_pb2 import simple_pb2_grpc +global_message_counter: int = 0 +global_num_completed_connections: int = 0 -def run_grpc_client(hostname: str, proxy_port: int, proxy_cert: bytes) -> int: + +async def run_grpc_client(hostname: str, proxy_port: int, proxy_cert: bytes) -> int: """Run the gRPC client. :param hostname: The hostname to which to connect. @@ -33,26 +37,61 @@ def run_grpc_client(hostname: str, proxy_port: int, proxy_cert: bytes) -> int: :param proxy_cert: The public TLS certificate to verify ATS against. :return: The exit code. 
""" + global global_message_counter + global global_num_completed_connections credentials = grpc.ssl_channel_credentials(root_certificates=proxy_cert) channel_options = (('grpc.ssl_target_name_override', hostname),) destination_endpoint = f'127.0.0.1:{proxy_port}' - channel = grpc.secure_channel(destination_endpoint, credentials, options=channel_options) - print(f'Connecting to: {destination_endpoint}') - stub = simple_pb2_grpc.SimpleStub(channel) - - message = simple_pb2.SimpleRequest(message="Client request message") - response = stub.SimpleMethod(message) - print(f'Response received from server: {response.message}') + async with grpc.aio.secure_channel(destination_endpoint, credentials, options=channel_options) as channel: + print(f'Connecting to: {destination_endpoint}') + stub = simple_pb2_grpc.TalkerStub(channel) + + print(f'Creating two messages to send for counter: {global_message_counter}') + message_1 = simple_pb2.SimpleRequest(message=f'Client request message: {global_message_counter}.1') + message_2 = simple_pb2.SimpleRequest(message=f'Client request message: {global_message_counter}.2') + my_message_count = global_message_counter + global_message_counter += 1 + print(f'Sending request: {my_message_count}.1') + response = await stub.MakeRequest(message_1) + print(f'Response {my_message_count}.1 received from server: {response.message}') + print(f'Sending the second request: {my_message_count}.2') + message = simple_pb2.SimpleRequest(message=f'Client request message: {global_message_counter}.2') + response = await stub.MakeAnotherRequest(message_2) + print(f'Response {my_message_count}.2 received from server: {response.message}') + global_num_completed_connections += 1 return 0 +async def run_grpc_clients(hostname: str, proxy_port: int, proxy_cert: bytes, num_connections: int) -> int: + """Run the gRPC client. + + :param hostname: The hostname to which to connect. + :param proxy_port: The ATS port to which to connect. 
+ :param proxy_cert: The public TLS certificate to verify ATS against. + :param num_connections: The number of client connections to create. + :return: The exit code. + """ + tasks: list[asyncio.Task] = [] + for i in range(num_connections): + print(f'Creating client {i}') + tasks.append(run_grpc_client(hostname, proxy_port, proxy_cert)) + await asyncio.gather(*tasks) + if global_num_completed_connections != num_connections: + print(f'Expected {num_connections} responses, but got {global_num_completed_connections}') + return 1 + else: + print(f'Got the expected {num_connections} responses.') + return 0 + + def parse_args() -> argparse.Namespace: """Parse command line arguments.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('hostname', help='The hostname to which to connect.') - parser.add_argument('proxy_port', type=int, help='The ATS port to which to connect.') - parser.add_argument('proxy_cert', type=argparse.FileType('rb'), help='The public TLS certificate to use.') + parser.add_argument('proxy_port', metavar='proxy-port', type=int, help='The ATS port to which to connect.') + parser.add_argument('proxy_cert', metavar='proxy-cert', type=argparse.FileType('rb'), help='The public TLS certificate to use.') + parser.add_argument('num_connections', metavar='num-connections', type=int, help='The number of connections to create.') return parser.parse_args() @@ -64,7 +103,7 @@ def main() -> int: args = parse_args() try: - return run_grpc_client(args.hostname, args.proxy_port, args.proxy_cert.read()) + return asyncio.run(run_grpc_clients(args.hostname, args.proxy_port, args.proxy_cert.read(), args.num_connections)) except grpc.RpcError as e: print(f'RPC failed with code {e.code()}: {e.details()}') return 1 diff --git a/tests/gold_tests/h2/grpc/grpc_server.py b/tests/gold_tests/h2/grpc/grpc_server.py index 109b303cc3..63dbd4f564 100644 --- a/tests/gold_tests/h2/grpc/grpc_server.py +++ b/tests/gold_tests/h2/grpc/grpc_server.py @@ -17,6 +17,7 @@ 
# limitations under the License. import argparse +import asyncio from concurrent import futures import grpc import sys @@ -25,18 +26,30 @@ import time import simple_pb2 import simple_pb2_grpc +global_message_counter: int = 0 -class SimpleServicer(simple_pb2_grpc.SimpleServicer): + +class Talker(simple_pb2_grpc.TalkerServicer): """A gRPC servicer.""" - def SimpleMethod(self, request, context): + async def MakeRequest(self, request: simple_pb2.SimpleRequest, context: grpc.aio.ServicerContext): """An example gRPC method.""" - print(f'Request received from client: {request.message}') + global global_message_counter + global_message_counter += 1 + print(f'Received request: {request.message}') response = simple_pb2.SimpleResponse(message=f"Echo: {request.message}") return response + async def MakeAnotherRequest(self, request: simple_pb2.SimpleRequest, context: grpc.aio.ServicerContext): + """An example gRPC method.""" + global global_message_counter + global_message_counter += 1 + print(f'Received another request: {request.message}') + response = simple_pb2.SimpleResponse(message=f"Another echo: {request.message}") + return response -def run_grpc_server(port: int, server_cert: str, server_key: str) -> int: + +async def run_grpc_server(port: int, server_cert: str, server_key: str) -> int: """Run the gRPC server. :param port: The port on which to listen. @@ -45,17 +58,18 @@ def run_grpc_server(port: int, server_cert: str, server_key: str) -> int: :return: The exit code. 
""" credentials = grpc.ssl_server_credentials([(server_key, server_cert)]) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - simple_pb2_grpc.add_SimpleServicer_to_server(SimpleServicer(), server) + server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10)) + simple_pb2_grpc.add_TalkerServicer_to_server(Talker(), server) server_endpoint = f'127.0.0.1:{port}' server.add_secure_port(server_endpoint, credentials) print(f'Listening on: {server_endpoint}') - server.start() try: - server.wait_for_termination() - except KeyboardInterrupt: - print("Keyboard interrupt received, exiting.") - return 0 + await server.start() + await server.wait_for_termination() + except asyncio.exceptions.CancelledError: + print('Shutting down the server.') + finally: + await server.stop(0) return 0 @@ -63,8 +77,10 @@ def parse_args() -> argparse.Namespace: """Parse command line arguments.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('port', type=int, help='The port on which to listen.') - parser.add_argument('server_crt', type=argparse.FileType('rb'), help="The public TLS certificate to use.") - parser.add_argument('server_key', type=argparse.FileType('rb'), help="The private TLS key to use.") + parser.add_argument('server_crt', metavar='server-crt', type=argparse.FileType('rb'), help="The public TLS certificate to use.") + parser.add_argument('server_key', metavar='server-key', type=argparse.FileType('rb'), help="The private TLS key to use.") + parser.add_argument( + 'num_expected_messages', metavar='num-expected-messages', type=int, help="The number of expected messages from the client.") return parser.parse_args() @@ -74,7 +90,17 @@ def main() -> int: :return: The exit code. 
""" args = parse_args() - return run_grpc_server(args.port, args.server_crt.read(), args.server_key.read()) + try: + return asyncio.run(run_grpc_server(args.port, args.server_crt.read(), args.server_key.read())) + except KeyboardInterrupt: + pass + + if global_message_counter == args.num_expected_messages: + print(f'Received {args.num_expected_messages} messages as expected.') + return 0 + else: + print(f'Expected {args.num_expected_messages} messages, but received {global_message_counter}.') + return 1 if __name__ == '__main__': diff --git a/tests/gold_tests/h2/grpc/simple.proto b/tests/gold_tests/h2/grpc/simple.proto index d8f2aa106b..c2f058f044 100644 --- a/tests/gold_tests/h2/grpc/simple.proto +++ b/tests/gold_tests/h2/grpc/simple.proto @@ -25,8 +25,10 @@ syntax = "proto3"; package simple; -service Simple { - rpc SimpleMethod(SimpleRequest) returns (SimpleResponse) {} +service Talker { + rpc MakeRequest(SimpleRequest) returns (SimpleResponse) {} + rpc MakeAnotherRequest(SimpleRequest) returns (SimpleResponse) {} + rpc MakeStreamedRequest(stream SimpleRequest) returns (stream SimpleResponse) {} } message SimpleRequest {
