This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new d5701f437 Stop client process and clean up if client disconnects
d5701f437 is described below

commit d5701f437183c3742aab5812e298d5bb80cb0c15
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Wed Aug 23 16:02:39 2023 -0400

    Stop client process and clean up if client disconnects
    
    Previously, when processing long running streaming requests, such as _find
    with a highly selective selector, when no rows are emitted for a while, the
    client could disconnect but the request process, and all the associated
    workers on remote nodes, would continue running, consuming server resources.
    
    We already handle remote (RPC) process cleanup if the coordinator crashes;
    we just need a way to kill that coordinator if the connection is closed
    while no data may be emitted for a long time. This is what the current
    commit accomplishes.
    
    It turns out there is no simple way to detect passive mode socket
    disconnects in current versions of Erlang/OTP. The socket at the kernel
    level may be closed (in close_wait state), however `inet:info/1` will
    continue reporting it as `connected`. The only ways to obtain an accurate
    connection state are to try to write, read, or query the TCP info state.
    
    Here we attempt to query the state with tcp_info. Unfortunately, not all
    versions of supported OTP platforms have this option, so it probably never
    became an officially supported inet socket option; instead we use a `raw`
    socket option for it. However, it turns out most of CouchDB's supported
    platforms do have that option. The only platform where this currently does
    not work is Windows. In that case, or for future platforms, we default to
    the previous behavior, assuming the socket is still open.
    
    Co-Authored-By: Robert Newson <[email protected]>
---
 rel/overlay/etc/default.ini                  |   8 +
 src/chttpd/priv/stats_descriptions.cfg       |   4 +
 src/chttpd/src/chttpd.erl                    |   5 +
 src/chttpd/src/chttpd_util.erl               | 125 ++++++++++++++-
 src/chttpd/test/eunit/chttpd_util_test.erl   |  89 +++++++++++
 src/docs/src/config/http.rst                 |  22 +++
 src/fabric/src/fabric_db_update_listener.erl |  16 +-
 src/fabric/src/fabric_streams.erl            | 221 +++++++++++++++++----------
 src/fabric/src/fabric_view_changes.erl       |   3 +-
 9 files changed, 408 insertions(+), 85 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index c3124a643..1c94502b1 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -206,6 +206,14 @@ bind_address = 127.0.0.1
 ; response header.
 ;server_header_versions = true
 
+; How often to check for client disconnects while processing streaming
+; requests such as _all_docs, _find, _changes and views
+;disconnect_check_msec = 30000
+
+; The amount of random jitter to add to disconnect_check_msec. This avoids a
+; stampede of checks when there are a lot of concurrent clients connecting.
+;disconnect_check_jitter_msec = 15000
+
 ;[jwt_auth]
 ; List of claims to validate
 ; can be the name of a claim like "exp" or a tuple if the claim requires
diff --git a/src/chttpd/priv/stats_descriptions.cfg b/src/chttpd/priv/stats_descriptions.cfg
index f54231ce3..901d99a26 100644
--- a/src/chttpd/priv/stats_descriptions.cfg
+++ b/src/chttpd/priv/stats_descriptions.cfg
@@ -18,6 +18,10 @@
     {type, counter},
     {desc, <<"number of aborted requests">>}
 ]}.
+{[couchdb, httpd, abandoned_streaming_requests], [
+    {type, counter},
+    {desc, <<"number of abandoned streaming requests">>}
+]}.
 {[couchdb, dbinfo], [
     {type, histogram},
     {desc, <<"distribution of latencies for calls to retrieve DB info">>}
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index c8e6fdc97..7f432f8f1 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -320,6 +320,9 @@ handle_request_int(MochiReq) ->
     erlang:put(dont_log_request, true),
     erlang:put(dont_log_response, true),
 
+    % Save client socket so that it can be monitored for disconnects
+    chttpd_util:mochiweb_socket_set(MochiReq:get(socket)),
+
     {HttpReq2, Response} =
         case before_request(HttpReq0) of
             {ok, HttpReq1} ->
@@ -328,6 +331,8 @@ handle_request_int(MochiReq) ->
                 {HttpReq0, Response0}
         end,
 
+    chttpd_util:mochiweb_socket_clean(),
+
     {Status, Code, Reason, Resp} = split_response(Response),
 
     HttpResp = #httpd_resp{
diff --git a/src/chttpd/src/chttpd_util.erl b/src/chttpd/src/chttpd_util.erl
index 955beca57..05adef717 100644
--- a/src/chttpd/src/chttpd_util.erl
+++ b/src/chttpd/src/chttpd_util.erl
@@ -22,9 +22,18 @@
     get_chttpd_auth_config_integer/2,
     get_chttpd_auth_config_boolean/2,
     maybe_add_csp_header/3,
-    get_db_info/1
+    get_db_info/1,
+    mochiweb_socket_set/1,
+    mochiweb_socket_clean/0,
+    mochiweb_socket_get/0,
+    mochiweb_socket_check_msec/0,
+    stop_client_process_if_disconnected/2
 ]).
 
