This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 64be6bf4afbdadd857e1791ed004b1cb5c3399fe Author: Garren Smith <[email protected]> AuthorDate: Mon Jul 8 16:40:29 2019 +0200 fixes based on reviews --- rel/overlay/etc/default.ini | 4 +- src/couch_views/src/couch_views.erl | 23 ++++--- src/couch_views/src/couch_views_jobs.erl | 2 +- src/couch_views/src/couch_views_reader.erl | 63 ++++++++--------- src/couch_views/src/couch_views_worker_server.erl | 84 +++++++++++++---------- src/couch_views/test/couch_views_map_test.erl | 1 - test/elixir/test/map_test.exs | 13 +--- 7 files changed, 92 insertions(+), 98 deletions(-) diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 59b7d57..11bd611 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -225,9 +225,7 @@ iterations = 10 ; iterations for password hashing ; Settings for view indexing [couch_views] -; type_check_period_msec = 500 -; type_check_max_jitter_msec = 500 -; change_limit = 100 +; max_workers = 100 ; CSP (Content Security Policy) Support for _utils [csp] diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl index 4ccf0fa..69d6765 100644 --- a/src/couch_views/src/couch_views.erl +++ b/src/couch_views/src/couch_views.erl @@ -34,15 +34,20 @@ map_query(Db, DDoc, ViewName, Callback, Acc0, Args0) -> process_args(#{} = Args) -> - Args1 = maps:filter(fun (_, V) -> V /= undefined end, Args), - - maps:merge(#{ - direction => fwd, - inclusive_end => true, - update => true, - skip => 0, - limit => ?MAX_VIEW_LIMIT - }, Args1). + Args1 = remove_ununsed_values(Args), + Defaults = #{ + direction => fwd, + inclusive_end => true, + update => true, + skip => 0, + limit => ?MAX_VIEW_LIMIT + }, + + maps:merge(Defaults, Args1). + + +remove_ununsed_values(Args) -> + maps:filter(fun (_, V) -> V /= undefined end, Args). 
maybe_build_index(_Db, _Mrst, #{update := false}) -> diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl index ff99475..31ab728 100644 --- a/src/couch_views/src/couch_views_jobs.erl +++ b/src/couch_views/src/couch_views_jobs.erl @@ -50,7 +50,7 @@ add(TxDb, Mrst) -> JobData = create_job_data(TxDb, Mrst, 0), JobId = create_job_id(TxDb, Mrst), - JTx = couch_jobs_fdb:get_jtx(), + JTx = couch_jobs_fdb:get_jtx(TxDb), couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData). diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl index 2ddb5b6..f4e768a 100644 --- a/src/couch_views/src/couch_views_reader.erl +++ b/src/couch_views/src/couch_views_reader.erl @@ -136,27 +136,12 @@ get_unpack_fun(TxDb, Opts, Callback) -> UnPackFwd = fun({K, V}, State) -> case couch_views_fdb:unpack_map_row(TxDb, K, V) of {key, _Id, RowKey} -> - maps:put(current_key, RowKey, State); + State#{current_key => RowKey}; {value, Id, RowValue} -> #{ - current_key := RowKey, - acc := Acc, - skip := Skip, - db := Db + current_key := RowKey } = State, - - case Skip > 0 of - true -> - maps:put(skip, Skip - 1, State); - false -> - Row = [{id, Id}, {key, RowKey}, {value, RowValue}], - - IncludeDoc = maps:get(include_docs, State, false), - Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc), - - {ok, AccNext} = Callback({row, Row1}, Acc), - maps:put(acc, AccNext, State) - end + process_map_row(Id, RowKey, RowValue, State, Callback) end end, @@ -164,26 +149,11 @@ get_unpack_fun(TxDb, Opts, Callback) -> case couch_views_fdb:unpack_map_row(TxDb, K, V) of {key, Id, RowKey} -> #{ - current_value := RowValue, - acc := Acc, - skip := Skip, - db := Db + current_value := RowValue } = State, - - case Skip > 0 of - true -> - maps:put(skip, Skip - 1, State); - false -> - Row = [{id, Id}, {key, RowKey}, {value, RowValue}], - - IncludeDoc = maps:get(include_docs, State, false), - Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc), - - {ok, 
AccNext} = Callback({row, Row1}, Acc), - maps:put(acc, AccNext, State) - end; + process_map_row(Id, RowKey, RowValue, State, Callback); {value, _Id, RowValue} -> - maps:put(current_value, RowValue, State) + State#{current_value => RowValue} end end, @@ -193,6 +163,27 @@ get_unpack_fun(TxDb, Opts, Callback) -> end. +process_map_row(Id, RowKey, RowValue, State, Callback) -> + #{ + acc := Acc, + skip := Skip, + db := Db + } = State, + + case Skip > 0 of + true -> + State#{skip := Skip -1}; + false -> + Row = [{id, Id}, {key, RowKey}, {value, RowValue}], + + IncludeDoc = maps:get(include_docs, State, false), + Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc), + + {ok, AccNext} = Callback({row, Row1}, Acc), + State#{acc := AccNext} + end. + + maybe_include_doc(_Db, _Id, Row, false) -> Row; diff --git a/src/couch_views/src/couch_views_worker_server.erl b/src/couch_views/src/couch_views_worker_server.erl index 1c815e5..13bd9aa 100644 --- a/src/couch_views/src/couch_views_worker_server.erl +++ b/src/couch_views/src/couch_views_worker_server.erl @@ -31,8 +31,7 @@ ]). --define(TYPE_CHECK_PERIOD_DEFAULT, 500). --define(MAX_JITTER_DEFAULT, 100). +-define(MAX_WORKERS, 100). start_link() -> @@ -41,8 +40,12 @@ start_link() -> init(_) -> couch_views_jobs:set_timeout(), - schedule_check(), - {ok, #{}}. + State0 = #{ + workers => #{}, + acceptor_pid => undefined + }, + State = spawn_acceptor(State0), + {ok, State}. terminate(_, _St) -> @@ -53,19 +56,20 @@ handle_call(Msg, _From, St) -> {stop, {bad_call, Msg}, {bad_call, Msg}, St}. +handle_cast({job, Job, JobData}, State) -> + State1 = start_worker(State, Job, JobData), + State2 = spawn_acceptor(State1), + {noreply, State2}; + handle_cast(Msg, St) -> {stop, {bad_cast, Msg}, St}. 
-handle_info(check_for_jobs, State) -> - accept_jobs(), - schedule_check(), - {noreply, State}; - -handle_info({'DOWN', _Ref, process, Pid, Reason}, St) -> +handle_info({'DOWN', _Ref, process, Pid, Reason}, State) -> LogMsg = "~p : process ~p exited with ~p", couch_log:error(LogMsg, [?MODULE, Pid, Reason]), - {noreply, St}; + State1 = check_finished_process(State, Pid), + {noreply, State1}; handle_info(Msg, St) -> couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]), @@ -76,35 +80,43 @@ code_change(_OldVsn, St, _Extra) -> {ok, St}. -accept_jobs() -> +start_worker(State, Job, JobData) -> + #{workers := Workers} = State, + {Pid, _Ref} = spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end), + Workers1 = Workers#{Pid => true}, + State#{workers := Workers1}. + + +spawn_acceptor(State) -> + #{ + workers := Workers, + acceptor_pid := Pid + } = State, + MaxWorkers = config:get_integer("couch_views", "max_workers", ?MAX_WORKERS), + case maps:size(Workers) >= MaxWorkers of + false when not is_pid(Pid) -> + Parent = self(), + {Pid1, _Ref} = spawn_monitor(fun() -> blocking_acceptor(Parent) end), + State#{acceptor_pid := Pid1}; + _ -> + State + end. + + +blocking_acceptor(Parent) -> case couch_views_jobs:accept() of not_found -> - ok; + blocking_acceptor(Parent); {ok, Job, JobData} -> - start_worker(Job, JobData), - % keep accepting jobs until not_found - accept_jobs() + gen_server:cast(Parent, {job, Job, JobData}) end. -start_worker(Job, JobData) -> - % TODO Should I monitor it, or let jobs do that? - spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end), - ok. - - -schedule_check() -> - Timeout = get_period_msec(), - MaxJitter = max(Timeout div 2, get_max_jitter_msec()), - Wait = Timeout + rand:uniform(max(1, MaxJitter)), - timer:send_after(Wait, self(), check_for_jobs). - - -get_period_msec() -> - config:get_integer("couch_views", "type_check_period_msec", - ?TYPE_CHECK_PERIOD_DEFAULT). 
- +check_finished_process(#{acceptor_pid := Pid} = State, Pid) -> + State1 = State#{acceptor_pid := undefined}, + spawn_acceptor(State1); -get_max_jitter_msec() -> - config:get_integer("couch_views", "type_check_max_jitter_msec", - ?MAX_JITTER_DEFAULT). +check_finished_process(State, Pid) -> + #{workers := Workers} = State, + Workers1 = maps:remove(Pid, Workers), + State#{workers := Workers1}. diff --git a/src/couch_views/test/couch_views_map_test.erl b/src/couch_views/test/couch_views_map_test.erl index bbad93f..e7be521 100644 --- a/src/couch_views/test/couch_views_map_test.erl +++ b/src/couch_views/test/couch_views_map_test.erl @@ -300,7 +300,6 @@ should_map_duplicate_keys() -> {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 2}]}, {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 3}]} ]}, - ?debugFmt("EXPE ~p ~n", [Expect]), ?assertEqual(Expect, Result). diff --git a/test/elixir/test/map_test.exs b/test/elixir/test/map_test.exs index b7a809d..7c443ab 100644 --- a/test/elixir/test/map_test.exs +++ b/test/elixir/test/map_test.exs @@ -19,7 +19,7 @@ defmodule ViewMapTest do "two" end - doc = %{ + %{ :_id => "doc-id-#{i}", :value => i, :some => "field", @@ -78,17 +78,6 @@ defmodule ViewMapTest do resp = Couch.post("/#{db_name}/_bulk_docs", body: body) Enum.each(resp.body, &assert(&1["ok"])) - # ddoc = %{ - # :_id => "_design/map", - # views: %{ - # some: %{map: map_fun1}, - # map_some: %{map: map_fun2} - # } - # } - # resp = Couch.put("/#{db_name}/#{ddoc._id}", body: ddoc) - # IO.inspect resp - # assert resp.status_code == 201 - {:ok, [db_name: db_name]} end
