This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch monitor-client-socket-for-disconnections in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit dff9b12a817fcad59e67c66ba1bf1de6a97dd1b7
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Wed Aug 23 16:02:39 2023 -0400

Stop client process and clean up if client disconnects

Previously, when processing long-running streaming requests, such as _find with a highly selective selector where no rows are emitted for a while, the client could disconnect, but the request process and all the associated workers on remote nodes would continue running, consuming server resources.

We already clean up remote (RPC) processes if the coordinator crashes; we just need a way to kill that coordinator when the connection is closed while no data is being emitted for a long time. This commit accomplishes that.

It turns out there is no simple way to detect passive-mode socket disconnects in current versions of Erlang/OTP. The socket may already be closed at the kernel level (in the CLOSE_WAIT state), yet `inet:info/1` will continue reporting it as `connected`. The only ways to obtain an accurate connection state are to write to the socket, read from it, or query its TCP info state. Here we query the state with tcp_info.

Unfortunately, not all platforms supported by OTP have this option, which is probably why it never became an officially supported inet socket option, so we use a `raw` socket option to query it. Fortunately, most of the platforms CouchDB supports do have it. The only platform where this currently does not work is Windows. In that case, and on any other unrecognized platform, we fall back to the previous behavior and assume the socket is still open.
Co-Authored-By: Robert Newson <[email protected]>
---
 rel/overlay/etc/default.ini                  |   4 +
 src/chttpd/src/chttpd.erl                    |   5 +
 src/chttpd/src/chttpd_util.erl               | 132 ++++++++++++++++++++++++++-
 src/docs/src/config/http.rst                 |  11 +++
 src/fabric/src/fabric_db_update_listener.erl |  16 ++--
 src/fabric/src/fabric_streams.erl            |  15 ++-
 src/fabric/src/fabric_view_changes.erl       |   3 +-
 7 files changed, 173 insertions(+), 13 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index c3124a643..396244754 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -206,6 +206,10 @@ bind_address = 127.0.0.1
 ; response header.
 ;server_header_versions = true
 
+; How often to check for client disconnects while processing streaming
+; requests such as _all_docs, _find, _changes and views
+;disconnect_check_msec = 30000
+
 ;[jwt_auth]
 ; List of claims to validate
 ; can be the name of a claim like "exp" or a tuple if the claim requires
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index c8e6fdc97..7f432f8f1 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -320,6 +320,9 @@ handle_request_int(MochiReq) ->
     erlang:put(dont_log_request, true),
     erlang:put(dont_log_response, true),
 
+    % Save client socket so that it can be monitored for disconnects
+    chttpd_util:mochiweb_socket_set(MochiReq:get(socket)),
+
     {HttpReq2, Response} =
         case before_request(HttpReq0) of
             {ok, HttpReq1} ->
@@ -328,6 +331,8 @@ handle_request_int(MochiReq) ->
                 {HttpReq0, Response0}
         end,
 
+    chttpd_util:mochiweb_socket_clean(),
+
     {Status, Code, Reason, Resp} = split_response(Response),
 
     HttpResp = #httpd_resp{
diff --git a/src/chttpd/src/chttpd_util.erl b/src/chttpd/src/chttpd_util.erl
index 955beca57..a303d2e0b 100644
--- a/src/chttpd/src/chttpd_util.erl
+++ b/src/chttpd/src/chttpd_util.erl
@@ -22,9 +22,17 @@
     get_chttpd_auth_config_integer/2,
     get_chttpd_auth_config_boolean/2,
     maybe_add_csp_header/3,
-    get_db_info/1
+    get_db_info/1,
+    mochiweb_socket_set/1,
+    mochiweb_socket_clean/0,
+    mochiweb_socket_get/0,
+    mochiweb_socket_check_msec/0,
+    stop_client_process_if_disconnected/2
 ]).
 
+-define(MOCHIWEB_SOCKET, mochiweb_connection_socket).
+-define(DISCONNECT_CHECK_MSEC, 30000).
+
 get_chttpd_config(Key) ->
     config:get("chttpd", Key, config:get("httpd", Key)).
 
@@ -110,3 +118,125 @@ get_db_info(DbName) ->
     catch
         _Tag:Error -> {error, Error}
     end.