+-define(MOCHIWEB_SOCKET, mochiweb_connection_socket).
+-define(DISCONNECT_CHECK_MSEC, 30000).
+-define(DISCONNECT_CHECK_JITTER_MSEC, 15000).
+
 get_chttpd_config(Key) ->
     config:get("chttpd", Key, config:get("httpd", Key)).
 
@@ -110,3 +119,117 @@ get_db_info(DbName) ->
     catch
         _Tag:Error -> {error, Error}
     end.
+
+mochiweb_socket_set(Sock) ->  % Stash the client socket in the calling process's dictionary.
+    put(?MOCHIWEB_SOCKET, Sock).
+
+mochiweb_socket_clean() ->  % Forget the stashed client socket (returns the previous value, per erase/1).
+    erase(?MOCHIWEB_SOCKET).
+
+mochiweb_socket_get() ->  % Stashed client socket of the calling process, or undefined if none was set.
+    get(?MOCHIWEB_SOCKET).
+
+mochiweb_socket_check_msec() ->  % Next disconnect-check delay (msec): base + [0, jitter], never below 100.
+    MSec = config:get_integer(
+        "chttpd", "disconnect_check_msec", ?DISCONNECT_CHECK_MSEC
+    ),
+    JitterMSec = config:get_integer(
+        "chttpd", "disconnect_check_jitter_msec", ?DISCONNECT_CHECK_JITTER_MSEC
+    ),
+    max(100, MSec + rand:uniform(max(1, JitterMSec + 1)) - 1).  % uniform(N)-1 is 0..N-1, so jitter 0 adds nothing
+
+stop_client_process_if_disconnected(Pid, Sock) ->  % Kill Pid if the client behind Sock went away; always returns ok.
+    case is_mochiweb_socket_closed(Sock) of
+        true ->
+            exit(Pid, {shutdown, client_disconnected}),
+            couch_stats:increment_counter([couchdb, httpd, abandoned_streaming_requests]),
+            ok;
+        false ->
+            ok
+    end.
+
+is_mochiweb_socket_closed(undefined) ->  % no client socket recorded: assume it is still open
+    false;
+is_mochiweb_socket_closed(Sock) ->
+    OsType = os:type(),
+    case tcp_info_opt(OsType) of
+        {raw, _, _, _} = InfoOpt ->
+            case mochiweb_socket:getopts(Sock, [InfoOpt]) of
+                {ok, [{raw, _, _, <<State:8/native, _/binary>>}]} ->
+                    tcp_is_closed(State, OsType);
+                {ok, _} ->  % empty or unexpected reply: assume open instead of crashing the caller
+                    false;
+                {error, einval} ->
+                    % Already cleaned up
+                    true;
+                {error, _} ->
+                    false
+            end;
+        undefined ->  % no known TCP info option for this OS (e.g. Windows): assume open
+            false
+    end.
+
+% Raw getopts spec {raw, Level, OptName, BufSize} for the OS's TCP info, or undefined.
+% All OS-es below have the tcpi_state (uint8) as first member of that struct, so 1 byte suffices.
+tcp_info_opt({unix, linux}) ->
+    %% netinet/in.h
+    %%   IPPROTO_TCP = 6
+    %%
+    %% netinet/tcp.h
+    %%   #define TCP_INFO 11
+    %%
+    {raw, 6, 11, 1};  % 1-byte buffer: only tcpi_state is read
+tcp_info_opt({unix, darwin}) ->
+    %% netinet/in.h
+    %%   #define IPPROTO_TCP   6
+    %%
+    %% netinet/tcp.h
+    %%   #define TCP_CONNECTION_INFO  0x106
+    %%
+    {raw, 6, 16#106, 1};
+tcp_info_opt({unix, freebsd}) ->
+    %% sys/netinet/in.h
+    %%   #define  IPPROTO_TCP  6
+    %%
+    %% sys/netinet/tcp.h
+    %%   #define  TCP_INFO    32
+    %%
+    {raw, 6, 32, 1};
+tcp_info_opt({_, _}) ->  % any other OS (e.g. Windows): unsupported, callers assume the socket is open
+    undefined.
+
+tcp_is_closed(State, {unix, linux}) ->
+    %% netinet/tcp.h
+    %%   enum
+    %%   {
+    %%     TCP_ESTABLISHED = 1,
+    %%     TCP_SYN_SENT,
+    %%     TCP_SYN_RECV,
+    %%     TCP_FIN_WAIT1,
+    %%     TCP_FIN_WAIT2,
+    %%     TCP_TIME_WAIT,
+    %%     TCP_CLOSE,
+    %%     TCP_CLOSE_WAIT,
+    %%     TCP_LAST_ACK,
+    %%     TCP_LISTEN,
+    %%     TCP_CLOSING
+    %%   }
+    %%
+    lists:member(State, [4, 5, 6, 7, 8, 9, 11]);  % all but ESTABLISHED, SYN_SENT, SYN_RECV, LISTEN
+tcp_is_closed(State, {unix, Type}) when Type =:= darwin; Type =:= freebsd ->
+    %% tcp_fsm.h states are the same on macos and freebsd
+    %%
+    %% netinet/tcp_fsm.h
+    %%   #define TCPS_CLOSED             0       /* closed */
+    %%   #define TCPS_LISTEN             1       /* listening for connection */
+    %%   #define TCPS_SYN_SENT           2       /* active, have sent syn */
+    %%   #define TCPS_SYN_RECEIVED       3       /* have send and received syn */
+    %%   #define TCPS_ESTABLISHED        4       /* established */
+    %%   #define TCPS_CLOSE_WAIT         5       /* rcvd fin, waiting for close */
+    %%   #define TCPS_FIN_WAIT_1         6       /* have closed, sent fin */
+    %%   #define TCPS_CLOSING            7       /* closed xchd FIN; await FIN ACK */
+    %%   #define TCPS_LAST_ACK           8       /* had fin and close; await FIN ACK */
+    %%   #define TCPS_FIN_WAIT_2         9       /* have closed, fin is acked */
+    %%   #define TCPS_TIME_WAIT          10      /* in 2*msl quiet wait after close */
+    %%
+    lists:member(State, [0, 5, 6, 7, 8, 9, 10]).  % all but LISTEN, SYN_SENT, SYN_RECEIVED, ESTABLISHED
diff --git a/src/chttpd/test/eunit/chttpd_util_test.erl b/src/chttpd/test/eunit/chttpd_util_test.erl
index c381dac6e..76edffeac 100644
--- a/src/chttpd/test/eunit/chttpd_util_test.erl
+++ b/src/chttpd/test/eunit/chttpd_util_test.erl
@@ -113,3 +113,92 @@ test_auth_with_undefined_option(_) ->
     ?assertEqual("", chttpd_util:get_chttpd_auth_config("undefine", "")),
     ?assert(chttpd_util:get_chttpd_auth_config("undefine", true)),
     ?assertNot(chttpd_util:get_chttpd_auth_config("undefine", false)).
+
+chttpd_util_client_socker_monitor_test_() ->  % client socket tracking and disconnect-detection tests
+    {
+        setup,  % couch is started once for all three tests (t_socket_check_config needs config)
+        fun test_util:start_couch/0,
+        fun test_util:stop_couch/1,
+        with([
+            ?TDEF(t_socket_set_get_clean),
+            ?TDEF(t_socket_check_config),
+            ?TDEF(t_closed_socket_kills_coordinator)
+        ])
+    }.
+
+t_socket_set_get_clean(_) ->  % set/get/clean round-trip of the per-process client socket slot
+    ?assertEqual(undefined, chttpd_util:mochiweb_socket_get()),
+    {ok, Sock} = gen_tcp:listen(0, [{active, false}]),  % port 0: let the OS pick a free port
+    chttpd_util:mochiweb_socket_set(Sock),
+    ?assertEqual(Sock, chttpd_util:mochiweb_socket_get()),
+    chttpd_util:mochiweb_socket_clean(),
+    ?assertEqual(undefined, chttpd_util:mochiweb_socket_get()),
+    gen_tcp:close(Sock).
+
+t_socket_check_config(_) ->  % the check interval stays within the configured base + jitter bounds
+    config:set("chttpd", "disconnect_check_msec", "100", false),
+    config:set("chttpd", "disconnect_check_jitter_msec", "50", false),
+    lists:foreach(
+        fun(_) ->
+            MSec = chttpd_util:mochiweb_socket_check_msec(),
+            ?assert(is_integer(MSec)),
+            ?assert(MSec >= 100),  % configured base
+            ?assert(MSec =< 150)  % base + at most 50 msec of jitter
+        end,
+        lists:seq(1, 1000)  % many samples, since the jitter is random
+    ),
+    config:delete("chttpd", "disconnect_check_msec", false),
+    config:delete("chttpd", "disconnect_check_jitter_msec", false).
+
+t_closed_socket_kills_coordinator(_) ->  % closing the socket gets the coordinator killed (where supported)
+    {Pid, Ref} = spawn_coord(),
+    {ok, Sock} = gen_tcp:listen(0, [{active, false}]),
+
+    % Can call getopts many times in a row; the process should stay alive
+    lists:foreach(
+        fun(_) ->
+            ok = chttpd_util:stop_client_process_if_disconnected(Pid, Sock)
+        end,
+        lists:seq(1, 10000)
+    ),
+    ?assert(is_process_alive(Pid)),
+
+    gen_tcp:close(Sock),
+
+    ?assertEqual(ok, chttpd_util:stop_client_process_if_disconnected(Pid, Sock)),
+    case tcp_info_works() of
+        true ->
+            ?assertEqual({shutdown, client_disconnected}, wait_coord_death(Ref));
+        false ->
+            ?assert(is_process_alive(Pid)),
+            % Kill it manually
+            exit(Pid, kill)
+    end,
+
+    % Can call stop_client_... even if the process may be dead and the socket is closed
+    lists:foreach(
+        fun(_) ->
+            ok = chttpd_util:stop_client_process_if_disconnected(Pid, Sock)
+        end,
+        lists:seq(1, 10000)
+    ).
+
+spawn_coord() ->  % fake coordinator that idles until sent `die`; returns {Pid, MonitorRef}
+    spawn_monitor(fun() ->
+        receive
+            die -> ok
+        end
+    end).
+
+wait_coord_death(Ref) ->  % exit reason of the monitored process; no timeout: blocks until it dies
+    receive
+        {'DOWN', Ref, _, _, Reason} -> Reason
+    end.
+
+tcp_info_works() ->  % must mirror the OSes handled by chttpd_util's tcp_info_opt/1
+    case os:type() of
+        {unix, OsName} ->
+            lists:member(OsName, [linux, freebsd, darwin]);
+        {_, _} ->
+            false
+    end.
diff --git a/src/docs/src/config/http.rst b/src/docs/src/config/http.rst
index cbe36c4e3..334ce50b5 100644
--- a/src/docs/src/config/http.rst
+++ b/src/docs/src/config/http.rst
@@ -266,6 +266,28 @@ HTTP Server Options
             [chttpd]
             server_header_versions = true
 
+    .. config:option:: disconnect_check_msec :: Client disconnection check interval
+
+        .. versionadded:: 3.4
+
+        How often, in milliseconds, to check for client disconnects while
+        processing streaming requests such as _all_docs, _find, _changes and
+        views. ::
+
+            [chttpd]
+            disconnect_check_msec = 30000
+
+    .. config:option:: disconnect_check_jitter_msec :: Client disconnection check jitter
+
+        .. versionadded:: 3.4
+
+        How much random jitter to apply to the ``disconnect_check_msec``
+        period. This is to avoid a stampede in case of a large number of
+        concurrent clients. ::
+
+            [chttpd]
+            disconnect_check_jitter_msec = 15000
+
 .. config:section:: httpd :: HTTP Server Options
 
     .. versionchanged:: 3.2 These options were moved to [chttpd] section:
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl
index 78ccf5a4d..d4a49f37d 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -12,7 +12,7 @@
 
 -module(fabric_db_update_listener).
 
--export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
+-export([go/5, start_update_notifier/1, stop/1, wait_db_updated/1]).
 -export([handle_db_event/3]).
 
 -include_lib("fabric/include/fabric.hrl").
@@ -36,12 +36,12 @@
     shards
 }).
 
