This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 14321badcfae21f428e63024b546f671f07b9355 Author: Paul J. Davis <[email protected]> AuthorDate: Wed Jul 17 13:26:31 2019 -0500 Simplify worker vs. indexer distinction This just turns an indexer into a job handler and removes the worker concept entirely. couch_views_server just starts `max_workers` indexer processes that each wait for a job to process. Once processing is finished, the indexer exits and couch_views_server spawns a new indexer to replace it. --- src/couch_views/src/couch_views_indexer.erl | 270 ++++++++++++++-------------- src/couch_views/src/couch_views_worker.erl | 44 ----- 2 files changed, 132 insertions(+), 182 deletions(-) diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index e9f0b41..1a84116 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -13,55 +13,106 @@ -module(couch_views_indexer). -export([ - update/2, - update/4, - - % For tests - map_docs/2, - write_doc/4 + spawn_link/0 ]). --include("couch_views.hrl"). +-export([ + init/0 +]). + +-include_lib("couch_views/include/couch_views.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("fabric/src/fabric2.hrl"). -include_lib("eunit/include/eunit.hrl"). -% TODO: +% TODO: % * Handle timeouts of transaction and other errors -update(Db, Mrst) -> - Noop = fun (_) -> ok end, - update(Db, Mrst, Noop, []).
- - -update(#{} = Db, Mrst, ProgressCallback, ProgressArgs) - when is_function(ProgressCallback, 6) -> - try - Seq = couch_views_fdb:get_update_seq(Db, Mrst), - State = #{ - since_seq => Seq, - count => 0, - limit => config:get_integer("couch_views", "change_limit", 100), - doc_acc => [], - last_seq => Seq, - callback => ProgressCallback, - callback_args => ProgressArgs, - mrst => Mrst - }, - update_int(Db, State) - catch error:database_does_not_exist -> - #{db_prefix := DbPrefix} = Db, - couch_log:notice("couch_views_indexer stopped" - "- ~p database does not exist", [DbPrefix]) - end. + +spawn_link() -> + proc_lib:spawn_link(?MODULE, init, []). + + +init() -> + {ok, Job, Data} = couch_jobs:accept(?INDEX_JOB_TYPE, #{}), + + #{ + <<"db_name">> := DbName, + <<"ddoc_id">> := DDocId, + <<"sig">> := Sig + } = Data, + + {ok, Db} = fabric2_db:open(DbName, []), + {ok, DDoc} = fabric2_db:open_doc(Db, DDocId), + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + + if Mrst#mrst.sig == Sig -> ok; true -> + couch_jobs:finish(Job, Data#{error => mismatched_signature}), + exit(normal) + end, + + State = #{ + tx_db => undefined, + db_seq => undefined, + view_seq => undefined, + last_seq => undefined, + count => 0, + limit => num_changes(), + doc_acc => [], + design_opts => Mrst#mrst.design_opts + }, + + update(Db, Mrst, State). 
+ + +update(#{} = Db, Mrst0, State0) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + % In the first iteration of update we need + % to populate our db and view sequences + State1 = case State0 of + #{db_seq := undefined} -> + State0#{ + tx_db := TxDb, + db_seq := fabric2_db:get_update_seq(TxDb), + view_seq := couch_views_fdb:get_update_seq(TxDb, Mrst) + }; + _ -> + State0#{ + tx_db := TxDb + } + end, + + {ok, State2} = fold_changes(State1), + + #{ + count := Count, + limit := Limit, + doc_acc := DocAcc, + last_seq := LastSeq + } = State2, + + {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc), + write_docs(Db, Mrst1, MappedResults, State2), + + case Count < Limit of + true -> + report_progress(State2, finished); + false -> + report_progress(State2, update), + State3 = maps:merge(FinalState, #{ + count => 0, + doc_acc => [], + db_seq => LastSeq, + last_seq => 0, + mrst => Mrst1 + }), + + end). update_int(#{} = Db, State) -> - {ok, FinalState} = fabric2_fdb:transactional(Db, fun(TxDb) -> - State1 = maps:put(tx_db, TxDb, State), - fold_changes(State1) - end), + #{ count := Count, @@ -73,8 +124,8 @@ update_int(#{} = Db, State) -> mrst := Mrst } = FinalState, - {MappedResults, Mrst1} = map_docs(Mrst, DocAcc), - write_docs(Db, Mrst1, MappedResults, FinalState), + {MappedDocs, Mrst1} = map_docs(Mrst, DocAcc), + write_docs(Db, Mrst1, MappedDocs, FinalState), case Count < Limit of true -> @@ -94,13 +145,13 @@ update_int(#{} = Db, State) -> fold_changes(State) -> #{ - since_seq := SinceSeq, + view_seq := SinceSeq, limit := Limit, tx_db := TxDb } = State, - fabric2_db:fold_changes(TxDb, SinceSeq, - fun process_changes/2, State, [{limit, Limit}]). + Fun = fun process_changes/2, + fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]). 
process_changes(Change, Acc) -> @@ -108,7 +159,7 @@ process_changes(Change, Acc) -> doc_acc := DocAcc, count := Count, tx_db := TxDb, - mrst := Mrst + design_opts := DesignOpts } = Acc, #{ @@ -117,8 +168,7 @@ process_changes(Change, Acc) -> deleted := Deleted } = Change, - IncludeDesign = lists:keymember(<<"include_design">>, 1, - Mrst#mrst.design_opts), + IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts), Acc1 = case {Id, IncludeDesign} of {<<"_design/", _/binary>>, false} -> @@ -126,16 +176,13 @@ process_changes(Change, Acc) -> maps:merge(Acc, #{ count => Count + 1, last_seq => LastSeq - }); + }); _ -> - % Making a note here that we should make fetching all the docs % a parallel fdb operation - Doc = if Deleted -> []; true -> - case fabric2_db:open_doc(TxDb, Id) of - {ok, Doc0} -> Doc0; - {not_found, _} -> [] - end + {ok, Doc} = case Deleted of + true -> {ok, []}; + false -> fabric2_db:open_doc(TxDb, Id) end, Change1 = maps:put(doc, Doc, Change), @@ -150,113 +197,60 @@ process_changes(Change, Acc) -> map_docs(Mrst, Docs) -> % Run all the non deleted docs through the view engine and - Mrst1 = get_query_server(Mrst), + Mrst1 = start_query_server(Mrst), QServer = Mrst1#mrst.qserver, - MapFun = fun (#{deleted := true} = Change) -> maps:put(results, [], Change); - (Change) -> #{doc := Doc} = Change, couch_stats:increment_counter([couchdb, mrview, map_doc]), {ok, RawResults} = couch_query_servers:map_doc_raw(QServer, Doc), JsonResults = couch_query_servers:raw_to_ejson(RawResults), - ListResults = [[list_to_tuple(Res) || Res <- FunRs] - || FunRs <- JsonResults], + ListResults = lists:map(fun(ViewResults) -> + [list_to_tuple(Res) || Res <- ViewResults] + end, JsonResults), maps:put(results, ListResults, Change) end, - MappedResults = lists:map(MapFun, Docs), - {MappedResults, Mrst1}. + {Mrst1, lists:map(MapFun, Docs)}. 
-start_query_server(#mrst{} = Mrst) -> +write_docs(TxDb, Mrst, Docs, State) -> #mrst{ - language=Language, - lib=Lib, - views=Views + views = Views, + sig = Sig } = Mrst, - Defs = [View#mrview.def || View <- Views], - {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib), - Mrst#mrst{qserver=QServer}. + #{ + last_seq := LastSeq + } = State, -get_query_server(#mrst{} = Mrst) -> - case Mrst#mrst.qserver of - nil -> start_query_server(Mrst); - _ -> Mrst - end. + ViewIds = [View#mrview.id_num || View <- Views], + + lists:foreach(fun(Doc) -> + couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc) + end, Docs), + couch_views_fdb:update_view_seq(TxDb, Sig, LastSeq). -write_docs(Db, Mrst, Docs, State) -> + +start_query_server(#mrst{} = Mrst) -> #mrst{ - views = Views, - sig = Sig + language = Language, + lib = Lib, + views = Views } = Mrst, + Defs = [View#mrview.def || View <- Views], + {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib), + Mrst#mrst{qserver = QServer}. - #{ - callback := Cb, - callback_args := CallbackArgs - } = State, - IdxNames = lists:map(fun (View) -> - View#mrview.id_num - end, Views), - - lists:foreach(fun (Doc) -> - #{sequence := Seq} = Doc, - fabric2_fdb:transactional(Db, fun(TxDb) -> - couch_views_fdb:update_view_seq(TxDb, Sig, Seq), - Cb(TxDb, update, CallbackArgs, Db, Mrst, Seq), - write_doc(TxDb, Sig, Doc, IdxNames) - end) - end, Docs). 
- - -write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) -> - #{id := DocId} = Doc, - lists:foreach(fun (IdxName) -> - maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName) - end, ViewIds); - -write_doc(TxDb, Sig, Doc, ViewIds) -> - #{id := DocId, results := Results} = Doc, - lists:foreach(fun - ({IdxName, []}) -> - maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName); - ({IdxName, IdxResults}) -> - lists:foldl(fun (IdxResult, DocIdsCleared) -> - {IdxKey, _} = IdxResult, - OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig, - DocId, IdxName), - IsAlreadyCleared = lists:member(DocId, DocIdsCleared), - case OldIdxKey == not_found orelse IsAlreadyCleared == true of - true -> - couch_views_fdb:set_id_index(TxDb, Sig, IdxName, - DocId, IdxKey), - couch_views_fdb:set_map_index_results(TxDb, Sig, - IdxName, DocId, IdxResults); - false -> - couch_views_fdb:clear_id_index(TxDb, Sig, - DocId, IdxName), - couch_views_fdb:clear_map_index(TxDb, Sig, IdxName, - DocId, OldIdxKey), - couch_views_fdb:set_id_index(TxDb, Sig, DocId, - IdxName, IdxKey), - couch_views_fdb:set_map_index_results(TxDb, Sig, - IdxName, DocId, IdxResults) - end, - [DocId | DocIdsCleared] - end, [], IdxResults) - end, lists:zip(ViewIds, Results)). - - -maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName) -> - OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig, - DocId, IdxName), - if OldIdxKey == not_found -> ok; true -> - couch_views_fdb:clear_id_index(TxDb, Sig, - DocId, IdxName), - couch_views_fdb:clear_map_index(TxDb, Sig, IdxName, - DocId, OldIdxKey) + +report_progress(State, UpdateType) -> + case UpdateType of + update -> + couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq); + finished -> + couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq) end. 
+ diff --git a/src/couch_views/src/couch_views_worker.erl b/src/couch_views/src/couch_views_worker.erl deleted file mode 100644 index fa641d5..0000000 --- a/src/couch_views/src/couch_views_worker.erl +++ /dev/null @@ -1,44 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_views_worker). - --export([ - start/2, - job_progress/6 -]). - - -start(Job, JobData) -> - {ok, Db, Mrst} = get_indexing_info(JobData), - % maybe we should spawn here - couch_views_indexer:update(Db, Mrst, fun job_progress/6, Job). - - -job_progress(Tx, Progress, Job, Db, Mrst, LastSeq) -> - case Progress of - update -> - couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq); - finished -> - couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq) - end. - - -get_indexing_info(JobData) -> - #{ - <<"db_name">> := DbName, - <<"ddoc_id">> := DDocId - } = JobData, - {ok, Db} = fabric2_db:open(DbName, []), - {ok, DDoc} = fabric2_db:open_doc(Db, DDocId), - {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), - {ok, Db, Mrst}.
