This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch monitor-client-socket-for-disconnections
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit dff9b12a817fcad59e67c66ba1bf1de6a97dd1b7
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Wed Aug 23 16:02:39 2023 -0400

    Stop client process and clean up if client disconnects
    
    Previously, when processing long running streaming requests, such as _find
    with a highly selective selector, the client could disconnect while no rows
    were being emitted for a while, yet the request process and all the
    associated workers on remote nodes would continue running, consuming server
    resources.
    
    We already handle remote (RPC) process cleanup if the coordinator crashes;
    we just need a way to kill that coordinator if the connection is closed in
    the case when no data may be emitted for a long time. This is what the
    current commit accomplishes.
    
    It turns out there is no simple way to detect passive mode socket
    disconnects in current versions of Erlang/OTP. The socket at the kernel
    level may be closed (in close_wait state), however `inet:info/1` will
    continue reporting it as `connected`. The only way to obtain an accurate
    connection state is to try to write, read, or query the TCP info state.
    
    Here we attempt to query the state with tcp_info. Unfortunately, not all
    versions of supported OTP platforms have this option, and it probably never
    became an officially supported inet socket option, so we use the `raw`
    socket option for it. However, it turns out most of the platforms CouchDB
    supports do have that option. The only platform where this currently does
    not work is Windows. In that case, or for any future platform without the
    option, we default to the previous behavior, assuming the socket is still
    open.
    
    Co-Authored-By: Robert Newson <[email protected]>
---
 rel/overlay/etc/default.ini                  |   4 +
 src/chttpd/src/chttpd.erl                    |   5 +
 src/chttpd/src/chttpd_util.erl               | 132 ++++++++++++++++++++++++++-
 src/docs/src/config/http.rst                 |  11 +++
 src/fabric/src/fabric_db_update_listener.erl |  16 ++--
 src/fabric/src/fabric_streams.erl            |  15 ++-
 src/fabric/src/fabric_view_changes.erl       |   3 +-
 7 files changed, 173 insertions(+), 13 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index c3124a643..396244754 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -206,6 +206,10 @@ bind_address = 127.0.0.1
 ; response header.
 ;server_header_versions = true
 
+; How often to check for client disconnects while processing streaming
+; requests such as _all_docs, _find, _changes and views
+;disconnect_check_msec = 30000
+
 ;[jwt_auth]
 ; List of claims to validate
 ; can be the name of a claim like "exp" or a tuple if the claim requires
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index c8e6fdc97..7f432f8f1 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -320,6 +320,9 @@ handle_request_int(MochiReq) ->
     erlang:put(dont_log_request, true),
     erlang:put(dont_log_response, true),
 
+    % Save client socket so that it can be monitored for disconnects
+    chttpd_util:mochiweb_socket_set(MochiReq:get(socket)),
+
     {HttpReq2, Response} =
         case before_request(HttpReq0) of
             {ok, HttpReq1} ->
@@ -328,6 +331,8 @@ handle_request_int(MochiReq) ->
                 {HttpReq0, Response0}
         end,
 
+    chttpd_util:mochiweb_socket_clean(),
+
     {Status, Code, Reason, Resp} = split_response(Response),
 
     HttpResp = #httpd_resp{
diff --git a/src/chttpd/src/chttpd_util.erl b/src/chttpd/src/chttpd_util.erl
index 955beca57..a303d2e0b 100644
--- a/src/chttpd/src/chttpd_util.erl
+++ b/src/chttpd/src/chttpd_util.erl
@@ -22,9 +22,17 @@
     get_chttpd_auth_config_integer/2,
     get_chttpd_auth_config_boolean/2,
     maybe_add_csp_header/3,
-    get_db_info/1
+    get_db_info/1,
+    mochiweb_socket_set/1,
+    mochiweb_socket_clean/0,
+    mochiweb_socket_get/0,
+    mochiweb_socket_check_msec/0,
+    stop_client_process_if_disconnected/2
 ]).
 
+-define(MOCHIWEB_SOCKET, mochiweb_connection_socket).
+-define(DISCONNECT_CHECK_MSEC, 30000).
+
 get_chttpd_config(Key) ->
     config:get("chttpd", Key, config:get("httpd", Key)).
 
@@ -110,3 +118,125 @@ get_db_info(DbName) ->
     catch
         _Tag:Error -> {error, Error}
     end.