-go(Parent, ParentRef, DbName, Timeout) ->
+go(Parent, ParentRef, DbName, Timeout, ClientSock) ->
     Shards = mem3:shards(DbName),
     Notifiers = start_update_notifiers(Shards),
     MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- 
Notifiers]),
     RexiMon = rexi_monitor:start(MonRefs),
-    MonPid = start_cleanup_monitor(self(), Notifiers),
+    MonPid = start_cleanup_monitor(self(), Notifiers, ClientSock),
     %% This is not a common pattern for rexi but to enable the calling
     %% process to communicate via handle_message/3 we "fake" it as a
     %% a spawned worker.
@@ -97,16 +97,17 @@ handle_db_event(_DbName, deleted, St) ->
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
 
-start_cleanup_monitor(Parent, Notifiers) ->
+start_cleanup_monitor(Parent, Notifiers, ClientSock) ->  % ClientSock may be undefined; polled for client disconnects
     spawn(fun() ->
         Ref = erlang:monitor(process, Parent),
-        cleanup_monitor(Parent, Ref, Notifiers)
+        cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
     end).
 
 stop_cleanup_monitor(MonPid) ->
     MonPid ! {self(), stop}.
 
-cleanup_monitor(Parent, Ref, Notifiers) ->
+cleanup_monitor(Parent, Ref, Notifiers, ClientSock) ->
+    CheckMSec = chttpd_util:mochiweb_socket_check_msec(),
     receive
         {'DOWN', Ref, _, _, _} ->
             stop_update_notifiers(Notifiers);