+
+mochiweb_socket_set(Sock) ->
+    put(?MOCHIWEB_SOCKET, Sock).
+
+mochiweb_socket_get() ->
+    get(?MOCHIWEB_SOCKET).
+
+mochiweb_socket_clean() ->
+    erase(?MOCHIWEB_SOCKET).
+
+mochiweb_socket_check_msec() ->
+    MSec = max(1, config:get_integer("chttpd", "disconnect_check_msec", ?DISCONNECT_CHECK_MSEC)),
+    % Floor at 1 as rand:uniform/1 requires N >= 1; jitter avoids a stampede of concurrent checks
+    MSec + rand:uniform(MSec).
+
+stop_client_process_if_disconnected(Pid, Sock) ->
+    case is_mochiweb_socket_closed(Sock) of
+        true ->
+            couch_log:warning("client socket ~p disconnected, terminating ~p", [Sock, Pid]),
+            exit(Pid, {shutdown, client_disconnected});
+        false ->
+            ok
+    end.
+
+is_mochiweb_socket_closed(undefined) ->
+    false;
+is_mochiweb_socket_closed(Sock) ->
+    OsType = os:type(),
+    case tcp_info_opt(OsType) of
+        {raw, _, _, _} = InfoOpt ->
+            case mochiweb_socket:getopts(Sock, [InfoOpt]) of
+                {ok, [{raw, _, _, <<State:8/native, _/binary>>}]} ->
+                    tcp_is_closed(State, OsType);
+                {ok, _} ->
+                    false; % no or truncated tcp_info data: assume still open
+                {error, einval} ->
+                    % Already cleaned up
+                    true;
+                {error, _} ->
+                    false
+            end;
+        undefined ->
+            false
+    end.
+
+% All OS-es have the tcpi_state (uint8) as first member of tcp_info struct
+
+tcp_info_opt({unix, linux}) ->
+    %% include/netinet/in.h
+    %%   IPPROTO_TCP = 6
+    %%
+    %% include/netinet/tcp.h
+    %%   #define TCP_INFO 11
+    %%
+    {raw, 6, 11, 1};
+tcp_info_opt({unix, darwin}) ->
+    %% include/netinet/in.h
+    %%   #define IPPROTO_TCP 6
+    %%
+    %% netinet/tcp.h
+    %%   #define TCP_CONNECTION_INFO 0x106
+    %%
+    {raw, 6, 16#106, 1};
+tcp_info_opt({unix, freebsd}) ->
+    %% sys/netinet/in.h
+    %%   #define IPPROTO_TCP 6 /* tcp */
+    %%
+    %% sys/netinet/tcp.h
+    %%   #define TCP_INFO 32 /* retrieve tcp_info structure */
+    %%
+    {raw, 6, 32, 1};
+tcp_info_opt({_, _}) ->
+    undefined.
+
+tcp_is_closed(State, {unix, linux}) ->
+    %% netinet/tcp.h
+    %%   enum
+    %%   {
+    %%     TCP_ESTABLISHED = 1,
+    %%     TCP_SYN_SENT,
+    %%     TCP_SYN_RECV,
+    %%     TCP_FIN_WAIT1,
+    %%     TCP_FIN_WAIT2,
+    %%     TCP_TIME_WAIT,
+    %%     TCP_CLOSE,
+    %%     TCP_CLOSE_WAIT,
+    %%     TCP_LAST_ACK,
+    %%     TCP_LISTEN,
+    %%     TCP_CLOSING
+    %%   }
+    %%
+    lists:member(State, [4, 5, 6, 7, 8, 9, 11]); % 10 is TCP_LISTEN, 11 is TCP_CLOSING
+tcp_is_closed(State, {unix, darwin}) ->
+    %% netinet/tcp_fsm.h
+    %%   #define TCPS_CLOSED 0 /* closed */
+    %%   #define TCPS_LISTEN 1 /* listening for connection */
+    %%   #define TCPS_SYN_SENT 2 /* active, have sent syn */
+    %%   #define TCPS_SYN_RECEIVED 3 /* have send and received syn */
+    %%   #define TCPS_ESTABLISHED 4 /* established */
+    %%   #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for close */
+    %%   #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */
+    %%   #define TCPS_CLOSING 7 /* closed xchd FIN; await FIN ACK */
+    %%   #define TCPS_LAST_ACK 8 /* had fin and close; await FIN ACK */
+    %%   #define TCPS_FIN_WAIT_2 9 /* have closed, fin is acked */
+    %%   #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after close */
+    %%
+    lists:member(State, [0, 5, 6, 7, 8, 9, 10]);
+tcp_is_closed(State, {unix, freebsd}) ->
+    %% netinet/tcp_fsm.h
+    %%   #define TCPS_CLOSED 0 /* closed */
+    %%   #define TCPS_LISTEN 1 /* listening for connection */
+    %%   #define TCPS_SYN_SENT 2 /* active, have sent syn */
+    %%   #define TCPS_SYN_RECEIVED 3 /* have sent and received syn */
+    %%   #define TCPS_ESTABLISHED 4 /* established */
+    %%   #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for close */
+    %%   #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */
+    %%   #define TCPS_CLOSING 7/* closed xchd FIN; await FIN ACK */
+    %%   #define TCPS_LAST_ACK 8/* had fin and close; await FIN ACK */
+    %%   #define TCPS_FIN_WAIT_2 9/* have closed, fin is acked */
+    %%   #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after close */
+    %%
+    lists:member(State, [0, 5, 6, 7, 8, 9, 10]).
diff --git a/src/docs/src/config/http.rst b/src/docs/src/config/http.rst
index cbe36c4e3..bfba6b75d 100644
--- a/src/docs/src/config/http.rst
+++ b/src/docs/src/config/http.rst
@@ -266,6 +266,17 @@ HTTP Server Options
             [chttpd]
             server_header_versions = true
 
+    .. config:option:: disconnect_check_msec :: Client disconnection check interval
+
+        .. versionadded:: 3.4
+
+        How often, in milliseconds, to check for client disconnects while
+        processing streaming requests such as _all_docs, _find, _changes and
+        views. ::
+
+            [chttpd]
+            disconnect_check_msec = 30000
+
 .. config:section:: httpd :: HTTP Server Options
 
     .. versionchanged:: 3.2 These options were moved to [chttpd] section:
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl
index 78ccf5a4d..d4a49f37d 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -12,7 +12,7 @@
 
 -module(fabric_db_update_listener).
 
--export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
+-export([go/5, start_update_notifier/1, stop/1, wait_db_updated/1]).
 -export([handle_db_event/3]).
 
 -include_lib("fabric/include/fabric.hrl").
@@ -36,12 +36,12 @@
     shards
 }).
 
