This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch monitor-client-socket-for-disconnections in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a39066724d39fc041ab6bc731382a1a4de8c34e1 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Wed Aug 23 16:02:39 2023 -0400 Stop client process and clean up if client disconnects Previously, when processing long running streaming requests, such as _find with a highly selective selector, when no rows are emitted for a while, the client could disconnect but the request process, and all the associated workers on remote nodes, would continue running, consuming server resources. We already handle remote (RPC) process cleanup if the coordinator crashes; we just need a way to kill that coordinator if the connection is closed in the case when no data may be emitted for a long time. This is what the current commit accomplishes. It turns out there is no simple way to detect passive mode socket disconnects in current versions of Erlang/OTP. The socket at the kernel level may be closed (in close_wait state), however `inet:info/1` will continue reporting it as `connected`. The only way to obtain an accurate connection state is to try to write, read, or query the TCP info state. Here we attempt to query the state with tcp_info. Unfortunately, not all versions of the supported OTP platforms have this option, which is probably why it never became an officially supported inet socket option, so we use a `raw` socket option for it. However, it turns out most of CouchDB's supported platforms do have that option. The only platform on which this currently does not work is Windows. In that case, or on any future platform without support, we default to the previous behavior, assuming the socket is still open. 
Co-Authored-By: Robert Newson <[email protected]> --- rel/overlay/etc/default.ini | 4 + src/chttpd/priv/stats_descriptions.cfg | 4 + src/chttpd/src/chttpd.erl | 5 + src/chttpd/src/chttpd_util.erl | 120 ++++++++++++++- src/chttpd/test/eunit/chttpd_util_test.erl | 87 +++++++++++ src/docs/src/config/http.rst | 11 ++ src/fabric/src/fabric_db_update_listener.erl | 16 +- src/fabric/src/fabric_streams.erl | 221 +++++++++++++++++---------- src/fabric/src/fabric_view_changes.erl | 3 +- 9 files changed, 386 insertions(+), 85 deletions(-) diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index c3124a643..396244754 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -206,6 +206,10 @@ bind_address = 127.0.0.1 ; response header. ;server_header_versions = true +; How often to check for client disconnects while processing streaming +; requests such as _all_docs, _find, _changes and views +;disconnect_check_msec = 30000 + ;[jwt_auth] ; List of claims to validate ; can be the name of a claim like "exp" or a tuple if the claim requires diff --git a/src/chttpd/priv/stats_descriptions.cfg b/src/chttpd/priv/stats_descriptions.cfg index f54231ce3..901d99a26 100644 --- a/src/chttpd/priv/stats_descriptions.cfg +++ b/src/chttpd/priv/stats_descriptions.cfg @@ -18,6 +18,10 @@ {type, counter}, {desc, <<"number of aborted requests">>} ]}. +{[couchdb, httpd, abandoned_streaming_requests], [ + {type, counter}, + {desc, <<"number of abandoned streaming requests">>} +]}. 
{[couchdb, dbinfo], [ {type, histogram}, {desc, <<"distribution of latencies for calls to retrieve DB info">>} diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index c8e6fdc97..7f432f8f1 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -320,6 +320,9 @@ handle_request_int(MochiReq) -> erlang:put(dont_log_request, true), erlang:put(dont_log_response, true), + % Save client socket so that it can be monitored for disconnects + chttpd_util:mochiweb_socket_set(MochiReq:get(socket)), + {HttpReq2, Response} = case before_request(HttpReq0) of {ok, HttpReq1} -> @@ -328,6 +331,8 @@ handle_request_int(MochiReq) -> {HttpReq0, Response0} end, + chttpd_util:mochiweb_socket_clean(), + {Status, Code, Reason, Resp} = split_response(Response), HttpResp = #httpd_resp{ diff --git a/src/chttpd/src/chttpd_util.erl b/src/chttpd/src/chttpd_util.erl index 955beca57..7fbe436d7 100644 --- a/src/chttpd/src/chttpd_util.erl +++ b/src/chttpd/src/chttpd_util.erl @@ -22,9 +22,17 @@ get_chttpd_auth_config_integer/2, get_chttpd_auth_config_boolean/2, maybe_add_csp_header/3, - get_db_info/1 + get_db_info/1, + mochiweb_socket_set/1, + mochiweb_socket_clean/0, + mochiweb_socket_get/0, + mochiweb_socket_check_msec/0, + stop_client_process_if_disconnected/2 ]). +-define(MOCHIWEB_SOCKET, mochiweb_connection_socket). +-define(DISCONNECT_CHECK_MSEC, 30000). + get_chttpd_config(Key) -> config:get("chttpd", Key, config:get("httpd", Key)). @@ -110,3 +118,113 @@ get_db_info(DbName) -> catch _Tag:Error -> {error, Error} end. + +mochiweb_socket_set(Sock) -> + put(?MOCHIWEB_SOCKET, Sock). + +mochiweb_socket_clean() -> + erase(?MOCHIWEB_SOCKET). + +mochiweb_socket_get() -> + get(?MOCHIWEB_SOCKET). + +mochiweb_socket_check_msec() -> + MSec = config:get_integer("chttpd", "disconnect_check_msec", ?DISCONNECT_CHECK_MSEC), + % Add jitter to avoid a stampede in case of a larger number of concurrent connections + MSec + rand:uniform(MSec). 
+ +stop_client_process_if_disconnected(Pid, Sock) -> + case is_mochiweb_socket_closed(Sock) of + true -> + exit(Pid, {shutdown, client_disconnected}), + couch_stats:increment_counter([couchdb, httpd, abandoned_streaming_requests]), + ok; + false -> + ok + end. + +is_mochiweb_socket_closed(undefined) -> + false; +is_mochiweb_socket_closed(Sock) -> + OsType = os:type(), + case tcp_info_opt(OsType) of + {raw, _, _, _} = InfoOpt -> + case mochiweb_socket:getopts(Sock, [InfoOpt]) of + {ok, [{raw, _, _, <<State:8/native, _/binary>>}]} -> + tcp_is_closed(State, OsType); + {ok, []} -> + false; + {error, einval} -> + % Already cleaned up + true; + {error, _} -> + false + end; + undefined -> + false + end. + +% All OS-es have the tcpi_state (uint8) as first member of tcp_info struct + +tcp_info_opt({unix, linux}) -> + %% netinet/in.h + %% IPPROTO_TCP = 6 + %% + %% netinet/tcp.h + %% #define TCP_INFO 11 + %% + {raw, 6, 11, 1}; +tcp_info_opt({unix, darwin}) -> + %% netinet/in.h + %% #define IPPROTO_TCP 6 + %% + %% netinet/tcp.h + %% #define TCP_CONNECTION_INFO 0x106 + %% + {raw, 6, 16#106, 1}; +tcp_info_opt({unix, freebsd}) -> + %% sys/netinet/in.h + %% #define IPPROTO_TCP 6 + %% + %% sys/netinet/tcp.h + %% #define TCP_INFO 32 + %% + {raw, 6, 32, 1}; +tcp_info_opt({_, _}) -> + undefined. 
+ +tcp_is_closed(State, {unix, linux}) -> + %% netinet/tcp.h + %% enum + %% { + %% TCP_ESTABLISHED = 1, + %% TCP_SYN_SENT, + %% TCP_SYN_RECV, + %% TCP_FIN_WAIT1, + %% TCP_FIN_WAIT2, + %% TCP_TIME_WAIT, + %% TCP_CLOSE, + %% TCP_CLOSE_WAIT, + %% TCP_LAST_ACK, + %% TCP_LISTEN, + %% TCP_CLOSING + %% } + %% + lists:member(State, [4, 5, 6, 7, 8, 9, 11]); +tcp_is_closed(State, {unix, Type}) when Type =:= darwin; Type =:= freebsd -> + %% tcp_fsm.h states are the same on macos and freebsd + %% + %% netinet/tcp_fsm.h + %% #define TCPS_CLOSED 0 /* closed */ + %% #define TCPS_LISTEN 1 /* listening for connection */ + %% #define TCPS_SYN_SENT 2 /* active, have sent syn */ + %% #define TCPS_SYN_RECEIVED 3 /* have send and received syn */ + %% #define TCPS_ESTABLISHED 4 /* established */ + %% #define TCPS_CLOSE_WAIT 5 /* rcvd fin, waiting for close */ + %% #define TCPS_FIN_WAIT_1 6 /* have closed, sent fin */ + %% #define TCPS_CLOSING 7 /* closed xchd FIN; await FIN ACK */ + %% #define TCPS_LAST_ACK 8 /* had fin and close; await FIN ACK */ + %% #define TCPS_FIN_WAIT_2 9 /* have closed, fin is acked */ + %% #define TCPS_TIME_WAIT 10 /* in 2*msl quiet wait after close */ + %% + lists:member(State, [0, 5, 6, 7, 8, 9, 10]). diff --git a/src/chttpd/test/eunit/chttpd_util_test.erl b/src/chttpd/test/eunit/chttpd_util_test.erl index c381dac6e..06b591d6d 100644 --- a/src/chttpd/test/eunit/chttpd_util_test.erl +++ b/src/chttpd/test/eunit/chttpd_util_test.erl @@ -113,3 +113,90 @@ test_auth_with_undefined_option(_) -> ?assertEqual("", chttpd_util:get_chttpd_auth_config("undefine", "")), ?assert(chttpd_util:get_chttpd_auth_config("undefine", true)), ?assertNot(chttpd_util:get_chttpd_auth_config("undefine", false)). + +chttpd_util_client_socker_monitor_test_() -> + { + setup, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + with([ + ?TDEF(t_socket_set_get_clean), + ?TDEF(t_socket_check_config), + ?TDEF(t_closed_socket_kills_coordinator) + ]) + }. 
+ +t_socket_set_get_clean(_) -> + ?assertEqual(undefined, chttpd_util:mochiweb_socket_get()), + {ok, Sock} = gen_tcp:listen(0, [{active, false}]), + chttpd_util:mochiweb_socket_set(Sock), + ?assertEqual(Sock, chttpd_util:mochiweb_socket_get()), + chttpd_util:mochiweb_socket_clean(), + ?assertEqual(undefined, chttpd_util:mochiweb_socket_get()), + gen_tcp:close(Sock). + +t_socket_check_config(_) -> + config:set("chttpd", "disconnect_check_msec", "100", false), + lists:foreach( + fun(_) -> + MSec = chttpd_util:mochiweb_socket_check_msec(), + ?assert(is_integer(MSec)), + ?assert(MSec >= 100), + ?assert(MSec =< 200) + end, + lists:seq(1, 100) + ), + config:delete("chttpd", "disconnect_check_msec", false). + +t_closed_socket_kills_coordinator(_) -> + {Pid, Ref} = spawn_coord(), + {ok, Sock} = gen_tcp:listen(0, [{active, false}]), + + % Can call getopts many times in a row process should stay alive + lists:foreach( + fun(_) -> + ok = chttpd_util:stop_client_process_if_disconnected(Pid, Sock) + end, + lists:seq(1, 10000) + ), + ?assert(is_process_alive(Pid)), + + gen_tcp:close(Sock), + + ?assertEqual(ok, chttpd_util:stop_client_process_if_disconnected(Pid, Sock)), + case tcp_info_works() of + true -> + ?assertEqual({shutdown, client_disconnected}, wait_coord_death(Ref)); + false -> + ?assert(is_process_alive(Pid)), + % Kill it manually + exit(Pid, kill) + end, + + % Can call stop_client_... even if process may be dead and the socket is closed + lists:foreach( + fun(_) -> + ok = chttpd_util:stop_client_process_if_disconnected(Pid, Sock) + end, + lists:seq(1, 10000) + ). + +spawn_coord() -> + spawn_monitor(fun() -> + receive + die -> ok + end + end). + +wait_coord_death(Ref) -> + receive + {'DOWN', Ref, _, _, Reason} -> Reason + end. + +tcp_info_works() -> + case os:type() of + {unix, OsName} -> + lists:member(OsName, [linux, freebsd, darwin]); + {_, _} -> + false + end. 
diff --git a/src/docs/src/config/http.rst b/src/docs/src/config/http.rst index cbe36c4e3..bfba6b75d 100644 --- a/src/docs/src/config/http.rst +++ b/src/docs/src/config/http.rst @@ -266,6 +266,17 @@ HTTP Server Options [chttpd] server_header_versions = true + .. config:option:: disconnect_check_msec :: Client disconnection check interval + + .. versionadded:: 3.4 + + How often, in milliseconds, to check for client disconnects while + processing streaming requests such as _all_docs, _find, _changes and + views. :: + + [chttpd] + disconnect_check_msec = 30000 + .. config:section:: httpd :: HTTP Server Options .. versionchanged:: 3.2 These options were moved to [chttpd] section: diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl index 78ccf5a4d..d4a49f37d 100644 --- a/src/fabric/src/fabric_db_update_listener.erl +++ b/src/fabric/src/fabric_db_update_listener.erl @@ -12,7 +12,7 @@ -module(fabric_db_update_listener). --export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]). +-export([go/5, start_update_notifier/1, stop/1, wait_db_updated/1]). -export([handle_db_event/3]). -include_lib("fabric/include/fabric.hrl"). @@ -36,12 +36,12 @@ shards }). -go(Parent, ParentRef, DbName, Timeout) -> +go(Parent, ParentRef, DbName, Timeout, ClientSock) -> Shards = mem3:shards(DbName), Notifiers = start_update_notifiers(Shards), MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]), RexiMon = rexi_monitor:start(MonRefs), - MonPid = start_cleanup_monitor(self(), Notifiers), + MonPid = start_cleanup_monitor(self(), Notifiers, ClientSock), %% This is not a common pattern for rexi but to enable the calling %% process to communicate via handle_message/3 we "fake" it as a %% a spawned worker. @@ -97,16 +97,17 @@ handle_db_event(_DbName, deleted, St) -> handle_db_event(_DbName, _Event, St) -> {ok, St}. 
-start_cleanup_monitor(Parent, Notifiers) -> +start_cleanup_monitor(Parent, Notifiers, ClientSock) -> spawn(fun() -> Ref = erlang:monitor(process, Parent), - cleanup_monitor(Parent, Ref, Notifiers) + cleanup_monitor(Parent, Ref, Notifiers, ClientSock) end). stop_cleanup_monitor(MonPid) -> MonPid ! {self(), stop}. -cleanup_monitor(Parent, Ref, Notifiers) -> +cleanup_monitor(Parent, Ref, Notifiers, ClientSock) -> + CheckMSec = chttpd_util:mochiweb_socket_check_msec(), receive {'DOWN', Ref, _, _, _} -> stop_update_notifiers(Notifiers); @@ -116,6 +117,9 @@ cleanup_monitor(Parent, Ref, Notifiers) -> couch_log:error("Unkown message in ~w :: ~w", [?MODULE, Else]), stop_update_notifiers(Notifiers), exit(Parent, {unknown_message, Else}) + after CheckMSec -> + chttpd_util:stop_client_process_if_disconnected(Parent, ClientSock), + cleanup_monitor(Parent, Ref, Notifiers, ClientSock) end. stop_update_notifiers(Notifiers) -> diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index 318824814..b1d3e4719 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -43,7 +43,8 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) -> replacements = Replacements, ring_opts = RingOpts }, - spawn_worker_cleaner(self(), Workers0), + ClientSock = chttpd_util:mochiweb_socket_get(), + spawn_worker_cleaner(self(), Workers0, ClientSock), Timeout = fabric_util:request_timeout(), case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of {ok, #stream_acc{ready = Workers}} -> @@ -155,12 +156,12 @@ handle_stream_start(Else, _, _) -> % Spawn an auxiliary rexi worker cleaner. This will be used in cases % when the coordinator (request) process is forceably killed and doesn't % get a chance to process its `after` fabric:clean/1 clause. 
-spawn_worker_cleaner(Coordinator, Workers) -> +spawn_worker_cleaner(Coordinator, Workers, ClientSock) -> case get(?WORKER_CLEANER) of undefined -> Pid = spawn(fun() -> erlang:monitor(process, Coordinator), - cleaner_loop(Coordinator, Workers) + cleaner_loop(Coordinator, Workers, ClientSock) end), put(?WORKER_CLEANER, Pid), Pid; @@ -168,12 +169,16 @@ spawn_worker_cleaner(Coordinator, Workers) -> ExistingCleaner end. -cleaner_loop(Pid, Workers) -> +cleaner_loop(Pid, Workers, ClientSock) -> + CheckMSec = chttpd_util:mochiweb_socket_check_msec(), receive {add_worker, Pid, Worker} -> - cleaner_loop(Pid, [Worker | Workers]); + cleaner_loop(Pid, [Worker | Workers], ClientSock); {'DOWN', _, _, Pid, _} -> fabric_util:cleanup(Workers) + after CheckMSec -> + chttpd_util:stop_client_process_if_disconnected(Pid, ClientSock), + cleaner_loop(Pid, Workers, ClientSock) end. add_worker_to_cleaner(CoordinatorPid, Worker) -> @@ -186,96 +191,158 @@ add_worker_to_cleaner(CoordinatorPid, Worker) -> -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). worker_cleaner_test_() -> { "Fabric spawn_worker_cleaner test", { - setup, + foreach, fun setup/0, fun teardown/1, - fun(_) -> - [ - should_clean_workers(), - does_not_fire_if_cleanup_called(), - should_clean_additional_worker_too() - ] - end + [ + ?TDEF_FE(should_clean_workers), + ?TDEF_FE(does_not_fire_if_cleanup_called), + ?TDEF_FE(should_clean_additional_worker_too), + ?TDEF_FE(coordinator_is_killed_if_client_disconnects), + ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected) + ] } }. -should_clean_workers() -> - ?_test(begin - meck:reset(rexi), - erase(?WORKER_CLEANER), - Workers = [ - #shard{node = 'n1', ref = make_ref()}, - #shard{node = 'n2', ref = make_ref()} - ], - {Coord, _} = spawn_monitor(fun() -> - receive - die -> ok - end - end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - Ref = erlang:monitor(process, Cleaner), - Coord ! 
die, +should_clean_workers(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive - {'DOWN', Ref, _, Cleaner, _} -> ok - end, - ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) - end). + die -> ok + end + end), + Cleaner = spawn_worker_cleaner(Coord, Workers, undefined), + Ref = erlang:monitor(process, Cleaner), + Coord ! die, + receive + {'DOWN', Ref, _, Cleaner, _} -> ok + end, + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)). -does_not_fire_if_cleanup_called() -> - ?_test(begin - meck:reset(rexi), - erase(?WORKER_CLEANER), - Workers = [ - #shard{node = 'n1', ref = make_ref()}, - #shard{node = 'n2', ref = make_ref()} - ], - {Coord, _} = spawn_monitor(fun() -> - receive - die -> ok - end - end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - Ref = erlang:monitor(process, Cleaner), - cleanup(Workers), - Coord ! die, +does_not_fire_if_cleanup_called(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> + receive + die -> ok + end + end), + Cleaner = spawn_worker_cleaner(Coord, Workers, undefined), + Ref = erlang:monitor(process, Cleaner), + cleanup(Workers), + Coord ! die, + receive + {'DOWN', Ref, _, _, _} -> ok + end, + % 2 calls would be from cleanup/1 function. If cleanup process fired + % too it would have been 4 calls total. + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)). + +should_clean_additional_worker_too(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive - {'DOWN', Ref, _, _, _} -> ok - end, - % 2 calls would be from cleanup/1 function. If cleanup process fired - % too it would have been 4 calls total. 
- ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) - end). + die -> ok + end + end), + Cleaner = spawn_worker_cleaner(Coord, Workers, undefined), + add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}), + Ref = erlang:monitor(process, Cleaner), + Coord ! die, + receive + {'DOWN', Ref, _, Cleaner, _} -> ok + end, + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)). -should_clean_additional_worker_too() -> - ?_test(begin - meck:reset(rexi), - erase(?WORKER_CLEANER), - Workers = [ - #shard{node = 'n1', ref = make_ref()} - ], - {Coord, _} = spawn_monitor(fun() -> +coordinator_is_killed_if_client_disconnects(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, CoordRef} = spawn_monitor(fun() -> + receive + die -> ok + end + end), + {ok, Sock} = gen_tcp:listen(0, [{active, false}]), + % Close the socket and then expect coordinator to be killed + ok = gen_tcp:close(Sock), + Cleaner = spawn_worker_cleaner(Coord, Workers, Sock), + CleanerRef = erlang:monitor(process, Cleaner), + % Assert the correct behavior on the support platforms (all except Windows so far) + case os:type() of + {unix, Type} when Type =:= linux; Type =:= darwin; Type =:= freebsd -> + % Coordinator should be torn down receive - die -> ok - end - end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}), - Ref = erlang:monitor(process, Cleaner), - Coord ! die, + {'DOWN', CoordRef, _, _, Reason} -> + ?assertEqual({shutdown, client_disconnected}, Reason) + end, + % Cleaner process itself should exit + receive + {'DOWN', CleanerRef, _, _, _} -> ok + end, + % Workers should have been killed + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)); + {_, _} = OsType -> + ?debugFmt("~n * Client disconnect test not yet supported on ~p~n", [OsType]) + end. 
+ +coordinator_is_not_killed_if_client_is_connected(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, CoordRef} = spawn_monitor(fun() -> receive - {'DOWN', Ref, _, Cleaner, _} -> ok - end, - ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) - end). + die -> ok + end + end), + {ok, Sock} = gen_tcp:listen(0, [{active, false}]), + Cleaner = spawn_worker_cleaner(Coord, Workers, Sock), + CleanerRef = erlang:monitor(process, Cleaner), + % Coordinator should stay up + receive + {'DOWN', CoordRef, _, Coord, _} -> + ?assert(false, {unexpected_coordinator_exit, Coord}) + after 1000 -> + ?assert(is_process_alive(Coord)) + end, + % Cleaner process stays up + ?assert(is_process_alive(Cleaner)), + % Tear everything down at the end of the test + gen_tcp:close(Sock), + Coord ! die, + receive + {'DOWN', CleanerRef, _, _, _} -> ok + end. setup() -> - ok = meck:expect(rexi, kill_all, fun(_) -> ok end). + ok = meck:expect(rexi, kill_all, fun(_) -> ok end), + % Speed up disconnect socket timeout for the test to 200 msec + ok = meck:expect(chttpd_util, mochiweb_socket_check_msec, 0, 200). teardown(_) -> meck:unload(). diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index 970a200d2..bf0661a17 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -39,11 +39,12 @@ go(DbName, Feed, Options, Callback, Acc0) when {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), Ref = make_ref(), Parent = self(), + ClientSock = chttpd_util:mochiweb_socket_get(), UpdateListener = { spawn_link( fabric_db_update_listener, go, - [Parent, Ref, DbName, Timeout] + [Parent, Ref, DbName, Timeout, ClientSock] ), Ref },
