This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch ioq-per-shard-or-user in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit 1dee640342c4db8756606bff3962cfed6efc8bc1 Author: Russell Branca <[email protected]> AuthorDate: Thu Feb 28 18:48:21 2019 +0000 WIP: IOQ2 per shard/user --- src/ioq.erl | 24 ++++- src/ioq_opener.erl | 256 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ioq_server2.erl | 47 ++++++---- src/ioq_sup.erl | 17 +--- test/ioq_tests.erl | 7 +- 5 files changed, 315 insertions(+), 36 deletions(-) diff --git a/src/ioq.erl b/src/ioq.erl index 160a448..2530dba 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -15,7 +15,12 @@ get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0, get_disk_counters/0, get_disk_concurrency/0]). -export([ - ioq2_enabled/0 + ioq2_enabled/0, + fetch_pid_for/1, + fetch_pid_for/2, + fetch_pid_for/3, + get_pid_for/1, + set_pid_for/2 ]). -define(APPS, [config, folsom, couch_stats, ioq]). @@ -72,5 +77,20 @@ get_osproc_requests() -> gen_server:call(ioq_osq, get_requests). ioq2_enabled() -> - config:get_boolean("ioq2", "enabled", false). + config:get_boolean("ioq2", "enabled", true). + +fetch_pid_for(DbName) -> + ioq_opener:fetch_pid_for(DbName). + +fetch_pid_for(DbName, FdPid) -> + ioq_opener:fetch_pid_for(DbName, FdPid). + +fetch_pid_for(DbName, UserCtx, FdPid) -> + ioq_opener:fetch_pid_for(DbName, UserCtx, FdPid). + +get_pid_for(FdPid) -> + ioq_opener:get_pid_for(FdPid). + +set_pid_for(FdPid, IOQPid) -> + ioq_opener:set_pid_for(FdPid, IOQPid). diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl new file mode 100644 index 0000000..d85991e --- /dev/null +++ b/src/ioq_opener.erl @@ -0,0 +1,256 @@ +-module(ioq_opener). +-behavior(gen_server). + + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). +-export([ + start_link/0, + %%fetch_pid_for/1, + fetch_pid_for/2, + fetch_pid_for/3, + get_pid_for/1, + set_pid_for/2, + get_ioq_pids/0, + get_pid_idx/0, + get_monitor_idx/0 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +-define(BY_USER, by_user). +-define(BY_SHARD, by_shard). 
+-define(DEFAULT_DISPATCH, ?BY_SHARD).
+%% Tag placed first in the process-dictionary keys that set_pid_for/2 writes
+%% and get_pid_for/1 reads.
+-define(PDICT_MARKER, ioq_pid_for).
+
+
+%% State record for this module.
+%% NOTE(review): the purpose of `idle` cannot be determined from the
+%% functions in this part of the module -- confirm before relying on it.
+-record(st, {
+    idle = [] :: [{erlang:timestamp(), pid()}],
+    pid_idx :: khash:khash(),
+    monitors :: khash:khash(),
+    dispatch :: by_shard | by_user | undefined
+    }).
+
+
+%% HACK: experiment to allow for spawning IOQ2 pids prior to spawning
+%% the associated couch_file pids
+%%fetch_pid_for(DbName) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, self()).
+
+
+%% TODO: cleanup the overloaded arity once experiments concluded
+%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, undefined, self());
+%%fetch_pid_for(DbName, #user_ctx{}=Ctx) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, Ctx, self());
+%% Two-argument form: same request as fetch_pid_for/3 with the user context
+%% set to `undefined`.
+fetch_pid_for(DbName, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    gen_server:call(?MODULE, {fetch, DbName, undefined, FdPid}, infinity).
+
+
+%% Sends a `{fetch, DbName, UserCtx, FdPid}` request to the process
+%% registered as ?MODULE and returns its reply. The call uses an `infinity`
+%% timeout, so it blocks until the server answers.
+fetch_pid_for(DbName, UserCtx, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    Request = {fetch, DbName, UserCtx, FdPid},
+    gen_server:call(?MODULE, Request, infinity).
+
+
+%% Reads the value stored under the ?PDICT_MARKER key for a db name or an fd
+%% pid from the calling process's dictionary. `undefined` as the argument
+%% short-circuits to `undefined`.
+get_pid_for(undefined) ->
+    undefined;
+get_pid_for(Key) when is_binary(Key); is_pid(Key) ->
+    erlang:get(pdict_key(Key)).
+
+
+%% Stores IOQPid under the ?PDICT_MARKER key for a db name or an fd pid in
+%% the calling process's dictionary. An `undefined` IOQPid is a no-op.
+%% Always returns `ok`.
+set_pid_for(_, undefined) ->
+    ok;
+set_pid_for(Key, IOQPid) when is_pid(IOQPid), (is_binary(Key) orelse is_pid(Key)) ->
+    erlang:put(pdict_key(Key), IOQPid),
+    ok.
+
+
+%% Builds the process-dictionary key shared by get_pid_for/1 and
+%% set_pid_for/2.
+pdict_key(DbName) when is_binary(DbName) ->
+    %% HACK: use the same shard format as per #ioq_request{} to post facto
+    %% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
+    %% to having a db handle
+    {?PDICT_MARKER, filename:rootname(DbName)};
+pdict_key(FdPid) when is_pid(FdPid) ->
+    {?PDICT_MARKER, FdPid}.
+
+
+%% Asks the process registered as ?MODULE for its pid index and returns the
+%% reply (blocking, `infinity` timeout).
+get_pid_idx() ->
+    gen_server:call(?MODULE, get_pid_idx, infinity).
+
+
+%% Asks the process registered as ?MODULE for its monitor table and returns
+%% the reply, which handle_call/3 builds with khash:to_list/1.
+get_monitor_idx() ->
+    gen_server:call(?MODULE, get_monitor_idx, infinity).
+
+
+%% Returns the pids that appear as keys in the pid index. The index stores
+%% both `Name -> Pid` and `Pid -> Name` entries (see handle_call/3), so only
+%% entries whose key is a pid are kept.
+get_ioq_pids() ->
+    lists:foldl(
+        fun
+            ({K, _V}, Acc) when is_pid(K) ->
+                [K | Acc];
+            (_, Acc) ->
+                Acc
+        end, [], get_pid_idx()).
+
+
+%% Starts the opener as a gen_server registered locally under ?MODULE.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+%% gen_server callback. Traps exits, creates the two khash tables and reads
+%% the dispatch mode from the "ioq.opener" / "dispatch" config key. Any value
+%% other than "by_shard" or "by_user" falls back to ?DEFAULT_DISPATCH.
+init([]) ->
+    process_flag(trap_exit, true),
+    {ok, PidIdx} = khash:new(),
+    {ok, Monitors} = khash:new(),
+    Dispatch = case config:get("ioq.opener", "dispatch", undefined) of
+        "by_shard" -> ?BY_SHARD;
+        "by_user" -> ?BY_USER;
+        _ -> ?DEFAULT_DISPATCH
+    end,
+    St = #st{
+        pid_idx = PidIdx,
+        monitors = Monitors,
+        dispatch = Dispatch
+    },
+    {ok, St}.
+
+
+%% gen_server callback.
+%%
+%% `{fetch, DbName, UserCtx, FdPid}` replies with the IOQ server pid for the
+%% user name (by_user dispatch) or the db name (by_shard dispatch). The
+%% server is started with ioq_server2:start_link/1 on first use and indexed
+%% in both directions in `pid_idx`. The fd pid is monitored via add_monitor/3.
+%%
+%% `get_pid_idx` and `get_monitor_idx` reply with the respective khash table
+%% as a list. Any other request replies `ok`.
+handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
+    Caller = caller_pid(FdPid, From),
+    Name = case UserCtx of
+        #user_ctx{name=Name0} -> Name0;
+        %% TODO: support unknown user
+        %% NOTE(review): a throw out of a gen_server callback is taken as the
+        %% callback's return value, so this presumably stops the server with
+        %% a bad return value rather than reaching the caller -- confirm.
+        undefined -> throw(unknown_user)
+    end,
+    IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({user, Name}),
+            khash:put(St#st.pid_idx, Name, Pid),
+            khash:put(St#st.pid_idx, Pid, Name),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Caller, IOQPid),
+    {reply, IOQPid, St};
+handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St) ->
+    Caller = caller_pid(FdPid, From),
+    %% TODO: DbName = drop_compact_ext(DbName0),
+    IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({shard, DbName}),
+            khash:put(St#st.pid_idx, DbName, Pid),
+            khash:put(St#st.pid_idx, Pid, DbName),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Caller, IOQPid),
+    {reply, IOQPid, St};
+handle_call(get_pid_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.pid_idx), St};
+handle_call(get_monitor_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.monitors), St};
+handle_call(_, _From, St) ->
+    {reply, ok, St}.
+
+
+%% gen_server callback. Casts are ignored.
+handle_cast(_Msg, St) ->
+    {noreply, St}.
+
+
+%% gen_server callback.
+%%
+%% 'DOWN': a monitored fd pid went away. drop_monitor/2 removes its
+%% bookkeeping. When it reports that no references remain for the IOQ pid,
+%% both `pid_idx` entries for that pid are deleted.
+%%
+%% 'EXIT': exits arrive as messages because init/1 sets trap_exit. If the pid
+%% is in `pid_idx`, both of its index entries are deleted.
+%% NOTE(review): this clause does not touch the monitors table, so entries
+%% for an exited IOQ pid stay there until the fd pids go down -- confirm that
+%% this is intended.
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+    %% NOTE(review): drop_monitor/2 throws `unexpected` for an unknown
+    %% reference; inside a gen_server callback that throw is presumably
+    %% treated as a (bad) return value -- confirm.
+    case drop_monitor(St#st.monitors, Ref) of
+        {IOQPid, []} ->
+            Name = khash:get(St#st.pid_idx, IOQPid), %% TODO: assert found?
+            khash:del(St#st.pid_idx, IOQPid),
+            khash:del(St#st.pid_idx, Name);
+        {_IOQPid, _Refs} ->
+            ok
+    end,
+    {noreply, St};
+handle_info({'EXIT', Pid, _}, St) ->
+    case khash:get(St#st.pid_idx, Pid, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen, throw error?
+            ok;
+        Name ->
+            khash:del(St#st.pid_idx, Pid),
+            khash:del(St#st.pid_idx, Name)
+    end,
+    {noreply, St};
+handle_info(_Info, St) ->
+    {noreply, St}.
+
+
+%% gen_server callback. Nothing to clean up.
+terminate(_Reason, _St) ->
+    ok.
+
+
+%% gen_server callback. State is carried over unchanged.
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Picks the process to monitor for a fetch request: the fd pid when one was
+%% supplied, otherwise the pid of the calling process. `From` is the
+%% gen_server `{Pid, Tag}` tuple, and only its pid can be given to
+%% erlang:monitor/2.
+caller_pid(undefined, {Pid, _Tag}) when is_pid(Pid) ->
+    Pid;
+caller_pid(FdPid, _From) when is_pid(FdPid) ->
+    FdPid.
+
+
+%% Records that FdPid uses IOQPid and monitors FdPid (once per pair).
+%%
+%% The monitors table holds three kinds of entries:
+%%   Ref             -> {FdPid, IOQPid}
+%%   {FdPid, IOQPid} -> Ref
+%%   IOQPid          -> [Ref]   (every live reference for that IOQ pid)
+%%
+%% Always returns `ok`.
+add_monitor(Mons, FdPid, IOQPid) ->
+    PidKey = {FdPid, IOQPid},
+    Ref = case khash:get(Mons, PidKey, not_found) of
+        not_found ->
+            Ref0 = erlang:monitor(process, FdPid),
+            khash:put(Mons, Ref0, PidKey),
+            khash:put(Mons, PidKey, Ref0),
+            %% The branch must yield the reference itself. Ending on the
+            %% khash:put/3 call would bind Ref to that call's return value,
+            %% and the IOQPid ref list below could never be emptied by
+            %% drop_monitor/2.
+            Ref0;
+        Ref0 ->
+            Ref0
+    end,
+    case khash:get(Mons, IOQPid, not_found) of
+        not_found ->
+            khash:put(Mons, IOQPid, [Ref]);
+        Refs ->
+            case lists:member(Ref, Refs) of
+                true ->
+                    ok;
+                false ->
+                    khash:put(Mons, IOQPid, [Ref | Refs])
+            end
+    end,
+    ok.
+
+
+%% Removes the bookkeeping for monitor reference Ref and returns
+%% `{IOQPid, RemainingRefs}`. When the last reference for an IOQ pid is
+%% removed, the pid is unlinked, its ref list is deleted and it is sent an
+%% `idle` exit signal. Throws `unexpected` if Ref or its IOQ pid is not in
+%% the table.
+drop_monitor(Mons, Ref) when is_reference(Ref) ->
+    case khash:get(Mons, Ref, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen
+            throw(unexpected);
+        {_FdPid, IOQPid}=PidKey ->
+            case khash:get(Mons, IOQPid, not_found) of
+                not_found ->
+                    %% TODO: shouldn't happen
+                    throw(unexpected);
+                Refs ->
+                    khash:del(Mons, Ref),
+                    khash:del(Mons, PidKey),
+                    case lists:delete(Ref, Refs) of
+                        [] ->
+                            %% Unlink first so that the exit sent below does
+                            %% not come back to this process as an 'EXIT'
+                            %% message.
+                            unlink(IOQPid),
+                            khash:del(Mons, IOQPid),
+                            exit(IOQPid, idle),
+                            {IOQPid, []};
+                        Refs1 ->
+                            khash:put(Mons, IOQPid, Refs1),
+                            {IOQPid, Refs1}
+                    end
+            end
+    end.
+ diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl index 5e2e01f..1ed49c4 100644 --- a/src/ioq_server2.erl +++ b/src/ioq_server2.erl @@ -23,6 +23,8 @@ code_change/3 ]). -export([ + start_link/0, + start_link/1, start_link/3, call/3, pcall/1, @@ -108,22 +110,17 @@ call(Fd, Msg, Dimensions) -> [couchdb, io_queue2, RW, bypassed_count]), gen_server:call(Fd, Msg, infinity); _ -> - DispatchStrategy = config:get( - "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER), - Server = case DispatchStrategy of - ?DISPATCH_RANDOM -> - maybe_seed(), - SID = random:uniform(erlang:system_info(schedulers)), - ?SERVER_ID(SID); - ?DISPATCH_FD_HASH -> - NumSchedulers = erlang:system_info(schedulers), - SID = 1 + (erlang:phash2(Fd) rem NumSchedulers), - ?SERVER_ID(SID); - ?DISPATCH_SINGLE_SERVER -> - ?SERVER_ID(1); - _ -> - SID = erlang:system_info(scheduler_id), - ?SERVER_ID(SID) + Server = case ioq_opener:get_pid_for(Fd) of + undefined -> + %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of + %% undefined -> + %% ioq_server2; + %% IOQPid -> + %% IOQPid + %%end; + ioq_server2; + IOQPid -> + IOQPid end, gen_server:call(Server, Req, infinity) end. @@ -267,6 +264,18 @@ update_config() -> gen_server:call(?SERVER_ID(1), update_config, infinity). +start_link() -> + start_link(?MODULE). + + +start_link(?MODULE) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [{global, ?MODULE}], []); +start_link({user, _Name}=User) -> + gen_server:start_link(?MODULE, [User], []); +start_link({shard, _Name}=Shard) -> + gen_server:start_link(?MODULE, [Shard], []). + + start_link(Name, SID, Bind) -> Options = case Bind of true -> [{scheduler, SID}]; @@ -275,7 +284,7 @@ start_link(Name, SID, Bind) -> gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options). 
-init([Name, SID]) -> +init([{Type, Name}]) -> {ok, HQ} = hqueue:new(), {ok, Reqs} = khash:new(), {ok, Waiters} = khash:new(), @@ -284,7 +293,7 @@ init([Name, SID]) -> reqs = Reqs, waiters = Waiters, server_name = Name, - scheduler_id = SID + scheduler_id = Type }, {ok, update_config_int(State)}. @@ -644,7 +653,7 @@ mock_server(Config) -> meck:expect(config, get_boolean, fun("ioq2", _, Default) -> Default end), - {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]), + {ok, State} = ioq_server2:init([{global, ?SERVER_ID(1)}]), State. diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl index 7ea6284..1129a9a 100644 --- a/src/ioq_sup.erl +++ b/src/ioq_sup.erl @@ -23,27 +23,16 @@ start_link() -> init([]) -> ok = ioq_config_listener:subscribe(), - IOQ2Children = ioq_server2_children(), {ok, { {one_for_one, 5, 10}, [ + ?CHILD(ioq_opener, worker), ?CHILD(ioq_server, worker), + ?CHILD(ioq_server2, worker), ?CHILD(ioq_osq, worker) - | IOQ2Children ] }}. -ioq_server2_children() -> - Bind = config:get_boolean("ioq2", "bind_to_schedulers", false), - ioq_server2_children(erlang:system_info(schedulers), Bind). - -ioq_server2_children(Count, Bind) -> - lists:map(fun(I) -> - Name = list_to_atom("ioq_server_" ++ integer_to_list(I)), - {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]} - end, lists:seq(1, Count)). get_ioq2_servers() -> - lists:map(fun(I) -> - list_to_atom("ioq_server_" ++ integer_to_list(I)) - end, lists:seq(1, erlang:system_info(schedulers))). + [ioq_server2 | ioq_opener:get_ioq_pids()]. 
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl index b6b7bad..c2502c5 100644 --- a/test/ioq_tests.erl +++ b/test/ioq_tests.erl @@ -40,7 +40,12 @@ instantiate({_, S}) -> check_call(S, make_ref(), priority(IOClass, Shard)) end, shards()) end, io_classes())}, - ?_assertEqual(20, ioq:set_disk_concurrency(10)), + case ioq:ioq2_enabled() of + true -> + ?_assertEqual(1, ioq:set_disk_concurrency(10)); + false -> + ?_assertEqual(20, ioq:set_disk_concurrency(10)) + end, ?_assertError(badarg, ioq:set_disk_concurrency(0)), ?_assertError(badarg, ioq:set_disk_concurrency(-1)), ?_assertError(badarg, ioq:set_disk_concurrency(foo))].
