This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch ioq-per-shard-or-user in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit acd776b23cf13235782de026a8d861b064214f6e Author: Russell Branca <[email protected]> AuthorDate: Wed Mar 6 23:03:55 2019 +0000 Support finding ioq pids by #ioq_request{} --- src/ioq_opener.erl | 53 +++++++++++++++++++++++++++++++++++++++++++++-------- src/ioq_server2.erl | 28 ++++++++++++---------------- test/ioq_tests.erl | 6 ++++-- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl index d85991e..5bcf065 100644 --- a/src/ioq_opener.erl +++ b/src/ioq_opener.erl @@ -12,7 +12,7 @@ ]). -export([ start_link/0, - %%fetch_pid_for/1, + fetch_pid_for/1, fetch_pid_for/2, fetch_pid_for/3, get_pid_for/1, @@ -24,10 +24,14 @@ -include_lib("couch/include/couch_db.hrl"). +-include_lib("ioq/include/ioq.hrl"). -define(BY_USER, by_user). -define(BY_SHARD, by_shard). +-define(BY_CLASS, by_class). +-define(BY_FD, by_fd). +-define(BY_DB, by_db). -define(DEFAULT_DISPATCH, ?BY_SHARD). -define(PDICT_MARKER, ioq_pid_for). @@ -36,7 +40,7 @@ idle = [] :: [{erlang:timestamp(), pid()}], pid_idx :: khash:khash(), monitors :: khash:khash(), - dispatch :: by_shard | by_user | undefined + dispatch :: ?BY_SHARD | ?BY_DB | ?BY_USER | ?BY_CLASS | ?BY_FD | undefined }). @@ -46,6 +50,10 @@ %% fetch_pid_for(DbName, self()). +fetch_pid_for(#ioq_request{}=Req) -> + gen_server:call(?MODULE, {fetch, Req}, infinity). + + %% TODO: cleanup the overloaded arity once experiments concluded %%fetch_pid_for(DbName, undefined) when is_binary(DbName) -> %% fetch_pid_for(DbName, undefined, self()); @@ -111,8 +119,11 @@ init([]) -> {ok, Monitors} = khash:new(), Dispatch = case config:get("ioq.opener", "dispatch", undefined) of "by_shard" -> ?BY_SHARD; - "by_user" -> ?BY_USER; - _ -> ?DEFAULT_DISPATCH + "by_db" -> ?BY_DB; + "by_user" -> ?BY_USER; + "by_class" -> ?BY_CLASS; + "by_fd" -> ?BY_FD; + _ -> ?DEFAULT_DISPATCH end, St = #st{ pid_idx = PidIdx, @@ -122,6 +133,30 @@ init([]) -> {ok, St}. +handle_call({fetch, #ioq_request{}=Req}, _From, #st{dispatch=Dispatch}=St) -> + Key = case Dispatch of + ?BY_SHARD -> + Req#ioq_request.shard; + ?BY_DB -> + Req#ioq_request.db; + ?BY_USER -> + Req#ioq_request.user; + ?BY_CLASS -> + Req#ioq_request.class; + ?BY_FD -> + {fd, Req#ioq_request.fd} + end, + IOQPid = case khash:get(St#st.pid_idx, Key, not_found) of + not_found -> + {ok, Pid} = ioq_server2:start_link({Dispatch, Key}), + khash:put(St#st.pid_idx, Key, Pid), + khash:put(St#st.pid_idx, Pid, Key), + Pid; + Pid -> + Pid + end, + ok = add_monitor(St#st.monitors, Req#ioq_request.fd, IOQPid), + {reply, IOQPid, St}; handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) -> Caller = case FdPid of undefined -> From; @@ -134,7 +169,7 @@ handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) - end, IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of not_found -> - {ok, Pid} = ioq_server2:start_link({user, Name}), + {ok, Pid} = ioq_server2:start_link({?BY_USER, Name}), khash:put(St#st.pid_idx, Name, Pid), khash:put(St#st.pid_idx, Pid, Name), Pid; @@ -151,7 +186,7 @@ handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St) %% TODO: DbName = drop_compact_ext(DbName0), IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of not_found -> - {ok, Pid} = ioq_server2:start_link({shard, DbName}), + {ok, Pid} = ioq_server2:start_link({?BY_SHARD, DbName}), khash:put(St#st.pid_idx, DbName, Pid), khash:put(St#st.pid_idx, Pid, DbName), Pid; @@ -210,7 +245,8 @@ add_monitor(Mons, FdPid, IOQPid) -> not_found -> Ref0 = erlang:monitor(process, FdPid), khash:put(Mons, Ref0, PidKey), - khash:put(Mons, PidKey, Ref0); + khash:put(Mons, PidKey, Ref0), + khash:put(Mons, FdPid, Ref0); Ref0 -> Ref0 end, @@ -233,12 +269,13 @@ drop_monitor(Mons, Ref) when is_reference(Ref) -> not_found -> %% TODO: shouldn't happen throw(unexpected); - {_FdPid, IOQPid}=PidKey -> + {FdPid, IOQPid}=PidKey -> case khash:get(Mons, IOQPid, not_found) of not_found -> %% TODO: shouldn't happen throw(unexpected); Refs -> + khash:del(Mons, FdPid), khash:del(Mons, Ref), khash:del(Mons, PidKey), case lists:delete(Ref, Refs) of diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl index 60f4276..9483a92 100644 --- a/src/ioq_server2.erl +++ b/src/ioq_server2.erl @@ -112,13 +112,14 @@ call(Fd, Msg, Dimensions) when Dimensions =/= undefined -> _ -> Server = case ioq_opener:get_pid_for(Fd) of undefined -> - %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of - %% undefined -> - %% ioq_server2; - %% IOQPid -> - %% IOQPid - %%end; - ioq_server2; + IOQPid = case ioq_opener:fetch_pid_for(Req) of + undefined -> + ioq_server2; + IOQPid0 -> + IOQPid0 + end, + ioq_opener:set_pid_for(Fd, IOQPid), + IOQPid; IOQPid -> IOQPid end, @@ -270,9 +271,9 @@ start_link() -> start_link(?MODULE) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [{global, ?MODULE}], []); -start_link({user, _Name}=User) -> +start_link({by_user, _Name}=User) -> gen_server:start_link(?MODULE, [User], []); -start_link({shard, _Name}=Shard) -> +start_link({by_shard, _Name}=Shard) -> gen_server:start_link(?MODULE, [Shard], []). @@ -816,11 +817,6 @@ test_simple_dedupe(St0) -> from = FromA, key = {Fd, Pos} }, - _Request1B = Request0#ioq_request{ - init_priority = Priority, - from = FromA, - key = {Fd, Pos} - }, {noreply, St2, 0} = handle_call(Request0, FromA, St1), {noreply, St3, 0} = handle_call(Request0, FromB, St2), {reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3), @@ -890,7 +886,7 @@ test_auto_scale(#state{queue=HQ}=St0) -> {_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ), Tests0 = [?_assertEqual(PriorityA, PriorityA2)], {_St, Tests} = lists:foldl( - fun(_N, {#state{iterations=I, resize_limit=RL}=StN0, TestsN}) -> + fun(_N, {#state{iterations=I, resize_limit=_RL}=StN0, TestsN}) -> ReqN = BaseReq#ioq_request{ref=make_ref()}, ExpectedPriority = case I == 1 of false -> PriorityA; @@ -981,7 +977,7 @@ cleanup(Servers) -> instantiate(S) -> - Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()), + Old = ?DEFAULT_CONCURRENCY * (1 + length(shards())), [{inparallel, lists:map(fun(IOClass) -> lists:map(fun(Shard) -> check_call(S, make_ref(), priority(IOClass, Shard)) diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl index c2502c5..c228241 100644 --- a/test/ioq_tests.erl +++ b/test/ioq_tests.erl @@ -35,14 +35,16 @@ cleanup({Apps, Server}) -> exit(Server, kill). instantiate({_, S}) -> + Shards = shards(), [{inparallel, lists:map(fun(IOClass) -> lists:map(fun(Shard) -> check_call(S, make_ref(), priority(IOClass, Shard)) - end, shards()) + end, Shards) end, io_classes())}, case ioq:ioq2_enabled() of true -> - ?_assertEqual(1, ioq:set_disk_concurrency(10)); + %% TODO: don't assume IOQ2 concurrency is 1 + ?_assertEqual(1 + length(Shards), ioq:set_disk_concurrency(10)); false -> ?_assertEqual(20, ioq:set_disk_concurrency(10)) end,