@@ -116,6 +117,9 @@ cleanup_monitor(Parent, Ref, Notifiers) ->
             couch_log:error("Unkown message in ~w :: ~w", [?MODULE, Else]),
             stop_update_notifiers(Notifiers),
             exit(Parent, {unknown_message, Else})
+    after CheckMSec ->
+        chttpd_util:stop_client_process_if_disconnected(Parent, ClientSock),
+        cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
     end.
 
 stop_update_notifiers(Notifiers) ->
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 318824814..b1d3e4719 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -43,7 +43,8 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
         replacements = Replacements,
         ring_opts = RingOpts
     },
-    spawn_worker_cleaner(self(), Workers0),
+    ClientSock = chttpd_util:mochiweb_socket_get(),
+    spawn_worker_cleaner(self(), Workers0, ClientSock),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{ready = Workers}} ->
@@ -155,12 +156,12 @@ handle_stream_start(Else, _, _) ->
 % Spawn an auxiliary rexi worker cleaner. This will be used in cases
 % when the coordinator (request) process is forceably killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientSock) ->
     case get(?WORKER_CLEANER) of
         undefined ->
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers)
+                cleaner_loop(Coordinator, Workers, ClientSock)
             end),
             put(?WORKER_CLEANER, Pid),
             Pid;
