This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b16b5b25304281aa9529d663c860327ff4e168f2 Author: Paul J. Davis <[email protected]> AuthorDate: Wed Jul 17 10:23:59 2019 -0500 Move jobs logic to couch_view_jobs Now that the couch_jobs API is full baked we can remove the thin wrapper API in couch_views_jobs and just use couch_jobs directly. --- src/couch_views/src/couch_views.erl | 80 ++++--------------- src/couch_views/src/couch_views_jobs.erl | 131 +++++++++++++------------------ 2 files changed, 68 insertions(+), 143 deletions(-) diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl index c059204..65af1bf 100644 --- a/src/couch_views/src/couch_views.erl +++ b/src/couch_views/src/couch_views.erl @@ -44,86 +44,36 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> end, Args = mrargs_to_map(QueryArgs2), - - maybe_build_view(Db, MrSt, Args), + ok = maybe_update_view(Db, Mrst, Args), try couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args) after UpdateAfter = maps:get(update, Args) == lazy, if UpdateAfter == false -> ok; true -> - maybe_add_couch_job(Db, Mrst) + couch_views_jobs:build_view_async(Db, Mrst) end end. -maybe_build_index(_Db, _Mrst, #{update := false}) -> - false; +maybe_update_view(_Db, _Mrst, #{update := false}) -> + ok; -maybe_build_index(_Db, _Mrst, #{update := lazy}) -> - false; +maybe_update_view(_Db, _Mrst, #{update := lazy}) -> + ok; -maybe_build_index(Db, Mrst, _Args) -> - {Status, Seq} = fabric2_fdb:transactional(Db, fun(TxDb) -> - case view_up_to_date(TxDb, Mrst) of - {true, UpdateSeq} -> - {ready, UpdateSeq}; - {false, LatestSeq} -> - maybe_add_couch_job(TxDb, Mrst), - {false, LatestSeq} +maybe_update_view(Db, Mrst, _Args) -> + WaitSeq = fabric2_fdb:transactional(Db, fun(TxDb) -> + DbSeq = fabric2_db:get_update_seq(TxDb), + ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst), + case DbSeq == ViewSeq of + true -> ready; + false -> DbSeq end end), - if Status == ready -> true; true -> - subscribe_and_wait_for_index(Db, Mrst, Seq) - end. - - -view_up_to_date(Db, Mrst) -> - fabric2_fdb:transactional(Db, fun(TxDb) -> - UpdateSeq = couch_views_fdb:get_update_seq(TxDb, Mrst), - LastChange = fabric2_fdb:get_last_change(TxDb), - {UpdateSeq == LastChange, LastChange} - end). - - -maybe_add_couch_job(TxDb, Mrst) -> - case couch_views_jobs:status(TxDb, Mrst) of - running -> - ok; - pending -> - ok; - Status when Status == finished orelse Status == not_found -> - couch_views_jobs:add(TxDb, Mrst) - end. - - -subscribe_and_wait_for_index(Db, Mrst, Seq) -> - case couch_views_jobs:subscribe(Db, Mrst) of - {error, Error} -> - throw({error, Error}); - {ok, finished, _} -> - ready; - {ok, Subscription, _JobState, _} -> - wait_for_index_ready(Subscription, Db, Mrst, Seq) - end. - - -wait_for_index_ready(Subscription, Db, Mrst, Seq) -> - Out = couch_views_jobs:wait(Subscription), - case Out of - {finished, _JobData} -> - ready; - {pending, _JobData} -> - wait_for_index_ready(Subscription, Db, Mrst, Seq); - {running, #{last_seq := LastSeq}} -> - if LastSeq =< Seq -> ready; true -> - wait_for_index_ready(Subscription, Db, Mrst, Seq) - end; - {running, _JobData} -> - wait_for_index_ready(Subscription, Db, Mrst, Seq); - {error, Error} -> - throw({error, Error}) + if WaitSeq == ready -> ok; true -> + couch_views_jobs:build_view(Db, Mrst, DbSeq) end. diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl index 31ab728..d9c5157 100644 --- a/src/couch_views/src/couch_views_jobs.erl +++ b/src/couch_views/src/couch_views_jobs.erl @@ -13,110 +13,85 @@ -module(couch_views_jobs). -export([ - status/2, - add/2, - - accept/0, - get_job_data/1, - update/5, - finish/5, set_timeout/0, - - subscribe/2, - wait/1, - unsubscribe/1, - - create_job_id/2 + build_view/3, + build_view_async/2 ]). -include("couch_views.hrl"). -% Query request usage of jobs - - -status(TxDb, Mrst) -> - JobId = create_job_id(TxDb, Mrst), - - case couch_jobs:get_job_state(TxDb, ?INDEX_JOB_TYPE, JobId) of - {ok, State} -> State; - {error, not_found} -> not_found; - Error -> Error - end. - - -add(TxDb, Mrst) -> - JobData = create_job_data(TxDb, Mrst, 0), - - JobId = create_job_id(TxDb, Mrst), - JTx = couch_jobs_fdb:get_jtx(TxDb), - couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData). - - -% couch_views_worker api - - -accept() -> - couch_jobs:accept(?INDEX_JOB_TYPE). - - -get_job_data(JobId) -> - couch_jobs:get_job_data(undefined, ?INDEX_JOB_TYPE, JobId). - - -update(JTx, Job, Db, Mrst, LastSeq) -> - JobData = create_job_data(Db, Mrst, LastSeq), - couch_jobs:update(JTx, Job, JobData). - - -finish(JTx, Job, Db, Mrst, LastSeq) -> - JobData = create_job_data(Db, Mrst, LastSeq), - couch_jobs:finish(JTx, Job, JobData). - - set_timeout() -> couch_jobs:set_type_timeout(?INDEX_JOB_TYPE, 6 * 1000). -% Watcher Job api - - -subscribe(Db, Mrst) -> - JobId = create_job_id(Db, Mrst), - couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId). - - -wait(JobSubscription) -> - case couch_jobs:wait(JobSubscription, infinity) of - {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData}; - {timeout} -> {error, timeout} +build_view(Db, Mrst, UpdateSeq) -> + {ok, JobId} = build_view_async(Db, Mrst), + case wait_for_job(JobId, UpdateSeq) of + ok -> ok; + retry -> build_view(Db, Mrst, UpdateSeq) end. -unsubscribe(JobSubscription) -> - couch_jobs:unsubscribe(JobSubscription). +build_view_async(Db, Mrst) -> + JobId = create_job_id(TxDb, Mrst), + JobData = create_job_data(TxDb, Mrst), + ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData). + {ok, JobId}. + + + +wait_for_job(JobId, UpdateSeq) -> + case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of + {ok, Subscription, _State, _Data} -> + wait_for_job(JobId, Subscription, UpdateSeq) + {ok, finished, Data} -> + case Data of + #{view_seq := ViewSeq} when ViewSeq >= UpdateSeq -> + ok; + _ -> + retry + end + end. -% Internal +wait_for_job(JobId, Subscription, UpdateSeq) -> + case wait(Subscription, infinity) of + {error, Error} -> + erlang:error(Error); + {finished, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq -> + ok; + {finished, _} -> + wait_for_job(JobId, UpdateSeq); + {_State, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq -> + couch_jobs:unsubscribe(Subscription), + ok; + {_, _} -> + wait_for_job(JobId, Subscription, UpdateSeq) + end. -create_job_id(#{name := DbName}, #mrst{sig = Sig}) -> +get_id(#{name := DbName}, #mrst{sig = Sig}) -> create_job_id(DbName, Sig); -create_job_id(DbName, Sig) -> +get_id(DbName, Sig) -> <<DbName/binary, Sig/binary>>. -create_job_data(Db, Mrst, LastSeq) -> - #{name := DbName} = Db, - +create_job_data(Db, Mrst) -> #mrst{ idx_name = DDocId } = Mrst, #{ - db_name => DbName, - ddoc_id => DDocId, - last_seq => LastSeq + db_name => fabric2_db:name(Db), + ddoc_id => DDocId }. + + +wait(Subscription) -> + case couch_jobs:wait(JobSubscription, infinity) of + {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData}; + timeout -> {error, timeout} + end.
