This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 4dbf745b913b0879d8ca5c02cd9e72a2a0c08ca6 Author: Paul J. Davis <[email protected]> AuthorDate: Thu Jul 18 15:05:00 2019 -0500 Fix compiler errors --- src/couch_views/src/couch_views.erl | 18 +++--- src/couch_views/src/couch_views_encoding.erl | 10 +-- src/couch_views/src/couch_views_fdb.erl | 81 ++++++++++++----------- src/couch_views/src/couch_views_indexer.erl | 97 ++++++++++++---------------- src/couch_views/src/couch_views_jobs.erl | 29 +++++---- src/couch_views/src/couch_views_reader.erl | 19 +++--- src/couch_views/src/couch_views_server.erl | 30 +-------- src/couch_views/src/couch_views_util.erl | 1 + 8 files changed, 127 insertions(+), 158 deletions(-) diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl index 7deb54d..e10675b 100644 --- a/src/couch_views/src/couch_views.erl +++ b/src/couch_views/src/couch_views.erl @@ -31,13 +31,12 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), #mrst{ - views = Views, - language = Lang + views = Views } = Mrst, + View = get_view(ViewName, Views), QueryArgs1 = couch_mrview_util:set_view_type(QueryArgs0, View, Views), QueryArgs2 = couch_mrview_util:validate_args(QueryArgs1), - VInfo = couch_mrview_util:extract_view(Lang, QueryArgs2, View, Views), case is_reduce_view(QueryArgs2) of true -> throw({not_implemented}); false -> ok @@ -72,15 +71,18 @@ maybe_update_view(Db, Mrst, _Args) -> end), if WaitSeq == ready -> ok; true -> - couch_views_jobs:build_view(Db, Mrst, DbSeq) + couch_views_jobs:build_view(Db, Mrst, WaitSeq) end. +get_view(ViewName, Views) -> + {value, View} = lists:search(fun(View) -> + lists:member(ViewName, View#mrview.map_names) + end, Views), + View. + + is_reduce_view(#mrargs{view_type = ViewType}) -> ViewType =:= red; is_reduce_view({Reduce, _, _}) -> Reduce =:= red. - - -remove_ununsed_values(Args) -> - maps:filter(fun (_, V) -> V /= undefined end, Args). diff --git a/src/couch_views/src/couch_views_encoding.erl b/src/couch_views/src/couch_views_encoding.erl index 9d3e3fc..9f76ea6 100644 --- a/src/couch_views/src/couch_views_encoding.erl +++ b/src/couch_views/src/couch_views_encoding.erl @@ -34,7 +34,7 @@ encode(X) -> encode(X, Type) when Type == key; Type == value -> - erlfdb_tuple:pack(encode_int(X, value)) + erlfdb_tuple:pack(encode_int(X, value)). decode(Encoded) -> @@ -60,7 +60,7 @@ encode_int(Num, value) when is_number(Num) -> encode_int(Bin, key) when is_binary(Bin) -> {?STRING, couch_util:get_sort_key(Bin)}; -encode_int(Bin, value) when is_bianry(Bin) -> +encode_int(Bin, value) when is_binary(Bin) -> {?STRING, Bin}; encode_int(List, Type) when is_list(List) -> @@ -75,7 +75,7 @@ encode_int({Props}, Type) when is_list(Props) -> EV = encode_int(V, Type), {EK, EV} end, Props), - {?OBJECT, list_to_tuple(EncodedProps)}. + {?OBJECT, list_to_tuple(Encoded)}. decode_int({?NULL}) -> @@ -98,8 +98,8 @@ decode_int({?LIST, List}) -> decode_int({?OBJECT, Object}) -> Props = lists:map(fun({EK, EV}) -> - K = decode_int(EncodedK), - V = decode_int(EncodedV), + K = decode_int(EK), + V = decode_int(EV), {K, V} end, tuple_to_list(Object)), {Props}. diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl index 57ed5f1..dc1840d 100644 --- a/src/couch_views/src/couch_views_fdb.erl +++ b/src/couch_views/src/couch_views_fdb.erl @@ -16,7 +16,7 @@ get_update_seq/2, set_update_seq/3, - fold_map_idx/5, + fold_map_idx/6, write_doc/4 ]). @@ -27,6 +27,7 @@ -define(VALUE, 2). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include_lib("fabric/src/fabric2.hrl"). -include("couch_views.hrl"). @@ -40,25 +41,22 @@ get_update_seq(TxDb, #mrst{sig = Sig}) -> db_prefix := DbPrefix } = TxDb, - Key = get_seq_key(Sig, DbPrefix), - case erlfdb:wait(erlfdb:get(Tx, Key)) of + case erlfdb:wait(erlfdb:get(Tx, seq_key(DbPrefix, Sig))) of not_found -> <<>>; UpdateSeq -> UpdateSeq end. -set_view_seq(TxDb, Sig, Seq) -> +set_update_seq(TxDb, Sig, Seq) -> #{ - tx := Tx - db_prefix := DbPrefix, + tx := Tx, + db_prefix := DbPrefix } = TxDb, - SeqKey = get_seq_key(Sig, DbPrefix), - ok = erlfdb:set(Tx, SeqKey, Seq). + ok = erlfdb:set(Tx, seq_key(DbPrefix, Sig), Seq). fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) -> #{ - tx := Tx, db_prefix := DbPrefix } = TxDb, @@ -73,11 +71,11 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) -> docid => undefined, dupe_id => undefined, callback => Callback, - acc => Acc0, + acc => Acc0 }, - {fun fold_fwd/2, FwdAcc} + {fun fold_fwd/2, FwdAcc}; rev -> - RevAcc #{ + RevAcc = #{ prefix => MapIdxPrefix, next => value, value => undefined, @@ -93,24 +91,20 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) -> fabric2_db:fold_range(TxDb, MapIdxPrefix, Fun, Acc, Options). -write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) -> +write_doc(TxDb, Sig, #{deleted := true} = Doc, _ViewIds) -> #{ id := DocId } = Doc, ExistingViewKeys = get_view_keys(TxDb, Sig, DocId), - sclear_id_idx(TxDb, Sig, DocId), + clear_id_idx(TxDb, Sig, DocId), lists:foreach(fun({ViewId, ViewKeys}) -> - clear_map_idx(TxDb, Sig, ViewId, ViewKeys) + clear_map_idx(TxDb, Sig, ViewId, DocId, ViewKeys) end, ExistingViewKeys); write_doc(TxDb, Sig, Doc, ViewIds) -> #{ - db_prefix := DbPrefix - } = TxDb, - - #{ id := DocId, results := Results } = Doc, @@ -122,7 +116,7 @@ write_doc(TxDb, Sig, Doc, ViewIds) -> lists:foreach(fun({ViewId, NewRows}) -> ExistingKeys = fabric2_util:get_value(ViewId, ExistingViewKeys, []), update_id_idx(TxDb, Sig, ViewId, DocId, NewRows), - update_map_idx(TxDb, Sig, ViewId, DocId, ExitingKeys, NewRows) + update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) end, lists:zip(ViewIds, Results)). @@ -131,7 +125,7 @@ fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) -> prefix := Prefix } = Acc, - {{SortKey, DocId}, _DupeId, ?VIEW_ROW_KEY} = + {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} = erlfdb_tuple:unpack(RowKey, Prefix), Acc#{ next := val, @@ -147,21 +141,21 @@ fold_fwd({RowKey, EncodedValue}, #{next := val} = Acc) -> key := Key, sort_key := SortKey, docid := DocId, - dupe_id := DocId, - callback := UserCallback + dupe_id := DupeId, + callback := UserCallback, acc := UserAcc0 } = Acc, % We're asserting there that this row is paired % correctly with the previous row by relying on % a badmatch if any of these values don't match. - {{SortKey, DocId}, DupeId, ?VIEW_ROW_VAL} = + {{SortKey, DocId}, DupeId, ?VIEW_ROW_VALUE} = erlfdb_tuple:unpack(RowKey, Prefix), Value = couch_views_encoding:decode(EncodedValue), - NewAcc = UserCallback(DocId, Key, Value, UserAcc0), + UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0), - #{ + Acc#{ next := key, key := undefined, sort_key := undefined, @@ -176,7 +170,7 @@ fold_rev({RowKey, EncodedValue}, #{next := value} = Acc) -> prefix := Prefix } = Acc, - {{SortKey, DocId}, _DupeId, ?VIEW_ROW_VAL} = + {{SortKey, DocId}, DupeId, ?VIEW_ROW_VALUE} = erlfdb_tuple:unpack(RowKey, Prefix), Acc#{ next := key, @@ -192,8 +186,8 @@ fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) -> value := Value, sort_key := SortKey, docid := DocId, - dupe_id := DocId, - callback := UserCallback + dupe_id := DupeId, + callback := UserCallback, acc := UserAcc0 } = Acc, @@ -204,9 +198,9 @@ fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) -> erlfdb_tuple:unpack(RowKey, Prefix), Key = couch_views_encoding:decode(EncodedOriginalKey), - NewAcc = UserCallback(DocId, Key, Value, UserAcc0), + UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0), - #{ + Acc#{ next := val, value := undefined, sort_key := undefined, @@ -222,10 +216,10 @@ clear_id_idx(TxDb, Sig, DocId) -> } = TxDb, {Start, End} = id_idx_range(DbPrefix, Sig, DocId), - ok = erlfdb:clear_range(Start, End). + ok = erlfdb:clear_range(Tx, Start, End). -clear_map_idx(TxDb, Sig, ViewId, ViewKeys) -> +clear_map_idx(TxDb, Sig, ViewId, DocId, ViewKeys) -> #{ tx := Tx, db_prefix := DbPrefix @@ -269,7 +263,7 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) -> lists:foreach(fun({DupeId, Key1, Key2, Val}) -> KK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_KEY), - VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VAL), + VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VALUE), ok = erlfdn:store(Tx, KK, Key2), ok = erlfdb:store(Tx, VK, Val) end, KVsToAdd). @@ -280,15 +274,20 @@ get_view_keys(TxDb, Sig, DocId) -> tx := Tx, db_prefix := DbPrefix } = TxDb, - {Start, End} = id_idx_range(DbPrefix, Sig, DocId) + {Start, End} = id_idx_range(DbPrefix, Sig, DocId), lists:map(fun({K, V}) -> {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId} = erlfdb_tuple:unpack(K, DbPrefix), - ViewKeys = couch_views_encoding:decode(V) + ViewKeys = couch_views_encoding:decode(V), {ViewId, ViewKeys} end, erlfdb:get_range(Tx, Start, End, [])). +seq_key(DbPrefix, Sig) -> + Key = {?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ}, + erlfdb_tuple:pack(Key, DbPrefix). + + id_idx_key(DbPrefix, Sig, DocId, ViewId) -> Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}, erlfdb_tuple:pack(Key, DbPrefix). @@ -300,12 +299,12 @@ id_idx_range(DbPrefix, Sig, DocId) -> map_idx_prefix(DbPrefix, Sig, ViewId) -> - Key = {?DB_VIES, Sig, ?VIEW_MAP_RANGE, ViewId}, - erlfdb_tuple:pack(Key). + Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId}, + erlfdb_tuple:pack(Key, DbPrefix). -map_idx_key(MapIdxPrefix, MapKey, DocId, DupeId, Type) - Key = {MapKey, DocId, DupeId, Type}, +map_idx_key(MapIdxPrefix, MapKey, DupeId, Type) -> + Key = {MapKey, DupeId, Type}, erldb_tuple:encode(Key, MapIdxPrefix). @@ -320,7 +319,7 @@ process_rows(Rows) -> EK1 = couch_views_encoding:encode(K, key), EK2 = couch_views_encoding:encode(K, value), EV = couch_views_encoding:encode(V, value), - {EKK, EKV, EV} + {EK1, EK2, EV} end, Rows), Grouped = lists:foldl(fun({K1, K2, V}, Acc) -> diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 1a84116..91072a1 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -21,10 +21,10 @@ init/0 ]). --include_lib("couch_views/include/couch_views.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include_lib("fabric/src/fabric2.hrl"). --include_lib("eunit/include/eunit.hrl"). +-include("couch_views.hrl"). % TODO: % * Handle timeouts of transaction and other errors @@ -57,6 +57,8 @@ init() -> db_seq => undefined, view_seq => undefined, last_seq => undefined, + job => Job, + job_data => Data, count => 0, limit => num_changes(), doc_acc => [], @@ -67,7 +69,7 @@ init() -> update(#{} = Db, Mrst0, State0) -> - fabric2_fdb:transactional(Db, fun(TxDb) -> + {Mrst2, State3} = fabric2_fdb:transactional(Db, fun(TxDb) -> % In the first iteration of update we need % to populate our db and view sequences State1 = case State0 of @@ -75,7 +77,7 @@ update(#{} = Db, Mrst0, State0) -> State0#{ tx_db := TxDb, db_seq := fabric2_db:get_update_seq(TxDb), - view_seq := couch_views_fdb:get_update_seq(TxDb, Mrst) + view_seq := couch_views_fdb:get_update_seq(TxDb, Mrst0) }; _ -> State0#{ @@ -93,53 +95,29 @@ update(#{} = Db, Mrst0, State0) -> } = State2, {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc), - write_docs(Db, Mrst1, MappedResults, State2), + write_docs(Db, Mrst1, MappedDocs, State2), case Count < Limit of true -> - report_progress(State2, finished); + report_progress(State2, finished), + {Mrst1, finished}; false -> report_progress(State2, update), - State3 = maps:merge(FinalState, #{ - count => 0, - doc_acc => [], - db_seq => LastSeq, - last_seq => 0, - mrst => Mrst1 - }), - - end). - - -update_int(#{} = Db, State) -> - - - #{ - count := Count, - limit := Limit, - doc_acc := DocAcc, - last_seq := LastSeq, - callback := Cb, - callback_args := CallbackArgs, - mrst := Mrst - } = FinalState, - - {MappedDocs, Mrst1} = map_docs(Mrst, DocAcc), - write_docs(Db, Mrst1, MappedDocs, FinalState), - - case Count < Limit of - true -> - Cb(undefined, finished, CallbackArgs, Db, Mrst, LastSeq); - false -> - NextState = maps:merge(FinalState, #{ - limit => Limit, - count => 0, - doc_acc => [], - since_seq => LastSeq, - last_seq => 0, - mrst => Mrst1 - }), - update_int(Db, NextState) + {Mrst1, State2#{ + tx_db := undefined, + count := 0, + doc_acc := [], + view_seq := LastSeq, + last_seq := undefined + }} + end + end), + + case State3 of + finished -> + couch_query_servers:stop_doc_map(Mrst2#mrst.qserver); + _ -> + update(Db, Mrst2, State3) end. @@ -172,7 +150,6 @@ process_changes(Change, Acc) -> Acc1 = case {Id, IncludeDesign} of {<<"_design/", _/binary>>, false} -> - % {ok, Doc} = fabric2_db:open_doc(Db, Id), maps:merge(Acc, #{ count => Count + 1, last_seq => LastSeq @@ -186,11 +163,11 @@ process_changes(Change, Acc) -> end, Change1 = maps:put(doc, Doc, Change), - maps:merge(Acc, #{ - doc_acc => DocAcc ++ [Change1], - count => Count + 1, - last_seq => LastSeq - }) + Acc#{ + doc_acc := DocAcc ++ [Change1], + count := Count + 1, + last_seq := LastSeq + } end, {ok, Acc1}. @@ -247,10 +224,22 @@ start_query_server(#mrst{} = Mrst) -> report_progress(State, UpdateType) -> + #{ + tx_db := TxDb, + job := Job, + job_data := JobData, + last_seq := LastSeq + } = State, + + NewData = JobData#{view_seq => LastSeq}, + case UpdateType of update -> - couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq); + couch_jobs:update(TxDb, Job, NewData); finished -> - couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq) + couch_jobs:finish(TxDb, Job, NewData) end. + +num_changes() -> + config:get_integer("couch_views", "change_limit", 100). diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl index 9714b29..9e299af 100644 --- a/src/couch_views/src/couch_views_jobs.erl +++ b/src/couch_views/src/couch_views_jobs.erl @@ -19,6 +19,7 @@ ]). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include("couch_views.hrl"). @@ -26,18 +27,18 @@ set_timeout() -> couch_jobs:set_type_timeout(?INDEX_JOB_TYPE, 6 * 1000). -build_view(Db, Mrst, UpdateSeq) -> - {ok, JobId} = build_view_async(Db, Mrst), +build_view(TxDb, Mrst, UpdateSeq) -> + {ok, JobId} = build_view_async(TxDb, Mrst), case wait_for_job(JobId, UpdateSeq) of ok -> ok; - retry -> build_view(Db, Mrst, UpdateSeq) + retry -> build_view(TxDb, Mrst, UpdateSeq) end. -build_view_async(Db, Mrst) -> - JobId = create_job_id(TxDb, Mrst), - JobData = create_job_data(TxDb, Mrst), - ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData). +build_view_async(TxDb, Mrst) -> + JobId = job_id(TxDb, Mrst), + JobData = job_data(TxDb, Mrst), + ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData), {ok, JobId}. @@ -45,7 +46,7 @@ build_view_async(Db, Mrst) -> wait_for_job(JobId, UpdateSeq) -> case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of {ok, Subscription, _State, _Data} -> - wait_for_job(JobId, Subscription, UpdateSeq) + wait_for_job(JobId, Subscription, UpdateSeq); {ok, finished, Data} -> case Data of #{view_seq := ViewSeq} when ViewSeq >= UpdateSeq -> @@ -57,7 +58,7 @@ wait_for_job(JobId, UpdateSeq) -> wait_for_job(JobId, Subscription, UpdateSeq) -> - case wait(Subscription, infinity) of + case wait(Subscription) of {error, Error} -> erlang:error(Error); {finished, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq -> @@ -72,14 +73,14 @@ wait_for_job(JobId, Subscription, UpdateSeq) -> end. -get_id(#{name := DbName}, #mrst{sig = Sig}) -> - create_job_id(DbName, Sig); +job_id(#{name := DbName}, #mrst{sig = Sig}) -> + job_id(DbName, Sig); -get_id(DbName, Sig) -> +job_id(DbName, Sig) -> <<DbName/binary, Sig/binary>>. -create_job_data(Db, Mrst) -> +job_data(Db, Mrst) -> #mrst{ idx_name = DDocId, sig = Sig @@ -93,7 +94,7 @@ create_job_data(Db, Mrst) -> wait(Subscription) -> - case couch_jobs:wait(JobSubscription, infinity) of + case couch_jobs:wait(Subscription, infinity) of {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData}; timeout -> {error, timeout} end. diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl index 56b23f2..8d2bf5a 100644 --- a/src/couch_views/src/couch_views_reader.erl +++ b/src/couch_views/src/couch_views_reader.erl @@ -19,6 +19,7 @@ -include("couch_views.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include_lib("fabric/src/fabric2.hrl"). @@ -39,7 +40,7 @@ read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) -> fabric2_fdb:transactional(Db, fun(TxDb) -> Acc0 = #{ - db => TxDb + db => TxDb, skip => Args#mrargs.skip, mrargs => Args, callback => UserCallback, @@ -59,7 +60,7 @@ read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) -> acc := UserAcc2 } = Acc1, - maybe_stop(Callback(complete, UserAcc2) + maybe_stop(UserCallback(complete, UserAcc2)) end) catch throw:{done, Out} -> {ok, Out} @@ -71,10 +72,10 @@ handle_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 -> handle_row(DocId, Key, Value, Acc) -> #{ - db := TxDb + db := TxDb, mrargs := Args, callback := UserCallback, - acc := UserAcc + acc := UserAcc0 } = Acc, BaseRow = [ @@ -83,21 +84,21 @@ handle_row(DocId, Key, Value, Acc) -> {value, Value} ], - Row = BaseRow ++ if not IncludeDocs -> []; true -> + Row = BaseRow ++ if not Args#mrargs.include_docs -> []; true -> DocOpts0 = Args#mrargs.doc_options, - DocOpts1 = OpenOpts0 ++ case Args#mrargs.conflicts of + DocOpts1 = DocOpts0 ++ case Args#mrargs.conflicts of true -> [conflicts]; false -> [] end, - DocObj = case fabric2_db:open_doc(Db, DocId, DocOpts1) of + DocObj = case fabric2_db:open_doc(TxDb, DocId, DocOpts1) of {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts1); {not_found, _} -> null end, [{doc, DocObj}] end, - UserAcc1 = maybe_stop(Callback({row, Row1}, UserAcc0)), - Acc#{acc := UserAcc1} + UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)), + Acc#{acc := UserAcc1}. get_view_id(ViewName, Views) -> diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl index 8ec2425..0417a9b 100644 --- a/src/couch_views/src/couch_views_server.erl +++ b/src/couch_views/src/couch_views_server.erl @@ -41,7 +41,7 @@ start_link() -> init(_) -> process_flag(trap_exit, true), couch_views_jobs:set_timeout(), - State0 = #{ + State = #{ workers => [], num_workers => num_workers() }, @@ -70,7 +70,7 @@ handle_info({'EXIT', Pid, Reason}, State) -> NewWorkers -> if Reason == normal -> ok; true -> LogMsg = "~p : indexer process ~p exited with ~p", - couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + couch_log:error(LogMsg, [?MODULE, Pid, Reason]) end, {noreply, spawn_workers(State#{workers := NewWorkers})} end; @@ -90,36 +90,12 @@ spawn_workers(State) -> } = State, case length(Workers) < NumWorkers of true -> - Pid = spawn_worker(), + Pid = couch_views_indexer:spawn_link(), spawn_workers(State#{workers := [Pid | Workers]}); false -> State end. -spawn_worker() -> - couch_views_indexer:spawn_link(). - - -blocking_acceptor(Parent) -> - case couch_views_jobs:accept() of - not_found -> - blocking_acceptor(Parent); - {ok, Job, JobData} -> - gen_server:cast(Parent, {job, Job, JobData}) - end. - - -check_finished_process(#{acceptor_pid := Pid} = State, Pid) -> - State1 = State#{acceptor_pid := undefined}, - spawn_acceptor(State1); - -check_finished_process(State, Pid) -> - #{workers := Workers} = State, - Workers1 = maps:remove(Pid, Workers), - State#{workers := Workers1}. - - - num_workers() -> config:get_integer("couch_views", "max_workers", ?MAX_WORKERS). diff --git a/src/couch_views/src/couch_views_util.erl b/src/couch_views/src/couch_views_util.erl index d7ed29f..b88cfcd 100644 --- a/src/couch_views/src/couch_views_util.erl +++ b/src/couch_views/src/couch_views_util.erl @@ -19,6 +19,7 @@ -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include("couch_views.hrl").