@@ -168,12 +169,16 @@ spawn_worker_cleaner(Coordinator, Workers) ->
             ExistingCleaner
     end.
 
-cleaner_loop(Pid, Workers) ->
+cleaner_loop(Pid, Workers, ClientSock) ->  % ClientSock may be undefined, making the disconnect check a no-op
+    CheckMSec = chttpd_util:mochiweb_socket_check_msec(),  % re-read (fresh jitter) on every iteration
     receive
         {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers]);
+            cleaner_loop(Pid, [Worker | Workers], ClientSock);
         {'DOWN', _, _, Pid, _} ->
             fabric_util:cleanup(Workers)
+    after CheckMSec ->
+        chttpd_util:stop_client_process_if_disconnected(Pid, ClientSock),  % may kill Pid; its 'DOWN' then cleans up
+        cleaner_loop(Pid, Workers, ClientSock)
     end.
 
 add_worker_to_cleaner(CoordinatorPid, Worker) ->
@@ -186,96 +191,158 @@ add_worker_to_cleaner(CoordinatorPid, Worker) ->
 
 -ifdef(TEST).
 
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
 
 worker_cleaner_test_() ->
     {
         "Fabric spawn_worker_cleaner test",
         {
-            setup,
+            foreach,  % run setup/0 and teardown/1 around every test, so meck state is fresh
             fun setup/0,
             fun teardown/1,
-            fun(_) ->
-                [
-                    should_clean_workers(),
-                    does_not_fire_if_cleanup_called(),
-                    should_clean_additional_worker_too()
-                ]
-            end
+            [
+                ?TDEF_FE(should_clean_workers),
+                ?TDEF_FE(does_not_fire_if_cleanup_called),
+                ?TDEF_FE(should_clean_additional_worker_too),
+                ?TDEF_FE(coordinator_is_killed_if_client_disconnects),
+                ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected)
+            ]
         }
     }.
 
