This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new d5701f437 Stop client process and clean up if client disconnects
d5701f437 is described below
commit d5701f437183c3742aab5812e298d5bb80cb0c15
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Wed Aug 23 16:02:39 2023 -0400
Stop client process and clean up if client disconnects
Previously, when processing long-running streaming requests, such as _find
with a highly selective selector that emits no rows for a while, the client
could disconnect but the request process and all the associated workers on
remote nodes would continue running, consuming server resources.
We already handle remote (RPC) process cleanup if the coordinator crashes;
we just need a way to kill that coordinator if the connection is closed in
the case when no data may be emitted for a long time. This is what the
current commit accomplishes.
It turns out there is no simple way to detect passive mode socket
disconnects in current versions of Erlang/OTP. The socket at the kernel
level may be closed (in close_wait state), however `inet:info/1` will
continue reporting it as `connected`. The only ways to obtain an accurate
connection state are to try to write, read, or query the TCP info state.
Here we attempt to query the state with tcp_info. Unfortunately, not all
supported OTP versions have this option (it probably never became an
officially supported inet socket option), so we use a `raw` socket option
for it. It turns out most CouchDB-supported platforms do have that option;
the only platform where this currently does not work is Windows. In that
case, or on any future unsupported platform, we default to the previous
behavior and assume the socket is still open.
Co-Authored-By: Robert Newson <[email protected]>
---
rel/overlay/etc/default.ini | 8 +
src/chttpd/priv/stats_descriptions.cfg | 4 +
src/chttpd/src/chttpd.erl | 5 +
src/chttpd/src/chttpd_util.erl | 125 ++++++++++++++-
src/chttpd/test/eunit/chttpd_util_test.erl | 89 +++++++++++
src/docs/src/config/http.rst | 22 +++
src/fabric/src/fabric_db_update_listener.erl | 16 +-
src/fabric/src/fabric_streams.erl | 221 +++++++++++++++++----------
src/fabric/src/fabric_view_changes.erl | 3 +-
9 files changed, 408 insertions(+), 85 deletions(-)
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index c3124a643..1c94502b1 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -206,6 +206,14 @@ bind_address = 127.0.0.1
; response header.
;server_header_versions = true
+; How often to check for client disconnects while processing streaming
+; requests such as _all_docs, _find, _changes and views
+;disconnect_check_msec = 30000
+
+; The amount of random jitter to apply to disconnect_check_msec. This avoids a
+; stampede when there are a lot of concurrent clients connecting.
+;disconnect_check_jitter_msec = 15000
+
;[jwt_auth]
; List of claims to validate
; can be the name of a claim like "exp" or a tuple if the claim requires
diff --git a/src/chttpd/priv/stats_descriptions.cfg
b/src/chttpd/priv/stats_descriptions.cfg
index f54231ce3..901d99a26 100644
--- a/src/chttpd/priv/stats_descriptions.cfg
+++ b/src/chttpd/priv/stats_descriptions.cfg
@@ -18,6 +18,10 @@
{type, counter},
{desc, <<"number of aborted requests">>}
]}.
+{[couchdb, httpd, abandoned_streaming_requests], [
+ {type, counter},
+ {desc, <<"number of abandoned streaming requests">>}
+]}.
{[couchdb, dbinfo], [
{type, histogram},
{desc, <<"distribution of latencies for calls to retrieve DB info">>}
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index c8e6fdc97..7f432f8f1 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -320,6 +320,9 @@ handle_request_int(MochiReq) ->
erlang:put(dont_log_request, true),
erlang:put(dont_log_response, true),
+ % Save client socket so that it can be monitored for disconnects
+ chttpd_util:mochiweb_socket_set(MochiReq:get(socket)),
+
{HttpReq2, Response} =
case before_request(HttpReq0) of
{ok, HttpReq1} ->
@@ -328,6 +331,8 @@ handle_request_int(MochiReq) ->
{HttpReq0, Response0}
end,
+ chttpd_util:mochiweb_socket_clean(),
+
{Status, Code, Reason, Resp} = split_response(Response),
HttpResp = #httpd_resp{
diff --git a/src/chttpd/src/chttpd_util.erl b/src/chttpd/src/chttpd_util.erl
index 955beca57..05adef717 100644
--- a/src/chttpd/src/chttpd_util.erl
+++ b/src/chttpd/src/chttpd_util.erl
@@ -22,9 +22,18 @@
get_chttpd_auth_config_integer/2,
get_chttpd_auth_config_boolean/2,
maybe_add_csp_header/3,
- get_db_info/1
+ get_db_info/1,
+ mochiweb_socket_set/1,
+ mochiweb_socket_clean/0,
+ mochiweb_socket_get/0,
+ mochiweb_socket_check_msec/0,
+ stop_client_process_if_disconnected/2
]).
+-define(MOCHIWEB_SOCKET, mochiweb_connection_socket).
+-define(DISCONNECT_CHECK_MSEC, 30000).
+-define(DISCONNECT_CHECK_JITTER_MSEC, 15000).
+
get_chttpd_config(Key) ->
config:get("chttpd", Key, config:get("httpd", Key)).
@@ -110,3 +119,117 @@ get_db_info(DbName) ->
catch
_Tag:Error -> {error, Error}
end.
+
+mochiweb_socket_set(Sock) ->
+ put(?MOCHIWEB_SOCKET, Sock).
+
+mochiweb_socket_clean() ->
+ erase(?MOCHIWEB_SOCKET).
+
+mochiweb_socket_get() ->
+ get(?MOCHIWEB_SOCKET).
+
+mochiweb_socket_check_msec() ->
+ MSec = config:get_integer(
+ "chttpd", "disconnect_check_msec", ?DISCONNECT_CHECK_MSEC
+ ),
+ JitterMSec = config:get_integer(
+ "chttpd", "disconnect_check_jitter_msec", ?DISCONNECT_CHECK_JITTER_MSEC
+ ),
+ max(100, MSec + rand:uniform(max(1, JitterMSec))).
+
+stop_client_process_if_disconnected(Pid, Sock) ->
+ case is_mochiweb_socket_closed(Sock) of
+ true ->
+ exit(Pid, {shutdown, client_disconnected}),
+ couch_stats:increment_counter([couchdb, httpd,
abandoned_streaming_requests]),
+ ok;
+ false ->
+ ok
+ end.
+
+is_mochiweb_socket_closed(undefined) ->
+ false;
+is_mochiweb_socket_closed(Sock) ->
+ OsType = os:type(),
+ case tcp_info_opt(OsType) of
+ {raw, _, _, _} = InfoOpt ->
+ case mochiweb_socket:getopts(Sock, [InfoOpt]) of
+ {ok, [{raw, _, _, <<State:8/native, _/binary>>}]} ->
+ tcp_is_closed(State, OsType);
+ {ok, []} ->
+ false;
+ {error, einval} ->
+ % Already cleaned up
+ true;
+ {error, _} ->
+ false
+ end;
+ undefined ->
+ false
+ end.
+
+% All OS-es have the tcpi_state (uint8) as first member of tcp_info struct
+
+tcp_info_opt({unix, linux}) ->
+ %% netinet/in.h
+ %% IPPROTO_TCP = 6
+ %%
+ %% netinet/tcp.h
+ %% #define TCP_INFO 11
+ %%
+ {raw, 6, 11, 1};
+tcp_info_opt({unix, darwin}) ->
+ %% netinet/in.h
+ %% #define IPPROTO_TCP 6
+ %%
+ %% netinet/tcp.h
+ %% #define TCP_CONNECTION_INFO 0x106
+ %%
+ {raw, 6, 16#106, 1};
+tcp_info_opt({unix, freebsd}) ->
+ %% sys/netinet/in.h
+ %% #define IPPROTO_TCP 6
+ %%
+ %% sys/netinet/tcp.h
+ %% #define TCP_INFO 32
+ %%
+ {raw, 6, 32, 1};
+tcp_info_opt({_, _}) ->
+ undefined.
+
+tcp_is_closed(State, {unix, linux}) ->
+ %% netinet/tcp.h
+ %% enum
+ %% {
+ %% TCP_ESTABLISHED = 1,
+ %% TCP_SYN_SENT,
+ %% TCP_SYN_RECV,
+ %% TCP_FIN_WAIT1,
+ %% TCP_FIN_WAIT2,
+ %% TCP_TIME_WAIT,
+ %% TCP_CLOSE,
+ %% TCP_CLOSE_WAIT,
+ %% TCP_LAST_ACK,
+ %% TCP_LISTEN,
+ %% TCP_CLOSING
+ %% }
+ %%
+ lists:member(State, [4, 5, 6, 7, 8, 9, 11]);
+tcp_is_closed(State, {unix, Type}) when Type =:= darwin; Type =:= freebsd ->
+ %% tcp_fsm.h states are the same on macos and freebsd
+ %%
+ %% netinet/tcp_fsm.h
+ %% #define TCPS_CLOSED 0 /* closed */
+ %% #define TCPS_LISTEN 1 /* listening for connection */
+ %% #define TCPS_SYN_SENT 2 /* active, have sent syn */
+ %% #define TCPS_SYN_RECEIVED 3 /* have send and received syn
*/
+ %% #define TCPS_ESTABLISHED 4 /* established */
+ %% #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for
close */
+ %% #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */
+ %% #define TCPS_CLOSING 7 /* closed xchd FIN; await FIN
ACK */
+ %% #define TCPS_LAST_ACK 8 /* had fin and close; await
FIN ACK */
+ %% #define TCPS_FIN_WAIT_2 9 /* have closed, fin is acked
*/
+ %% #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after
close */
+ %%
+ lists:member(State, [0, 5, 6, 7, 8, 9, 10]).
diff --git a/src/chttpd/test/eunit/chttpd_util_test.erl
b/src/chttpd/test/eunit/chttpd_util_test.erl
index c381dac6e..76edffeac 100644
--- a/src/chttpd/test/eunit/chttpd_util_test.erl
+++ b/src/chttpd/test/eunit/chttpd_util_test.erl
@@ -113,3 +113,92 @@ test_auth_with_undefined_option(_) ->
?assertEqual("", chttpd_util:get_chttpd_auth_config("undefine", "")),
?assert(chttpd_util:get_chttpd_auth_config("undefine", true)),
?assertNot(chttpd_util:get_chttpd_auth_config("undefine", false)).
+
+chttpd_util_client_socker_monitor_test_() ->
+ {
+ setup,
+ fun test_util:start_couch/0,
+ fun test_util:stop_couch/1,
+ with([
+ ?TDEF(t_socket_set_get_clean),
+ ?TDEF(t_socket_check_config),
+ ?TDEF(t_closed_socket_kills_coordinator)
+ ])
+ }.
+
+t_socket_set_get_clean(_) ->
+ ?assertEqual(undefined, chttpd_util:mochiweb_socket_get()),
+ {ok, Sock} = gen_tcp:listen(0, [{active, false}]),
+ chttpd_util:mochiweb_socket_set(Sock),
+ ?assertEqual(Sock, chttpd_util:mochiweb_socket_get()),
+ chttpd_util:mochiweb_socket_clean(),
+ ?assertEqual(undefined, chttpd_util:mochiweb_socket_get()),
+ gen_tcp:close(Sock).
+
+t_socket_check_config(_) ->
+ config:set("chttpd", "disconnect_check_msec", "100", false),
+ config:set("chttpd", "disconnect_check_jitter_msec", "50", false),
+ lists:foreach(
+ fun(_) ->
+ MSec = chttpd_util:mochiweb_socket_check_msec(),
+ ?assert(is_integer(MSec)),
+ ?assert(MSec >= 100),
+ ?assert(MSec =< 150)
+ end,
+ lists:seq(1, 1000)
+ ),
+ config:delete("chttpd", "disconnect_check_msec", false),
+ config:delete("chttpd", "disconnect_check_jitter_msec", false).
+
+t_closed_socket_kills_coordinator(_) ->
+ {Pid, Ref} = spawn_coord(),
+ {ok, Sock} = gen_tcp:listen(0, [{active, false}]),
+
+ % Can call getopts many times in a row; the process should stay alive
+ lists:foreach(
+ fun(_) ->
+ ok = chttpd_util:stop_client_process_if_disconnected(Pid, Sock)
+ end,
+ lists:seq(1, 10000)
+ ),
+ ?assert(is_process_alive(Pid)),
+
+ gen_tcp:close(Sock),
+
+ ?assertEqual(ok, chttpd_util:stop_client_process_if_disconnected(Pid,
Sock)),
+ case tcp_info_works() of
+ true ->
+ ?assertEqual({shutdown, client_disconnected},
wait_coord_death(Ref));
+ false ->
+ ?assert(is_process_alive(Pid)),
+ % Kill it manually
+ exit(Pid, kill)
+ end,
+
+ % Can call stop_client_... even if process may be dead and the socket is
closed
+ lists:foreach(
+ fun(_) ->
+ ok = chttpd_util:stop_client_process_if_disconnected(Pid, Sock)
+ end,
+ lists:seq(1, 10000)
+ ).
+
+spawn_coord() ->
+ spawn_monitor(fun() ->
+ receive
+ die -> ok
+ end
+ end).
+
+wait_coord_death(Ref) ->
+ receive
+ {'DOWN', Ref, _, _, Reason} -> Reason
+ end.
+
+tcp_info_works() ->
+ case os:type() of
+ {unix, OsName} ->
+ lists:member(OsName, [linux, freebsd, darwin]);
+ {_, _} ->
+ false
+ end.
diff --git a/src/docs/src/config/http.rst b/src/docs/src/config/http.rst
index cbe36c4e3..334ce50b5 100644
--- a/src/docs/src/config/http.rst
+++ b/src/docs/src/config/http.rst
@@ -266,6 +266,28 @@ HTTP Server Options
[chttpd]
server_header_versions = true
+ .. config:option:: disconnect_check_msec :: Client disconnection check
interval
+
+ .. versionadded:: 3.4
+
+ How often, in milliseconds, to check for client disconnects while
+ processing streaming requests such as _all_docs, _find, _changes and
+ views. ::
+
+ [chttpd]
+ disconnect_check_msec = 30000
+
+ .. config:option:: disconnect_check_jitter_msec :: Client disconnection
check jitter
+
+ .. versionadded:: 3.4
+
+ How much random jitter to apply to the ``disconnect_check_msec``
+ period. This is to avoid a stampede in case of a large number of
+ concurrent clients. ::
+
+ [chttpd]
+ disconnect_check_jitter_msec = 15000
+
.. config:section:: httpd :: HTTP Server Options
.. versionchanged:: 3.2 These options were moved to [chttpd] section:
diff --git a/src/fabric/src/fabric_db_update_listener.erl
b/src/fabric/src/fabric_db_update_listener.erl
index 78ccf5a4d..d4a49f37d 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -12,7 +12,7 @@
-module(fabric_db_update_listener).
--export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
+-export([go/5, start_update_notifier/1, stop/1, wait_db_updated/1]).
-export([handle_db_event/3]).
-include_lib("fabric/include/fabric.hrl").
@@ -36,12 +36,12 @@
shards
}).
-go(Parent, ParentRef, DbName, Timeout) ->
+go(Parent, ParentRef, DbName, Timeout, ClientSock) ->
Shards = mem3:shards(DbName),
Notifiers = start_update_notifiers(Shards),
MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <-
Notifiers]),
RexiMon = rexi_monitor:start(MonRefs),
- MonPid = start_cleanup_monitor(self(), Notifiers),
+ MonPid = start_cleanup_monitor(self(), Notifiers, ClientSock),
%% This is not a common pattern for rexi but to enable the calling
%% process to communicate via handle_message/3 we "fake" it as a
%% a spawned worker.
@@ -97,16 +97,17 @@ handle_db_event(_DbName, deleted, St) ->
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
-start_cleanup_monitor(Parent, Notifiers) ->
+start_cleanup_monitor(Parent, Notifiers, ClientSock) ->
spawn(fun() ->
Ref = erlang:monitor(process, Parent),
- cleanup_monitor(Parent, Ref, Notifiers)
+ cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
end).
stop_cleanup_monitor(MonPid) ->
MonPid ! {self(), stop}.
-cleanup_monitor(Parent, Ref, Notifiers) ->
+cleanup_monitor(Parent, Ref, Notifiers, ClientSock) ->
+ CheckMSec = chttpd_util:mochiweb_socket_check_msec(),
receive
{'DOWN', Ref, _, _, _} ->
stop_update_notifiers(Notifiers);
@@ -116,6 +117,9 @@ cleanup_monitor(Parent, Ref, Notifiers) ->
couch_log:error("Unkown message in ~w :: ~w", [?MODULE, Else]),
stop_update_notifiers(Notifiers),
exit(Parent, {unknown_message, Else})
+ after CheckMSec ->
+ chttpd_util:stop_client_process_if_disconnected(Parent, ClientSock),
+ cleanup_monitor(Parent, Ref, Notifiers, ClientSock)
end.
stop_update_notifiers(Notifiers) ->
diff --git a/src/fabric/src/fabric_streams.erl
b/src/fabric/src/fabric_streams.erl
index 318824814..b1d3e4719 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -43,7 +43,8 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
replacements = Replacements,
ring_opts = RingOpts
},
- spawn_worker_cleaner(self(), Workers0),
+ ClientSock = chttpd_util:mochiweb_socket_get(),
+ spawn_worker_cleaner(self(), Workers0, ClientSock),
Timeout = fabric_util:request_timeout(),
case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
{ok, #stream_acc{ready = Workers}} ->
@@ -155,12 +156,12 @@ handle_stream_start(Else, _, _) ->
% Spawn an auxiliary rexi worker cleaner. This will be used in cases
% when the coordinator (request) process is forceably killed and doesn't
% get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientSock) ->
case get(?WORKER_CLEANER) of
undefined ->
Pid = spawn(fun() ->
erlang:monitor(process, Coordinator),
- cleaner_loop(Coordinator, Workers)
+ cleaner_loop(Coordinator, Workers, ClientSock)
end),
put(?WORKER_CLEANER, Pid),
Pid;
@@ -168,12 +169,16 @@ spawn_worker_cleaner(Coordinator, Workers) ->
ExistingCleaner
end.
-cleaner_loop(Pid, Workers) ->
+cleaner_loop(Pid, Workers, ClientSock) ->
+ CheckMSec = chttpd_util:mochiweb_socket_check_msec(),
receive
{add_worker, Pid, Worker} ->
- cleaner_loop(Pid, [Worker | Workers]);
+ cleaner_loop(Pid, [Worker | Workers], ClientSock);
{'DOWN', _, _, Pid, _} ->
fabric_util:cleanup(Workers)
+ after CheckMSec ->
+ chttpd_util:stop_client_process_if_disconnected(Pid, ClientSock),
+ cleaner_loop(Pid, Workers, ClientSock)
end.
add_worker_to_cleaner(CoordinatorPid, Worker) ->
@@ -186,96 +191,158 @@ add_worker_to_cleaner(CoordinatorPid, Worker) ->
-ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
worker_cleaner_test_() ->
{
"Fabric spawn_worker_cleaner test",
{
- setup,
+ foreach,
fun setup/0,
fun teardown/1,
- fun(_) ->
- [
- should_clean_workers(),
- does_not_fire_if_cleanup_called(),
- should_clean_additional_worker_too()
- ]
- end
+ [
+ ?TDEF_FE(should_clean_workers),
+ ?TDEF_FE(does_not_fire_if_cleanup_called),
+ ?TDEF_FE(should_clean_additional_worker_too),
+ ?TDEF_FE(coordinator_is_killed_if_client_disconnects),
+ ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected)
+ ]
}
}.
-should_clean_workers() ->
- ?_test(begin
- meck:reset(rexi),
- erase(?WORKER_CLEANER),
- Workers = [
- #shard{node = 'n1', ref = make_ref()},
- #shard{node = 'n2', ref = make_ref()}
- ],
- {Coord, _} = spawn_monitor(fun() ->
- receive
- die -> ok
- end
- end),
- Cleaner = spawn_worker_cleaner(Coord, Workers),
- Ref = erlang:monitor(process, Cleaner),
- Coord ! die,
+should_clean_workers(_) ->
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()},
+ #shard{node = 'n2', ref = make_ref()}
+ ],
+ {Coord, _} = spawn_monitor(fun() ->
receive
- {'DOWN', Ref, _, Cleaner, _} -> ok
- end,
- ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
- end).
+ die -> ok
+ end
+ end),
+ Cleaner = spawn_worker_cleaner(Coord, Workers, undefined),
+ Ref = erlang:monitor(process, Cleaner),
+ Coord ! die,
+ receive
+ {'DOWN', Ref, _, Cleaner, _} -> ok
+ end,
+ ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)).
-does_not_fire_if_cleanup_called() ->
- ?_test(begin
- meck:reset(rexi),
- erase(?WORKER_CLEANER),
- Workers = [
- #shard{node = 'n1', ref = make_ref()},
- #shard{node = 'n2', ref = make_ref()}
- ],
- {Coord, _} = spawn_monitor(fun() ->
- receive
- die -> ok
- end
- end),
- Cleaner = spawn_worker_cleaner(Coord, Workers),
- Ref = erlang:monitor(process, Cleaner),
- cleanup(Workers),
- Coord ! die,
+does_not_fire_if_cleanup_called(_) ->
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()},
+ #shard{node = 'n2', ref = make_ref()}
+ ],
+ {Coord, _} = spawn_monitor(fun() ->
+ receive
+ die -> ok
+ end
+ end),
+ Cleaner = spawn_worker_cleaner(Coord, Workers, undefined),
+ Ref = erlang:monitor(process, Cleaner),
+ cleanup(Workers),
+ Coord ! die,
+ receive
+ {'DOWN', Ref, _, _, _} -> ok
+ end,
+ % 2 calls would be from cleanup/1 function. If cleanup process fired
+ % too it would have been 4 calls total.
+ ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)).
+
+should_clean_additional_worker_too(_) ->
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()}
+ ],
+ {Coord, _} = spawn_monitor(fun() ->
receive
- {'DOWN', Ref, _, _, _} -> ok
- end,
- % 2 calls would be from cleanup/1 function. If cleanup process fired
- % too it would have been 4 calls total.
- ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
- end).
+ die -> ok
+ end
+ end),
+ Cleaner = spawn_worker_cleaner(Coord, Workers, undefined),
+ add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
+ Ref = erlang:monitor(process, Cleaner),
+ Coord ! die,
+ receive
+ {'DOWN', Ref, _, Cleaner, _} -> ok
+ end,
+ ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)).
-should_clean_additional_worker_too() ->
- ?_test(begin
- meck:reset(rexi),
- erase(?WORKER_CLEANER),
- Workers = [
- #shard{node = 'n1', ref = make_ref()}
- ],
- {Coord, _} = spawn_monitor(fun() ->
+coordinator_is_killed_if_client_disconnects(_) ->
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()},
+ #shard{node = 'n2', ref = make_ref()}
+ ],
+ {Coord, CoordRef} = spawn_monitor(fun() ->
+ receive
+ die -> ok
+ end
+ end),
+ {ok, Sock} = gen_tcp:listen(0, [{active, false}]),
+ % Close the socket and then expect coordinator to be killed
+ ok = gen_tcp:close(Sock),
+ Cleaner = spawn_worker_cleaner(Coord, Workers, Sock),
+ CleanerRef = erlang:monitor(process, Cleaner),
+ % Assert the correct behavior on the supported platforms (all except Windows
so far)
+ case os:type() of
+ {unix, Type} when Type =:= linux; Type =:= darwin; Type =:= freebsd ->
+ % Coordinator should be torn down
receive
- die -> ok
- end
- end),
- Cleaner = spawn_worker_cleaner(Coord, Workers),
- add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
- Ref = erlang:monitor(process, Cleaner),
- Coord ! die,
+ {'DOWN', CoordRef, _, _, Reason} ->
+ ?assertEqual({shutdown, client_disconnected}, Reason)
+ end,
+ % Cleaner process itself should exit
+ receive
+ {'DOWN', CleanerRef, _, _, _} -> ok
+ end,
+ % Workers should have been killed
+ ?assertEqual(1, meck:num_calls(rexi, kill_all, 1));
+ {_, _} = OsType ->
+ ?debugFmt("~n * Client disconnect test not yet supported on ~p~n",
[OsType])
+ end.
+
+coordinator_is_not_killed_if_client_is_connected(_) ->
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()},
+ #shard{node = 'n2', ref = make_ref()}
+ ],
+ {Coord, CoordRef} = spawn_monitor(fun() ->
receive
- {'DOWN', Ref, _, Cleaner, _} -> ok
- end,
- ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
- end).
+ die -> ok
+ end
+ end),
+ {ok, Sock} = gen_tcp:listen(0, [{active, false}]),
+ Cleaner = spawn_worker_cleaner(Coord, Workers, Sock),
+ CleanerRef = erlang:monitor(process, Cleaner),
+ % Coordinator should stay up
+ receive
+ {'DOWN', CoordRef, _, Coord, _} ->
+ ?assert(false, {unexpected_coordinator_exit, Coord})
+ after 1000 ->
+ ?assert(is_process_alive(Coord))
+ end,
+ % Cleaner process stays up
+ ?assert(is_process_alive(Cleaner)),
+ % Tear everything down at the end of the test
+ gen_tcp:close(Sock),
+ Coord ! die,
+ receive
+ {'DOWN', CleanerRef, _, _, _} -> ok
+ end.
setup() ->
- ok = meck:expect(rexi, kill_all, fun(_) -> ok end).
+ ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
+ % Speed up disconnect socket timeout for the test to 200 msec
+ ok = meck:expect(chttpd_util, mochiweb_socket_check_msec, 0, 200).
teardown(_) ->
meck:unload().
diff --git a/src/fabric/src/fabric_view_changes.erl
b/src/fabric/src/fabric_view_changes.erl
index 970a200d2..bf0661a17 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -39,11 +39,12 @@ go(DbName, Feed, Options, Callback, Acc0) when
{Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
Ref = make_ref(),
Parent = self(),
+ ClientSock = chttpd_util:mochiweb_socket_get(),
UpdateListener = {
spawn_link(
fabric_db_update_listener,
go,
- [Parent, Ref, DbName, Timeout]
+ [Parent, Ref, DbName, Timeout, ClientSock]
),
Ref
},