This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch monitor-client-socket-for-disconnections in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 90a1d3191656166ef07c1c84732ec86488c3b63f Author: Nick Vatamaniuc <[email protected]> AuthorDate: Wed Aug 23 19:10:28 2023 -0400 [fixup] add tests asserting cleanup happens depending on the socket state --- src/fabric/src/fabric_streams.erl | 206 +++++++++++++++++++++++++------------- 1 file changed, 134 insertions(+), 72 deletions(-) diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index f4144666c..b1d3e4719 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -191,96 +191,158 @@ add_worker_to_cleaner(CoordinatorPid, Worker) -> -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). worker_cleaner_test_() -> { "Fabric spawn_worker_cleaner test", { - setup, + foreach, fun setup/0, fun teardown/1, - fun(_) -> - [ - should_clean_workers(), - does_not_fire_if_cleanup_called(), - should_clean_additional_worker_too() - ] - end + [ + ?TDEF_FE(should_clean_workers), + ?TDEF_FE(does_not_fire_if_cleanup_called), + ?TDEF_FE(should_clean_additional_worker_too), + ?TDEF_FE(coordinator_is_killed_if_client_disconnects), + ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected) + ] } }. -should_clean_workers() -> - ?_test(begin - meck:reset(rexi), - erase(?WORKER_CLEANER), - Workers = [ - #shard{node = 'n1', ref = make_ref()}, - #shard{node = 'n2', ref = make_ref()} - ], - {Coord, _} = spawn_monitor(fun() -> - receive - die -> ok - end - end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - Ref = erlang:monitor(process, Cleaner), - Coord ! die, +should_clean_workers(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive - {'DOWN', Ref, _, Cleaner, _} -> ok - end, - ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) - end). 
+ die -> ok + end + end), + Cleaner = spawn_worker_cleaner(Coord, Workers, undefined), + Ref = erlang:monitor(process, Cleaner), + Coord ! die, + receive + {'DOWN', Ref, _, Cleaner, _} -> ok + end, + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)). -does_not_fire_if_cleanup_called() -> - ?_test(begin - meck:reset(rexi), - erase(?WORKER_CLEANER), - Workers = [ - #shard{node = 'n1', ref = make_ref()}, - #shard{node = 'n2', ref = make_ref()} - ], - {Coord, _} = spawn_monitor(fun() -> - receive - die -> ok - end - end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - Ref = erlang:monitor(process, Cleaner), - cleanup(Workers), - Coord ! die, +does_not_fire_if_cleanup_called(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> + receive + die -> ok + end + end), + Cleaner = spawn_worker_cleaner(Coord, Workers, undefined), + Ref = erlang:monitor(process, Cleaner), + cleanup(Workers), + Coord ! die, + receive + {'DOWN', Ref, _, _, _} -> ok + end, + % 2 calls would be from cleanup/1 function. If cleanup process fired + % too it would have been 4 calls total. + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)). + +should_clean_additional_worker_too(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive - {'DOWN', Ref, _, _, _} -> ok - end, - % 2 calls would be from cleanup/1 function. If cleanup process fired - % too it would have been 4 calls total. - ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) - end). + die -> ok + end + end), + Cleaner = spawn_worker_cleaner(Coord, Workers, undefined), + add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}), + Ref = erlang:monitor(process, Cleaner), + Coord ! die, + receive + {'DOWN', Ref, _, Cleaner, _} -> ok + end, + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)). 
-should_clean_additional_worker_too() -> - ?_test(begin - meck:reset(rexi), - erase(?WORKER_CLEANER), - Workers = [ - #shard{node = 'n1', ref = make_ref()} - ], - {Coord, _} = spawn_monitor(fun() -> +coordinator_is_killed_if_client_disconnects(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, CoordRef} = spawn_monitor(fun() -> + receive + die -> ok + end + end), + {ok, Sock} = gen_tcp:listen(0, [{active, false}]), + % Close the socket and then expect coordinator to be killed + ok = gen_tcp:close(Sock), + Cleaner = spawn_worker_cleaner(Coord, Workers, Sock), + CleanerRef = erlang:monitor(process, Cleaner), + % Assert the correct behavior on the supported platforms (all except Windows so far) + case os:type() of + {unix, Type} when Type =:= linux; Type =:= darwin; Type =:= freebsd -> + % Coordinator should be torn down receive - die -> ok - end - end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}), - Ref = erlang:monitor(process, Cleaner), - Coord ! die, + {'DOWN', CoordRef, _, _, Reason} -> + ?assertEqual({shutdown, client_disconnected}, Reason) + end, + % Cleaner process itself should exit + receive + {'DOWN', CleanerRef, _, _, _} -> ok + end, + % Workers should have been killed + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)); + {_, _} = OsType -> + ?debugFmt("~n * Client disconnect test not yet supported on ~p~n", [OsType]) + end. + +coordinator_is_not_killed_if_client_is_connected(_) -> + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, CoordRef} = spawn_monitor(fun() -> receive - {'DOWN', Ref, _, Cleaner, _} -> ok - end, - ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) - end). 
+ die -> ok + end + end), + {ok, Sock} = gen_tcp:listen(0, [{active, false}]), + Cleaner = spawn_worker_cleaner(Coord, Workers, Sock), + CleanerRef = erlang:monitor(process, Cleaner), + % Coordinator should stay up + receive + {'DOWN', CoordRef, _, Coord, _} -> + ?assert(false, {unexpected_coordinator_exit, Coord}) + after 1000 -> + ?assert(is_process_alive(Coord)) + end, + % Cleaner process stays up + ?assert(is_process_alive(Cleaner)), + % Tear everything down at the end of the test + gen_tcp:close(Sock), + Coord ! die, + receive + {'DOWN', CleanerRef, _, _, _} -> ok + end. setup() -> - ok = meck:expect(rexi, kill_all, fun(_) -> ok end). + ok = meck:expect(rexi, kill_all, fun(_) -> ok end), + % Speed up disconnect socket timeout for the test to 200 msec + ok = meck:expect(chttpd_util, mochiweb_socket_check_msec, 0, 200). teardown(_) -> meck:unload().
