This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5d6068e004f1976191c9327a97d9b33c452e64f3 Author: Paul J. Davis <[email protected]> AuthorDate: Thu Jul 18 13:58:57 2019 -0500 Move fdb logic to couch_views_fdb --- src/couch_views/src/couch_views.erl | 45 +----- src/couch_views/src/couch_views_fdb.erl | 147 +++++++++++++++-- src/couch_views/src/couch_views_reader.erl | 250 ++++++++++++----------------- 3 files changed, 245 insertions(+), 197 deletions(-) diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl index 65af1bf..7deb54d 100644 --- a/src/couch_views/src/couch_views.erl +++ b/src/couch_views/src/couch_views.erl @@ -43,23 +43,22 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> false -> ok end, - Args = mrargs_to_map(QueryArgs2), - ok = maybe_update_view(Db, Mrst, Args), + ok = maybe_update_view(Db, Mrst, QueryArgs2), try - couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args) + couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, QueryArgs2) after - UpdateAfter = maps:get(update, Args) == lazy, + UpdateAfter = QueryArgs2#mrargs.update == lazy, if UpdateAfter == false -> ok; true -> couch_views_jobs:build_view_async(Db, Mrst) end end. -maybe_update_view(_Db, _Mrst, #{update := false}) -> +maybe_update_view(_Db, _Mrst, #mrargs{update = false}) -> ok; -maybe_update_view(_Db, _Mrst, #{update := lazy}) -> +maybe_update_view(_Db, _Mrst, #mrargs{update = laze}) -> ok; maybe_update_view(Db, Mrst, _Args) -> @@ -83,39 +82,5 @@ is_reduce_view({Reduce, _, _}) -> Reduce =:= red. 
-mrargs_to_map(#mrargs{} = Args) -> - process_args(#{ - start_key => Args#mrargs.start_key, - start_key_docid => Args#mrargs.start_key_docid, - end_key => Args#mrargs.end_key, - end_key_docid => Args#mrargs.end_key_docid, - keys => Args#mrargs.keys, - direction => Args#mrargs.direction, - limit => Args#mrargs.limit, - skip => Args#mrargs.skip, - update => Args#mrargs.update, - multi_get => Args#mrargs.multi_get, - inclusive_end => Args#mrargs.inclusive_end, - include_docs => Args#mrargs.include_docs, - doc_options => Args#mrargs.doc_options, - update_seq => Args#mrargs.update_seq, - conflicts => Args#mrargs.conflicts, - sorted => Args#mrargs.sorted - }). - - -process_args(#{} = Args) -> - Args1 = remove_ununsed_values(Args), - Defaults = #{ - direction => fwd, - inclusive_end => true, - update => true, - skip => 0, - limit => ?MAX_VIEW_LIMIT - }, - - maps:merge(Defaults, Args1). - - remove_ununsed_values(Args) -> maps:filter(fun (_, V) -> V /= undefined end, Args). diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl index f47f1b1..57ed5f1 100644 --- a/src/couch_views/src/couch_views_fdb.erl +++ b/src/couch_views/src/couch_views_fdb.erl @@ -16,7 +16,9 @@ get_update_seq/2, set_update_seq/3, - write_rows/4 + fold_map_idx/5, + + write_doc/4 ]). @@ -54,18 +56,54 @@ set_view_seq(TxDb, Sig, Seq) -> ok = erlfdb:set(Tx, SeqKey, Seq). 
+fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + + MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId), + {Fun, Acc} = case fabric2_util:get_value(dir, Options, fwd) of + fwd -> + FwdAcc = #{ + prefix => MapIdxPrefix, + next => key, + key => undefined, + sort_key => undefined, + docid => undefined, + dupe_id => undefined, + callback => Callback, + acc => Acc0, + }, + {fun fold_fwd/2, FwdAcc} + rev -> + RevAcc #{ + prefix => MapIdxPrefix, + next => value, + value => undefined, + sort_key => undefined, + docid => undefined, + dupe_id => undefined, + callback => Callback, + acc => Acc0 + }, + {fun fold_rev/2, RevAcc} + end, + + fabric2_db:fold_range(TxDb, MapIdxPrefix, Fun, Acc, Options). + + write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) -> #{ id := DocId } = Doc, - ViewKeys = get_view_keys(TxDb, Sig, DocId), + ExistingViewKeys = get_view_keys(TxDb, Sig, DocId), - clear_id_idx(TxDb, Sig, DocId), + sclear_id_idx(TxDb, Sig, DocId), lists:foreach(fun({ViewId, ViewKeys}) -> clear_map_idx(TxDb, Sig, ViewId, ViewKeys) - end, ViewKeys). - + end, ExistingViewKeys); write_doc(TxDb, Sig, Doc, ViewIds) -> #{ @@ -79,7 +117,7 @@ write_doc(TxDb, Sig, Doc, ViewIds) -> ExistingViewKeys = get_view_keys(TxDb, Sig, DocId), - ok = clear_id_idx(TxDb, Sig, DocId), + clear_id_idx(TxDb, Sig, DocId), lists:foreach(fun({ViewId, NewRows}) -> ExistingKeys = fabric2_util:get_value(ViewId, ExistingViewKeys, []), @@ -88,6 +126,95 @@ write_doc(TxDb, Sig, Doc, ViewIds) -> end, lists:zip(ViewIds, Results)). 
+fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) -> + #{ + prefix := Prefix + } = Acc, + + {{SortKey, DocId}, _DupeId, ?VIEW_ROW_KEY} = + erlfdb_tuple:unpack(RowKey, Prefix), + Acc#{ + next := val, + key := couch_views_encoding:decode(EncodedOriginalKey), + sort_key := SortKey, + docid := DocId, + dupe_id := DupeId + }; + +fold_fwd({RowKey, EncodedValue}, #{next := val} = Acc) -> + #{ + prefix := Prefix, + key := Key, + sort_key := SortKey, + docid := DocId, + dupe_id := DocId, + callback := UserCallback + acc := UserAcc0 + } = Acc, + + % We're asserting here that this row is paired + % correctly with the previous row by relying on + % a badmatch if any of these values don't match. + {{SortKey, DocId}, DupeId, ?VIEW_ROW_VAL} = + erlfdb_tuple:unpack(RowKey, Prefix), + + Value = couch_views_encoding:decode(EncodedValue), + NewAcc = UserCallback(DocId, Key, Value, UserAcc0), + + #{ + next := key, + key := undefined, + sort_key := undefined, + docid := undefined, + dupe_id := undefined, + acc := UserAcc1 + }. + + +fold_rev({RowKey, EncodedValue}, #{next := value} = Acc) -> + #{ + prefix := Prefix + } = Acc, + + {{SortKey, DocId}, _DupeId, ?VIEW_ROW_VAL} = + erlfdb_tuple:unpack(RowKey, Prefix), + Acc#{ + next := key, + value := couch_views_encoding:decode(EncodedValue), + sort_key := SortKey, + docid := DocId, + dupe_id := DupeId + }; + +fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) -> + #{ + prefix := Prefix, + value := Value, + sort_key := SortKey, + docid := DocId, + dupe_id := DocId, + callback := UserCallback + acc := UserAcc0 + } = Acc, + + % We're asserting here that this row is paired + % correctly with the previous row by relying on + % a badmatch if any of these values don't match. 
+ {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} = + erlfdb_tuple:unpack(RowKey, Prefix), + + Key = couch_views_encoding:decode(EncodedOriginalKey), + NewAcc = UserCallback(DocId, Key, Value, UserAcc0), + + #{ + next := val, + value := undefined, + sort_key := undefined, + docid := undefined, + dupe_id := undefined, + acc := UserAcc1 + }. + clear_id_idx(TxDb, Sig, DocId) -> #{ tx := Tx, @@ -141,10 +268,10 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) -> MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId), lists:foreach(fun({DupeId, Key1, Key2, Val}) -> - KeyKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_KEY), - ValKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_VAL), - ok = erlfdn:store(Tx, KeyKey, Key2), - ok = erlfdb:store(Tx, ValKey, Val) + KK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_KEY), + VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VAL), + ok = erlfdn:store(Tx, KK, Key2), + ok = erlfdb:store(Tx, VK, Val) end, KVsToAdd). diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl index f4e768a..56b23f2 100644 --- a/src/couch_views/src/couch_views_reader.erl +++ b/src/couch_views/src/couch_views_reader.erl @@ -22,174 +22,130 @@ -include_lib("fabric/src/fabric2.hrl"). 
-read(Db, DDoc, ViewName, Callback, Acc0, Args) -> - #{name := DbName} = Db, - - {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), +read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) -> #mrst{ sig = Sig, views = Views } = Mrst, - IdxName = get_idx_name(ViewName, Views), - State0 = #{ - acc => Acc0, - skip => maps:get(skip, Args, 0), - include_docs => maps:get(include_docs, Args, false), - db => Db - }, + ViewId = get_view_id(ViewName, Views), + Opts = mrargs_to_fdb_options(Args), + Fun = fun handle_row/4, + + try + % Need to add total_rows support + Meta = {meta, [{total_rows, null}, {offset, null}]}, + UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)), + + fabric2_fdb:transactional(Db, fun(TxDb) -> + Acc0 = #{ + db => TxDb + skip => Args#mrargs.skip, + mrargs => Args, + callback => UserCallback, + acc => UserAcc1 + }, + + Acc1 = couch_views_fdb:fold_map_idx( + TxDb, + Sig, + ViewId, + Opts, + Fun, + Acc0 + ), + + #{ + acc := UserAcc2 + } = Acc1, + + maybe_stop(Callback(complete, UserAcc2) + end) + catch throw:{done, Out} -> + {ok, Out} + end. 
- DefaultOpts = [{streaming_mode, want_all}], - {Start, End, QueryOpts} = convert_args_to_fdb(Db, Sig, IdxName, Args, - DefaultOpts), - Opts = QueryOpts ++ DefaultOpts, - fabric2_fdb:transactional(Db, fun(TxDb) -> - Future = couch_views_fdb:get_map_range(TxDb, Start, End, Opts), +handle_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 -> + {ok, Acc#{skip := Skip - 1}}; - UnPack = get_unpack_fun(TxDb, Opts, Callback), - State1 = lists:foldl(UnPack, State0, erlfdb:wait(Future)), +handle_row(DocId, Key, Value, Acc) -> + #{ + db := TxDb + mrargs := Args, + callback := UserCallback, + acc := UserAcc + } = Acc, + + BaseRow = [ + {id, DocId}, + {key, Key}, + {value, Value} + ], + + Row = BaseRow ++ if not IncludeDocs -> []; true -> + DocOpts0 = Args#mrargs.doc_options, + DocOpts1 = OpenOpts0 ++ case Args#mrargs.conflicts of + true -> [conflicts]; + false -> [] + end, + DocObj = case fabric2_db:open_doc(Db, DocId, DocOpts1) of + {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts1); + {not_found, _} -> null + end, + [{doc, DocObj}] + end, - #{acc := Acc1} = State1, - Callback(complete, Acc1) - end). + UserAcc1 = maybe_stop(Callback({row, Row1}, UserAcc0)), + Acc#{acc := UserAcc1} -get_idx_name(ViewName, Views) -> - {value, View} = lists:search(fun (View) -> +get_view_id(ViewName, Views) -> + {value, View} = lists:search(fun(View) -> lists:member(ViewName, View#mrview.map_names) end, Views), View#mrview.id_num. -convert_args_to_fdb(Db, Sig, IdxName, Args, Opts) -> - #{ - direction := Direction - } = Args, - - {Start1, End1} = get_range_keys(Db, Sig, IdxName, Args), - - Opts1 = case maps:is_key(limit, Args) of - false -> - Opts; - true -> - Skip = maps:get(skip, Args, 0), - Limit = maps:get(limit, Args), - % Limit is multiplied by two because there are two rows per key - % value. - % Skip is added because that is done in the fold so we need - % to fetch the number of documents - % along with the docs we would skip. 
- % Limit = (Doc limit + Skip) * Num of Rows per Map KV - [{limit, (Limit + Skip) * 2} | Opts] - end, - - Opts2 = case Direction of - fwd -> - Opts1; - rev -> - [{reverse, true} | Opts1] - end, - {Start1, End1, Opts2}. - - -get_range_keys(Db, Sig, IdxName, Args) -> - #{ - inclusive_end := InclusiveEnd, - direction := Direction +mrargs_to_fdb_options(Args) -> + #mrargs{ + start_key = StartKey, + start_key_docid = StartKeyDocId, + end_key = EndKey, + end_key_docid = EndKeyDocId, + direction = Direction, + limit = Limit, + inclusive_end = InclusiveEnd } = Args, - {MapStartKey, MapEndKey} = case Direction of - fwd -> {start_key, end_key}; - rev -> {end_key, start_key} + StartKeyOpts = case {StartKey, StartKeyDocId} of + {undefined, _} -> + []; + {StartKey, undefined} -> + [{start_key, {StartKey}}]; + {_, _} -> + [{start_key, {StartKey, StartKeyDocId}}] end, - {Start0, End0} = couch_views_fdb:get_map_range_keys(Db, Sig, IdxName), - - Start1 = case maps:is_key(MapStartKey, Args) of - false -> - Start0; - true -> - StartKey = maps:get(MapStartKey, Args), - Start = couch_views_fdb:get_map_index_key(Db, Sig, IdxName, - StartKey), - erlfdb_key:first_greater_or_equal(Start) - end, - - End1 = case maps:is_key(MapEndKey, Args) of - false -> - End0; - true -> - EndKey = maps:get(MapEndKey, Args), - EndBin = couch_views_fdb:get_map_index_key(Db, Sig, IdxName, - EndKey), - EndBin1 = case InclusiveEnd of - true -> <<EndBin/binary, 16#FF>>; - false -> EndBin - end, - erlfdb_key:first_greater_than(EndBin1) - end, - {Start1, End1}. 
- - -get_unpack_fun(TxDb, Opts, Callback) -> - UnPackFwd = fun({K, V}, State) -> - case couch_views_fdb:unpack_map_row(TxDb, K, V) of - {key, _Id, RowKey} -> - State#{current_key => RowKey}; - {value, Id, RowValue} -> - #{ - current_key := RowKey - } = State, - process_map_row(Id, RowKey, RowValue, State, Callback) - end + EndKeyOpts = case {EndKey, EndKeyDocId} of + {undefined, _} -> + []; + {EndKey, undefined} when InclusiveEnd -> + [{end_key, {EndKey}}]; + {EndKey, undefined} -> + [{end_key_gt, {EndKey}}]; + {EndKey, EndKeyDocId} when InclusiveEnd -> + [{end_key, {EndKey, EndKeyDocId}}]; + {EndKey, EndKeyDocId} -> + [{end_key_gt, {EndKey, EndKeyDocId}}] end, - UnPackRev = fun({K, V}, State) -> - case couch_views_fdb:unpack_map_row(TxDb, K, V) of - {key, Id, RowKey} -> - #{ - current_value := RowValue - } = State, - process_map_row(Id, RowKey, RowValue, State, Callback); - {value, _Id, RowValue} -> - State#{current_value => RowValue} - end - end, - - case lists:keyfind(reverse, 1, Opts) of - {reverse, true} -> UnPackRev; - _ -> UnPackFwd - end. + [ + {dir, Direction}, + {limit, Limit * 2}, + {streaming_mode, want_all} + ] ++ StartKeyOpts ++ EndKeyOpts. -process_map_row(Id, RowKey, RowValue, State, Callback) -> - #{ - acc := Acc, - skip := Skip, - db := Db - } = State, - - case Skip > 0 of - true -> - State#{skip := Skip -1}; - false -> - Row = [{id, Id}, {key, RowKey}, {value, RowValue}], - - IncludeDoc = maps:get(include_docs, State, false), - Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc), - - {ok, AccNext} = Callback({row, Row1}, Acc), - State#{acc := AccNext} - end. - - -maybe_include_doc(_Db, _Id, Row, false) -> - Row; - -maybe_include_doc(Db, Id, Row, true) -> - Doc1 = case fabric2_db:open_doc(Db, Id) of - {ok, Doc} -> couch_doc:to_json_obj(Doc, []); - {not_found, _} -> [] - end, - Row ++ [{doc, Doc1}]. +maybe_stop({ok, Acc}) -> Acc; +maybe_stop({stop, Acc}) -> throw({done, Acc}).