-should_clean_workers() ->
-    ?_test(begin
-        meck:reset(rexi),
-        erase(?WORKER_CLEANER),
-        Workers = [
-            #shard{node = 'n1', ref = make_ref()},
-            #shard{node = 'n2', ref = make_ref()}
-        ],
-        {Coord, _} = spawn_monitor(fun() ->
-            receive
-                die -> ok
-            end
-        end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
-        Coord ! die,
+should_clean_workers(_) ->  % coordinator exit => cleaner calls rexi:kill_all/1 exactly once
+    meck:reset(rexi),
+    erase(?WORKER_CLEANER),
+    Workers = [
+        #shard{node = 'n1', ref = make_ref()},
+        #shard{node = 'n2', ref = make_ref()}
+    ],
+    {Coord, _} = spawn_monitor(fun() ->
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
-        end,
-        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
-    end).
+            die -> ok
+        end
+    end),
+    Cleaner = spawn_worker_cleaner(Coord, Workers, undefined),  % no client socket to watch
+    Ref = erlang:monitor(process, Cleaner),
+    Coord ! die,
+    receive
+        {'DOWN', Ref, _, Cleaner, _} -> ok
+    end,
+    ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)).
 
-does_not_fire_if_cleanup_called() ->
-    ?_test(begin
-        meck:reset(rexi),
-        erase(?WORKER_CLEANER),
-        Workers = [
-            #shard{node = 'n1', ref = make_ref()},
-            #shard{node = 'n2', ref = make_ref()}
-        ],
-        {Coord, _} = spawn_monitor(fun() ->
-            receive
-                die -> ok
-            end
-        end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
-        cleanup(Workers),
-        Coord ! die,
+does_not_fire_if_cleanup_called(_) ->  % after an explicit cleanup/1 the cleaner must not kill workers again
+    meck:reset(rexi),
+    erase(?WORKER_CLEANER),
+    Workers = [
+        #shard{node = 'n1', ref = make_ref()},
+        #shard{node = 'n2', ref = make_ref()}
+    ],
+    {Coord, _} = spawn_monitor(fun() ->
+        receive
+            die -> ok
+        end
+    end),
+    Cleaner = spawn_worker_cleaner(Coord, Workers, undefined),
+    Ref = erlang:monitor(process, Cleaner),
+    cleanup(Workers),
+    Coord ! die,
+    receive
+        {'DOWN', Ref, _, _, _} -> ok
+    end,
+    % The single call comes from cleanup/1 above. If the cleaner process had
+    % fired too, there would have been 2 calls total.
+    ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)).
+
+should_clean_additional_worker_too(_) ->  % cleanup still fires after add_worker_to_cleaner/2 was used
+    meck:reset(rexi),
+    erase(?WORKER_CLEANER),
+    Workers = [
+        #shard{node = 'n1', ref = make_ref()}
+    ],
+    {Coord, _} = spawn_monitor(fun() ->
         receive
-            {'DOWN', Ref, _, _, _} -> ok
-        end,
-        % 2 calls would be from cleanup/1 function. If cleanup process fired
-        % too it would have been 4 calls total.
-        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
-    end).
+            die -> ok
+        end
+    end),
+    Cleaner = spawn_worker_cleaner(Coord, Workers, undefined),
+    add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
+    Ref = erlang:monitor(process, Cleaner),
+    Coord ! die,
+    receive
+        {'DOWN', Ref, _, Cleaner, _} -> ok
+    end,
+    ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)).
 