+
+% Store the client connection socket in the calling process' dictionary under
+% the ?MOCHIWEB_SOCKET key so it can later be read back from the same process
+% with mochiweb_socket_get/0. Returns the result of erlang:put/2.
+mochiweb_socket_set(Sock) ->
+    erlang:put(?MOCHIWEB_SOCKET, Sock).
+
+% Fetch the client connection socket stored in the calling process' dictionary
+% under the ?MOCHIWEB_SOCKET key. Returns undefined when nothing is stored
+% there.
+mochiweb_socket_get() ->
+    erlang:get(?MOCHIWEB_SOCKET).
+
+% Remove the ?MOCHIWEB_SOCKET entry from the calling process' dictionary.
+% Returns the result of erlang:erase/1.
+mochiweb_socket_clean() ->
+    erlang:erase(?MOCHIWEB_SOCKET).
+
+% Interval, in milliseconds, to wait before the next client disconnect check.
+%
+% The base value is read from the "disconnect_check_msec" key of the "chttpd"
+% config section, defaulting to ?DISCONNECT_CHECK_MSEC. A random jitter of
+% 1..MSec is added to avoid a stampede in case of a larger number of
+% concurrent connections, so the result is in the range MSec+1..2*MSec.
+%
+% rand:uniform/1 only accepts arguments >= 1, so a configured value of 0 or
+% less would crash the process doing the periodic check. Such values fall
+% back to the default interval instead.
+mochiweb_socket_check_msec() ->
+    MSec =
+        case config:get_integer("chttpd", "disconnect_check_msec", ?DISCONNECT_CHECK_MSEC) of
+            Configured when Configured > 0 -> Configured;
+            _ -> ?DISCONNECT_CHECK_MSEC
+        end,
+    MSec + rand:uniform(MSec).
+
+% Check whether the client socket Sock has been closed and, if it has, log a
+% warning and send Pid an exit signal with reason
+% {shutdown, client_disconnected}. Returns ok when the socket is not detected
+% as closed.
+stop_client_process_if_disconnected(Pid, Sock) ->
+    IsClosed = is_mochiweb_socket_closed(Sock),
+    maybe_stop_client_process(IsClosed, Pid, Sock).
+
+% Act on the result of the socket check: nothing to do while the client is
+% still connected, otherwise terminate the client process.
+maybe_stop_client_process(false, _Pid, _Sock) ->
+    ok;
+maybe_stop_client_process(true, Pid, Sock) ->
+    couch_log:warning("client socket ~p disconnected, terminating ~p", [Sock, Pid]),
+    exit(Pid, {shutdown, client_disconnected}).
+
+% Return true when the client socket is detected as closed, false otherwise.
+%
+% An undefined socket, a platform without a known tcp_info option and any
+% getopts failure other than einval are all reported as "not closed", which
+% keeps the behavior of not interrupting the request.
+is_mochiweb_socket_closed(undefined) ->
+    false;
+is_mochiweb_socket_closed(Sock) ->
+    OsType = os:type(),
+    socket_closed_by_tcp_info(Sock, OsType, tcp_info_opt(OsType)).
+
+% Query the raw tcp_info option on the socket and interpret the first byte
+% of the result as the TCP connection state for the given OS type.
+socket_closed_by_tcp_info(_Sock, _OsType, undefined) ->
+    false;
+socket_closed_by_tcp_info(Sock, OsType, {raw, _, _, _} = InfoOpt) ->
+    case mochiweb_socket:getopts(Sock, [InfoOpt]) of
+        {ok, [{raw, _, _, <<State:8/native, _/binary>>}]} ->
+            tcp_is_closed(State, OsType);
+        {error, einval} ->
+            % Already cleaned up
+            true;
+        {ok, []} ->
+            false;
+        {error, _} ->
+            false
+    end.
+
+% Raw socket option used to read the kernel TCP connection info on the
+% platform identified by an os:type() tuple, or undefined when no such
+% option is known for that platform.
+%
+% All OS-es have the tcpi_state (uint8) as first member of tcp_info struct,
+% which is why every option below asks for a single byte only.
+
+tcp_info_opt({OsFamily, OsName}) ->
+    case {OsFamily, OsName} of
+        {unix, linux} ->
+            %% include/netinet/in.h
+            %%   IPPROTO_TCP = 6
+            %%
+            %% include/netinet/tcp.h
+            %%   #define TCP_INFO 11
+            %%
+            {raw, 6, 11, 1};
+        {unix, darwin} ->
+            %% include/netinet/in.h
+            %%   #define IPPROTO_TCP   6
+            %%
+            %% netinet/tcp.h
+            %%   #define TCP_CONNECTION_INFO  0x106
+            %%
+            {raw, 6, 16#106, 1};
+        {unix, freebsd} ->
+            %% sys/netinet/in.h
+            %%  #define  IPPROTO_TCP  6 /* tcp */
+            %%
+            %% sys/netinet/tcp.h
+            %%  #define  TCP_INFO    32 /* retrieve tcp_info structure */
+            %%
+            {raw, 6, 32, 1};
+        {_, _} ->
+            undefined
+    end.
+
+% Decide whether a TCP connection state byte, as read through the option
+% returned by tcp_info_opt/1, means the connection is closed or in the
+% process of closing. The numbering of states is OS specific, so the
+% os:type() tuple selects which table applies. Only the OS types for which
+% tcp_info_opt/1 returns a raw option have a clause here.
+%
+% Returns true for every state other than the handshake, established and
+% listening states.
+tcp_is_closed(State, {unix, linux}) ->
+    %% netinet/tcp.h
+    %%   enum
+    %%   {
+    %%     TCP_ESTABLISHED = 1,
+    %%     TCP_SYN_SENT,
+    %%     TCP_SYN_RECV,
+    %%     TCP_FIN_WAIT1,
+    %%     TCP_FIN_WAIT2,
+    %%     TCP_TIME_WAIT,
+    %%     TCP_CLOSE,
+    %%     TCP_CLOSE_WAIT,
+    %%     TCP_LAST_ACK,
+    %%     TCP_LISTEN,
+    %%     TCP_CLOSING
+    %%   }
+    %%
+    %% TCP_LISTEN (10) is not a closed state and is skipped, TCP_CLOSING (11)
+    %% is included, matching the darwin and freebsd clauses below.
+    lists:member(State, [4, 5, 6, 7, 8, 9, 11]);
+tcp_is_closed(State, {unix, darwin}) ->
+    %% netinet/tcp_fsm.h
+    %%   #define TCPS_CLOSED             0       /* closed */
+    %%   #define TCPS_LISTEN             1       /* listening for connection */
+    %%   #define TCPS_SYN_SENT           2       /* active, have sent syn */
+    %%   #define TCPS_SYN_RECEIVED       3       /* have send and received syn */
+    %%   #define TCPS_ESTABLISHED        4       /* established */
+    %%   #define TCPS_CLOSE_WAIT         5       /* rcvd fin, waiting for close */
+    %%   #define TCPS_FIN_WAIT_1         6       /* have closed, sent fin */
+    %%   #define TCPS_CLOSING            7       /* closed xchd FIN; await FIN ACK */
+    %%   #define TCPS_LAST_ACK           8       /* had fin and close; await FIN ACK */
+    %%   #define TCPS_FIN_WAIT_2         9       /* have closed, fin is acked */
+    %%   #define TCPS_TIME_WAIT          10      /* in 2*msl quiet wait after close */
+    %%
+    lists:member(State, [0, 5, 6, 7, 8, 9, 10]);
+tcp_is_closed(State, {unix, freebsd}) ->
+    %% netinet/tcp_fsm.h
+    %%   #define TCPS_CLOSED 0 /* closed */
+    %%   #define TCPS_LISTEN 1 /* listening for connection */
+    %%   #define TCPS_SYN_SENT 2  /* active, have sent syn */
+    %%   #define TCPS_SYN_RECEIVED 3 /* have sent and received syn */
+    %%   #define TCPS_ESTABLISHED 4 /* established */
+    %%   #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for close */
+    %%   #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */
+    %%   #define TCPS_CLOSING 7/* closed xchd FIN; await FIN ACK */
+    %%   #define TCPS_LAST_ACK 8/* had fin and close; await FIN ACK */
+    %%   #define TCPS_FIN_WAIT_2 9/* have closed, fin is acked */
+    %%   #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after close */
+    %%
+    lists:member(State, [0, 5, 6, 7, 8, 9, 10]).
diff --git a/src/docs/src/config/http.rst b/src/docs/src/config/http.rst
index cbe36c4e3..bfba6b75d 100644
--- a/src/docs/src/config/http.rst
+++ b/src/docs/src/config/http.rst
@@ -266,6 +266,17 @@ HTTP Server Options
             [chttpd]
             server_header_versions = true
 
+    .. config:option:: disconnect_check_msec :: Client disconnection check interval
+
+        .. versionadded:: 3.4
+
+        How often, in milliseconds, to check for client disconnects while
+        processing streaming requests such as _all_docs, _find, _changes and
+        views. ::
+
+            [chttpd]
+            disconnect_check_msec = 30000
+
 .. config:section:: httpd :: HTTP Server Options
 
     .. versionchanged:: 3.2 These options were moved to [chttpd] section:
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl
index 78ccf5a4d..d4a49f37d 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -12,7 +12,7 @@
 
 -module(fabric_db_update_listener).
 
--export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
+-export([go/5, start_update_notifier/1, stop/1, wait_db_updated/1]).
 -export([handle_db_event/3]).
 
 -include_lib("fabric/include/fabric.hrl").
@@ -36,12 +36,12 @@
     shards
 }).
 
