This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
The following commit(s) were added to refs/heads/ioq-per-shard-or-user by this
push:
new ba74e0b Fix IOQ for dreyfus and some tests
ba74e0b is described below
commit ba74e0bec5616557d774760802be79df904db98f
Author: Russell Branca <[email protected]>
AuthorDate: Fri Sep 3 14:20:01 2021 -0700
Fix IOQ for dreyfus and some tests
---
src/ioq_server2.erl | 21 +++++++++++++++++----
src/ioq_sup.erl | 10 ++++++----
test/ioq_tests.erl | 7 ++++---
3 files changed, 27 insertions(+), 11 deletions(-)
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 7269368..3cedeed 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -107,6 +107,14 @@ call(Fd, Msg, Dimensions) ->
%% TODO: handle Clouseau requests with isolated IOQ2 pid
%%call_int(#ioq_file{fd={clouseau, _}=IOF, Req) ->
+call_int({Name, Node}=Server, #ioq_request{msg=Msg}=Req) when is_atom(Name)
andalso is_atom(Node) ->
+ case should_bypass(Req) of
+ true ->
+ gen_server:call(Server, Msg, infinity);
+ false ->
+ %% TODO: add dedicated clouseau IOQ pid
+ gen_server:call(ioq_server2, Req, infinity)
+ end;
call_int(#ioq_file{ioq=undefined, fd=Fd}, #ioq_request{msg=Msg}=Req) ->
Class = atom_to_list(Req#ioq_request.class),
case config:get_boolean("ioq2.bypass", Class, false) of
@@ -637,6 +645,10 @@ prioritize_request(Req, State) ->
end.
+should_bypass(#ioq_request{class=Class}) ->
+ config:get_boolean("ioq2.bypass", atom_to_list(Class), false).
+
+
%% ioq_server2 Tests
@@ -954,7 +966,7 @@ setup() ->
end,
F(F)
end,
- spawn(fun() -> FakeServer(FakeServer) end).
+ #ioq_file{fd=spawn(fun() -> FakeServer(FakeServer) end)}.
setup_many(Count, RespDelay) ->
@@ -977,7 +989,7 @@ setup_many(Count, RespDelay) ->
end,
F(F)
end,
- [spawn(fun() -> FakeServer(FakeServer) end) || _ <- lists:seq(1, Count)].
+ [#ioq_file{fd=spawn(fun() -> FakeServer(FakeServer) end)} || _ <-
lists:seq(1, Count)].
cleanup(Server) when not is_list(Server) ->
@@ -986,11 +998,12 @@ cleanup(Servers) ->
ok = application:stop(ioq),
true = meck:validate(config),
ok = meck:unload(config),
- [exit(Server, kill) || Server <- Servers].
+ [exit(ioq:fd_pid(Server), kill) || Server <- Servers].
instantiate(S) ->
- Old = ?DEFAULT_CONCURRENCY * (1 + length(shards())),
+ %%Old = ?DEFAULT_CONCURRENCY * (1 + length(shards())),
+ Old = ?DEFAULT_CONCURRENCY,
[{inparallel, lists:map(fun(IOClass) ->
lists:map(fun(Shard) ->
check_call(S, make_ref(), priority(IOClass, Shard))
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 973886a..436afb1 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -44,9 +44,10 @@ get_ioq2_servers_new() ->
[ioq_server2 | ioq_opener:get_ioq_pids()].
get_ioq2_servers() ->
- lists:map(fun(I) ->
- list_to_atom("ioq_server_" ++ integer_to_list(I))
- end, lists:seq(1, erlang:system_info(schedulers))).
+ [ioq_server2].
+ %%lists:map(fun(I) ->
+ %% list_to_atom("ioq_server_" ++ integer_to_list(I))
+ %%end, lists:seq(1, erlang:system_info(schedulers))).
handle_config_change("ioq", _Key, _Val, _Persist, St) ->
gen_server:cast(ioq_server, update_config),
@@ -69,7 +70,8 @@ handle_config_terminate(_Server, _Reason, _State) ->
ok.
processes(ioq2) ->
- filter_children("^ioq_server_.*$");
+ [{ioq_server2, whereis(ioq_server2)}];
+ %%filter_children("^ioq_server_.*$");
processes(ioq) ->
filter_children("^ioq_server$");
processes(config_listener_mon) ->
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index c228241..f266eac 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -14,6 +14,7 @@
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
+-include_lib("ioq/include/ioq.hrl").
all_test_() ->
{setup, fun setup/0, fun cleanup/1, fun instantiate/1}.
@@ -28,11 +29,11 @@ setup() ->
end,
F(F)
end,
- {Apps, spawn(fun() -> FakeServer(FakeServer) end)}.
+ {Apps, #ioq_file{fd=spawn(fun() -> FakeServer(FakeServer) end)}}.
cleanup({Apps, Server}) ->
test_util:stop_applications(Apps),
- exit(Server, kill).
+ exit(ioq:fd_pid(Server), kill).
instantiate({_, S}) ->
Shards = shards(),
@@ -44,7 +45,7 @@ instantiate({_, S}) ->
case ioq:ioq2_enabled() of
true ->
%% TODO: don't assume IOQ2 concurrency is 1
- ?_assertEqual(1 + length(Shards), ioq:set_disk_concurrency(10));
+ ?_assertEqual(1, ioq:set_disk_concurrency(10));
false ->
?_assertEqual(20, ioq:set_disk_concurrency(10))
end,