-should_clean_additional_worker_too() ->
-    ?_test(begin
-        meck:reset(rexi),
-        erase(?WORKER_CLEANER),
-        Workers = [
-            #shard{node = 'n1', ref = make_ref()}
-        ],
-        {Coord, _} = spawn_monitor(fun() ->
+coordinator_is_killed_if_client_disconnects(_) ->  % closed client socket => coordinator killed, then workers cleaned
+    meck:reset(rexi),
+    erase(?WORKER_CLEANER),
+    Workers = [
+        #shard{node = 'n1', ref = make_ref()},
+        #shard{node = 'n2', ref = make_ref()}
+    ],
+    {Coord, CoordRef} = spawn_monitor(fun() ->
+        receive
+            die -> ok
+        end
+    end),
+    {ok, Sock} = gen_tcp:listen(0, [{active, false}]),
+    % Close the socket and then expect coordinator to be killed
+    ok = gen_tcp:close(Sock),
+    Cleaner = spawn_worker_cleaner(Coord, Workers, Sock),
+    CleanerRef = erlang:monitor(process, Cleaner),
+    % Assert the correct behavior on the supported platforms (all except Windows so far)
+    case os:type() of
+        {unix, Type} when Type =:= linux; Type =:= darwin; Type =:= freebsd ->
+            % Coordinator should be torn down
             receive
-                die -> ok
-            end
-        end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
-        Ref = erlang:monitor(process, Cleaner),
-        Coord ! die,
+                {'DOWN', CoordRef, _, _, Reason} ->
+                    ?assertEqual({shutdown, client_disconnected}, Reason)
+            end,
+            % Cleaner process itself should exit
+            receive
+                {'DOWN', CleanerRef, _, _, _} -> ok
+            end,
+            % Workers should have been killed
+            ?assertEqual(1, meck:num_calls(rexi, kill_all, 1));
+        {_, _} = OsType ->
+            ?debugFmt("~n * Client disconnect test not yet supported on ~p~n", [OsType])
+    end.
+
+coordinator_is_not_killed_if_client_is_connected(_) ->  % an open (listening) socket must not trigger the kill
+    meck:reset(rexi),
+    erase(?WORKER_CLEANER),
+    Workers = [
+        #shard{node = 'n1', ref = make_ref()},
+        #shard{node = 'n2', ref = make_ref()}
+    ],
+    {Coord, CoordRef} = spawn_monitor(fun() ->
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
-        end,
-        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
-    end).
+            die -> ok
+        end
+    end),
+    {ok, Sock} = gen_tcp:listen(0, [{active, false}]),
+    Cleaner = spawn_worker_cleaner(Coord, Workers, Sock),
+    CleanerRef = erlang:monitor(process, Cleaner),
+    % Coordinator should stay up
+    receive
+        {'DOWN', CoordRef, _, Coord, _} ->
+            ?assert(false, {unexpected_coordinator_exit, Coord})
+    after 1000 ->
+        ?assert(is_process_alive(Coord))
+    end,
+    % Cleaner process stays up
+    ?assert(is_process_alive(Cleaner)),
+    % Tear everything down at the end of the test
+    gen_tcp:close(Sock),
+    Coord ! die,
+    receive
+        {'DOWN', CleanerRef, _, _, _} -> ok
+    end.
 
 setup() ->
-    ok = meck:expect(rexi, kill_all, fun(_) -> ok end).
+    ok = meck:expect(rexi, kill_all, fun(_) -> ok end),  % stub; tests count calls via meck:num_calls
+    % Make the cleaner check for client disconnects every 200 msec instead of every 30+ sec
+    ok = meck:expect(chttpd_util, mochiweb_socket_check_msec, 0, 200).
 
 teardown(_) ->
     meck:unload().
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index 970a200d2..bf0661a17 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -39,11 +39,12 @@ go(DbName, Feed, Options, Callback, Acc0) when
             {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
             Ref = make_ref(),
             Parent = self(),
+            ClientSock = chttpd_util:mochiweb_socket_get(),
             UpdateListener = {
                 spawn_link(
                     fabric_db_update_listener,
                     go,
-                    [Parent, Ref, DbName, Timeout]
+                    [Parent, Ref, DbName, Timeout, ClientSock]
                 ),
                 Ref
             },

Reply via email to