-go(Parent, ParentRef, DbName, Timeout) ->
+go(Parent, ParentRef, DbName, Timeout, ClientSock) ->
     Shards = mem3:shards(DbName),
     Notifiers = start_update_notifiers(Shards),
     MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- 
Notifiers]),
     RexiMon = rexi_monitor:start(MonRefs),
-    MonPid = start_cleanup_monitor(self(), Notifiers),
+    MonPid = start_cleanup_monitor(self(), Notifiers, ClientSock),
     %% This is not a common pattern for rexi but to enable the calling
     %% process to communicate via handle_message/3 we "fake" it as a
     %% a spawned worker.
@@ -97,16 +97,17 @@ handle_db_event(_DbName, deleted, St) ->
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
 
-start_cleanup_monitor(Parent, Notifiers) ->
+start_cleanup_monitor(Parent, Notifiers, ClientSock) ->
     spawn(fun() ->
         Ref = erlang:monitor(process, Parent),
-        cleanup_monitor(Parent, Ref, Notifiers)
+        cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
     end).
 
 stop_cleanup_monitor(MonPid) ->
     MonPid ! {self(), stop}.
 
-cleanup_monitor(Parent, Ref, Notifiers) ->
+cleanup_monitor(Parent, Ref, Notifiers, ClientSock) ->
+    CheckMSec = chttpd_util:mochiweb_socket_check_msec(),
     receive
         {'DOWN', Ref, _, _, _} ->
             stop_update_notifiers(Notifiers);