-go(Parent, ParentRef, DbName, Timeout) ->
+go(Parent, ParentRef, DbName, Timeout, ClientSock) ->
     Shards = mem3:shards(DbName),
     Notifiers = start_update_notifiers(Shards),
     MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]),
     RexiMon = rexi_monitor:start(MonRefs),
-    MonPid = start_cleanup_monitor(self(), Notifiers),
+    MonPid = start_cleanup_monitor(self(), Notifiers, ClientSock),
     %% This is not a common pattern for rexi but to enable the calling
     %% process to communicate via handle_message/3 we "fake" it as a
     %% a spawned worker.
@@ -97,16 +97,17 @@ handle_db_event(_DbName, deleted, St) ->
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
 
-start_cleanup_monitor(Parent, Notifiers) ->
+start_cleanup_monitor(Parent, Notifiers, ClientSock) ->
     spawn(fun() ->
         Ref = erlang:monitor(process, Parent),
-        cleanup_monitor(Parent, Ref, Notifiers)
+        cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
     end).
 
 stop_cleanup_monitor(MonPid) ->
     MonPid ! {self(), stop}.
 
-cleanup_monitor(Parent, Ref, Notifiers) ->
+cleanup_monitor(Parent, Ref, Notifiers, ClientSock) ->
+    CheckMSec = chttpd_util:mochiweb_socket_check_msec(), % recomputed every iteration
     receive
         {'DOWN', Ref, _, _, _} ->
             stop_update_notifiers(Notifiers);
@@ -116,6 +117,9 @@ cleanup_monitor(Parent, Ref, Notifiers) ->
             couch_log:error("Unkown message in ~w :: ~w", [?MODULE, Else]),
             stop_update_notifiers(Notifiers),
             exit(Parent, {unknown_message, Else})
+    after CheckMSec ->
+        chttpd_util:stop_client_process_if_disconnected(Parent, ClientSock),
+        cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
     end.
 
 stop_update_notifiers(Notifiers) ->
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 318824814..f4144666c 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -43,7 +43,8 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
         replacements = Replacements,
         ring_opts = RingOpts
     },
-    spawn_worker_cleaner(self(), Workers0),
+    ClientSock = chttpd_util:mochiweb_socket_get(), % undefined if never set in this process
+    spawn_worker_cleaner(self(), Workers0, ClientSock),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{ready = Workers}} ->
@@ -155,12 +156,12 @@ handle_stream_start(Else, _, _) ->
 % Spawn an auxiliary rexi worker cleaner. This will be used in cases
 % when the coordinator (request) process is forceably killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientSock) ->
     case get(?WORKER_CLEANER) of
         undefined ->
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers)
+                cleaner_loop(Coordinator, Workers, ClientSock)
             end),
             put(?WORKER_CLEANER, Pid),
             Pid;
@@ -168,12 +169,16 @@ spawn_worker_cleaner(Coordinator, Workers) ->
             ExistingCleaner
     end.
 
-cleaner_loop(Pid, Workers) ->
+cleaner_loop(Pid, Workers, ClientSock) ->
+    CheckMSec = chttpd_util:mochiweb_socket_check_msec(), % recomputed every iteration
     receive
         {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers]);
+            cleaner_loop(Pid, [Worker | Workers], ClientSock);
         {'DOWN', _, _, Pid, _} ->
             fabric_util:cleanup(Workers)
+    after CheckMSec ->
+        chttpd_util:stop_client_process_if_disconnected(Pid, ClientSock),
+        cleaner_loop(Pid, Workers, ClientSock)
     end.
 
 add_worker_to_cleaner(CoordinatorPid, Worker) ->
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index 970a200d2..bf0661a17 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -39,11 +39,12 @@ go(DbName, Feed, Options, Callback, Acc0) when
             {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
             Ref = make_ref(),
             Parent = self(),
+            ClientSock = chttpd_util:mochiweb_socket_get(), % read pre-spawn: per-process dict
             UpdateListener = {
                 spawn_link(
                     fabric_db_update_listener,
                     go,
-                    [Parent, Ref, DbName, Timeout]
+                    [Parent, Ref, DbName, Timeout, ClientSock]
                 ),
                 Ref
             },