@@ -116,6 +117,9 @@ cleanup_monitor(Parent, Ref, Notifiers) ->
             couch_log:error("Unkown message in ~w :: ~w", [?MODULE, Else]),
             stop_update_notifiers(Notifiers),
             exit(Parent, {unknown_message, Else})
+    after CheckMSec ->
+        chttpd_util:stop_client_process_if_disconnected(Parent, ClientSock),
+        cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
     end.
 
 stop_update_notifiers(Notifiers) ->
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 318824814..f4144666c 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -43,7 +43,8 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
         replacements = Replacements,
         ring_opts = RingOpts
     },
-    spawn_worker_cleaner(self(), Workers0),
+    ClientSock = chttpd_util:mochiweb_socket_get(),
+    spawn_worker_cleaner(self(), Workers0, ClientSock),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{ready = Workers}} ->
@@ -155,12 +156,12 @@ handle_stream_start(Else, _, _) ->
 % Spawn an auxiliary rexi worker cleaner. This will be used in cases
 % when the coordinator (request) process is forceably killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientSock) ->
     case get(?WORKER_CLEANER) of
         undefined ->
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers)
+                cleaner_loop(Coordinator, Workers, ClientSock)
             end),
             put(?WORKER_CLEANER, Pid),
             Pid;
@@ -168,12 +169,16 @@ spawn_worker_cleaner(Coordinator, Workers) ->
             ExistingCleaner
     end.
 
-cleaner_loop(Pid, Workers) ->
+cleaner_loop(Pid, Workers, ClientSock) ->
+    CheckMSec = chttpd_util:mochiweb_socket_check_msec(),
     receive
         {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers]);
+            cleaner_loop(Pid, [Worker | Workers], ClientSock);
         {'DOWN', _, _, Pid, _} ->
             fabric_util:cleanup(Workers)
+    after CheckMSec ->
+        chttpd_util:stop_client_process_if_disconnected(Pid, ClientSock),
+        cleaner_loop(Pid, Workers, ClientSock)
     end.
 
 add_worker_to_cleaner(CoordinatorPid, Worker) ->
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index 970a200d2..bf0661a17 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -39,11 +39,12 @@ go(DbName, Feed, Options, Callback, Acc0) when
             {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
             Ref = make_ref(),
             Parent = self(),
+            ClientSock = chttpd_util:mochiweb_socket_get(),
             UpdateListener = {
                 spawn_link(
                     fabric_db_update_listener,
                     go,
-                    [Parent, Ref, DbName, Timeout]
+                    [Parent, Ref, DbName, Timeout, ClientSock]
                 ),
                 Ref
             },

Reply via email to