Remove obsolete fabric_rpc2 module
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/fcc1592a Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/fcc1592a Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/fcc1592a Branch: refs/heads/import Commit: fcc1592aa58d25f8e216221a9094549c4f935757 Parents: 9f09897 Author: Adam Kocoloski <a...@cloudant.com> Authored: Tue Feb 26 14:05:38 2013 -0500 Committer: Adam Kocoloski <a...@cloudant.com> Committed: Tue Feb 26 14:05:38 2013 -0500 ---------------------------------------------------------------------- src/fabric_rpc2.erl | 499 ----------------------------------------------- 1 file changed, 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/fcc1592a/src/fabric_rpc2.erl ---------------------------------------------------------------------- diff --git a/src/fabric_rpc2.erl b/src/fabric_rpc2.erl deleted file mode 100644 index 1099da0..0000000 --- a/src/fabric_rpc2.erl +++ /dev/null @@ -1,499 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_rpc2). - --export([get_db_info/1, get_doc_count/1, get_update_seq/1]). --export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3, - update_docs/3]). --export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). --export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3, - set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]). --export([get_all_security/2]). - --include("fabric.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record (view_acc, { - db, - limit, - include_docs, - conflicts, - doc_info = nil, - offset = nil, - total_rows, - reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, - group_level = 0 -}). - -%% rpc endpoints -%% call to with_db will supply your M:F with a #db{} and then remaining args - -all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> - {ok, Db} = get_or_create_db(DbName, []), - #view_query_args{ - start_key = StartKey, - start_docid = StartDocId, - end_key = EndKey, - end_docid = EndDocId, - limit = Limit, - skip = Skip, - include_docs = IncludeDocs, - direction = Dir, - inclusive_end = Inclusive, - extra = Extra - } = QueryArgs, - set_io_priority(DbName, Extra), - {ok, Total} = couch_db:get_doc_count(Db), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - conflicts = proplists:get_value(conflicts, Extra, false), - limit = Limit+Skip, - total_rows = Total - }, - EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, - Options = [ - {dir, Dir}, - {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, - {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} - ], - {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), - final_response(Total, Acc#view_acc.offset). - -changes(DbName, #changes_args{} = Args, StartSeq) -> - changes(DbName, [Args], StartSeq); -changes(DbName, Options, StartSeq) -> - erlang:put(io_priority, {interactive, DbName}), - #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options), - case get_or_create_db(DbName, []) of - {ok, Db} -> - Enum = fun changes_enumerator/2, - Opts = [{dir,Dir}], - Acc0 = {Db, StartSeq, Args, Options}, - try - {ok, {_, LastSeq, _, _}} = - couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0), - rexi:reply({complete, LastSeq}) - after - couch_db:close(Db) - end; - Error -> - rexi:reply(Error) - end. - -map_view(DbName, DDoc, ViewName, QueryArgs) -> - {ok, Db} = get_or_create_db(DbName, []), - #view_query_args{ - limit = Limit, - skip = Skip, - keys = Keys, - include_docs = IncludeDocs, - stale = Stale, - view_type = ViewType, - extra = Extra - } = QueryArgs, - set_io_priority(DbName, Extra), - {LastSeq, MinSeq} = calculate_seqs(Db, Stale), - Group0 = couch_view_group:design_doc_to_view_group(DDoc), - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - maybe_update_view_group(Pid, LastSeq, Stale), - erlang:monitor(process, Group#group.fd), - View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), - {ok, Total} = couch_view:get_row_count(View), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - conflicts = proplists:get_value(conflicts, Extra, false), - limit = Limit+Skip, - total_rows = Total, - reduce_fun = fun couch_view:reduce_to_count/1 - }, - case Keys of - nil -> - Options = couch_httpd_view:make_key_options(QueryArgs), - {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); - _ -> - Acc = lists:foldl(fun(Key, AccIn) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options = couch_httpd_view:make_key_options(KeyArgs), - {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, - Options), - Out - end, Acc0, Keys) - end, - final_response(Total, Acc#view_acc.offset). - -reduce_view(DbName, Group0, ViewName, QueryArgs) -> - erlang:put(io_priority, {interactive, DbName}), - {ok, Db} = get_or_create_db(DbName, []), - #view_query_args{ - group_level = GroupLevel, - limit = Limit, - skip = Skip, - keys = Keys, - stale = Stale, - extra = Extra - } = QueryArgs, - set_io_priority(DbName, Extra), - GroupFun = group_rows_fun(GroupLevel), - {LastSeq, MinSeq} = calculate_seqs(Db, Stale), - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - maybe_update_view_group(Pid, LastSeq, Stale), - #group{views=Views, def_lang=Lang, fd=Fd} = Group, - erlang:monitor(process, Fd), - {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), - ReduceView = {reduce, NthRed, Lang, View}, - Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, - case Keys of - nil -> - Options0 = couch_httpd_view:make_key_options(QueryArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); - _ -> - lists:map(fun(Key) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options0 = couch_httpd_view:make_key_options(KeyArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) - end, Keys) - end, - rexi:reply(complete). - -calculate_seqs(Db, Stale) -> - LastSeq = couch_db:get_update_seq(Db), - if - Stale == ok orelse Stale == update_after -> - {LastSeq, 0}; - true -> - {LastSeq, LastSeq} - end. - -maybe_update_view_group(GroupPid, LastSeq, update_after) -> - couch_view_group:trigger_group_update(GroupPid, LastSeq); -maybe_update_view_group(_, _, _) -> - ok. - -create_db(DbName) -> - rexi:reply(case couch_server:create(DbName, []) of - {ok, _} -> - ok; - Error -> - Error - end). - -create_shard_db_doc(_, Doc) -> - rexi:reply(mem3_util:write_db_doc(Doc)). - -delete_db(DbName) -> - couch_server:delete(DbName, []). - -delete_shard_db_doc(_, DocId) -> - rexi:reply(mem3_util:delete_db_doc(DocId)). - -get_db_info(DbName) -> - with_db(DbName, [], {couch_db, get_db_info, []}). - -get_doc_count(DbName) -> - with_db(DbName, [], {couch_db, get_doc_count, []}). - -get_update_seq(DbName) -> - with_db(DbName, [], {couch_db, get_update_seq, []}). - -set_security(DbName, SecObj, Options) -> - with_db(DbName, Options, {couch_db, set_security, [SecObj]}). - -get_all_security(DbName, Options) -> - with_db(DbName, Options, {couch_db, get_security, []}). - -set_revs_limit(DbName, Limit, Options) -> - with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). - -open_doc(DbName, DocId, Options) -> - with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). - -open_revs(DbName, Id, Revs, Options) -> - with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). - -get_missing_revs(DbName, IdRevsList) -> - get_missing_revs(DbName, IdRevsList, []). - -get_missing_revs(DbName, IdRevsList, Options) -> - % reimplement here so we get [] for Ids with no missing revs in response - set_io_priority(DbName, Options), - rexi:reply(case get_or_create_db(DbName, Options) of - {ok, Db} -> - Ids = [Id1 || {Id1, _Revs} <- IdRevsList], - {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> - case FullDocInfoResult of - {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> - MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), - {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; - not_found -> - {Id, Revs, []} - end - end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; - Error -> - Error - end). - -update_docs(DbName, Docs0, Options) -> - case proplists:get_value(replicated_changes, Options) of - true -> - X = replicated_changes; - _ -> - X = interactive_edit - end, - Docs = make_att_readers(Docs0), - with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). - -group_info(DbName, Group0) -> - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - rexi:reply(couch_view_group:request_group_info(Pid)). - -reset_validation_funs(DbName) -> - case get_or_create_db(DbName, []) of - {ok, #db{main_pid = Pid}} -> - gen_server:cast(Pid, {load_validation_funs, undefined}); - _ -> - ok - end. - -%% -%% internal -%% - -with_db(DbName, Options, {M,F,A}) -> - set_io_priority(DbName, Options), - case get_or_create_db(DbName, Options) of - {ok, Db} -> - rexi:reply(try - apply(M, F, [Db | A]) - catch Exception -> - Exception; - error:Reason -> - twig:log(error, "rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason, - clean_stack()]), - {error, Reason} - end); - Error -> - rexi:reply(Error) - end. - -get_or_create_db(DbName, Options) -> - case couch_db:open_int(DbName, Options) of - {not_found, no_db_file} -> - twig:log(warn, "~p creating ~s", [?MODULE, DbName]), - couch_server:create(DbName, Options); - Else -> - Else - end. - -view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> - % matches for _all_docs and translates #full_doc_info{} -> KV pair - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI -> - Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, - view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI}); - #doc_info{revs=[#rev_info{deleted=true}|_]} -> - {ok, Acc} - end; -view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> - % calculates the offset for this shard - #view_acc{reduce_fun=Reduce} = Acc, - Offset = Reduce(OffsetReds), - case rexi:sync_reply({total_and_offset, Total, Offset}) of - ok -> - view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); - stop -> - exit(normal); - timeout -> - exit(timeout) - end; -view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> - % we scanned through limit+skip local rows - {stop, Acc}; -view_fold({{Key,Id}, Value}, _Offset, Acc) -> - % the normal case - #view_acc{ - db = Db, - doc_info = DocInfo, - limit = Limit, - conflicts = Conflicts, - include_docs = IncludeDocs - } = Acc, - case Value of {Props} -> - LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined); - _ -> - LinkedDocs = false - end, - if LinkedDocs -> - % we'll embed this at a higher level b/c the doc may be non-local - Doc = undefined; - IncludeDocs -> - IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end, - Options = if Conflicts -> [conflicts]; true -> [] end, - case couch_db:open_doc(Db, IdOrInfo, Options) of - {not_found, deleted} -> - Doc = null; - {not_found, missing} -> - Doc = undefined; - {ok, Doc0} -> - Doc = couch_doc:to_json_obj(Doc0, []) - end; - true -> - Doc = undefined - end, - case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - timeout -> - exit(timeout) - end. - -final_response(Total, nil) -> - case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> - rexi:reply(complete); - stop -> - ok; - timeout -> - exit(timeout) - end; -final_response(_Total, _Offset) -> - rexi:reply(complete). - -%% TODO: handle case of bogus group level -group_rows_fun(exact) -> - fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; -group_rows_fun(0) -> - fun(_A, _B) -> true end; -group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> - fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> - lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); - ({Key1,_}, {Key2,_}) -> - Key1 == Key2 - end. - -reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> - {stop, Acc}; -reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> - send(null, Red, Acc); -reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> - send(Key, Red, Acc); -reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> - send(lists:sublist(K, I), Red, Acc); -reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 -> - send(K, Red, Acc). - - -send(Key, Value, #view_acc{limit=Limit} = Acc) -> - case put(fabric_sent_first_row, true) of - undefined -> - case rexi:sync_reply(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - stop -> - exit(normal); - timeout -> - exit(timeout) - end; - true -> - case rexi:stream(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - timeout -> - exit(timeout) - end - end. - -changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) -> - #changes_args{ - include_docs = IncludeDocs, - filter = Acc - } = Args, - Conflicts = proplists:get_value(conflicts, Options, false), - #doc_info{high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo, - case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of - [] -> - {ok, {Db, Seq, Args, Options}}; - Results -> - Opts = if Conflicts -> [conflicts]; true -> [] end, - ChangesRow = changes_row(Db, DocInfo, Results, Del, IncludeDocs, Opts), - Go = rexi:sync_reply(ChangesRow), - {Go, {Db, Seq, Args, Options}} - end. - -changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) -> - Doc = doc_member(Db, DI, Opts), - #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del}; -changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) -> - #change{key=Seq, id=Id, value=Results, deleted=true}; -changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) -> - #change{key=Seq, id=Id, value=Results}. - -doc_member(Shard, DocInfo, Opts) -> - case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of - {ok, Doc} -> - couch_doc:to_json_obj(Doc, []); - Error -> - Error - end. - -possible_ancestors(_FullInfo, []) -> - []; -possible_ancestors(FullInfo, MissingRevs) -> - #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), - LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], - % Find the revs that are possible parents of this rev - lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> - % this leaf is a "possible ancenstor" of the missing - % revs if this LeafPos lessthan any of the missing revs - case lists:any(fun({MissingPos, _}) -> - LeafPos < MissingPos end, MissingRevs) of - true -> - [{LeafPos, LeafRevId} | Acc]; - false -> - Acc - end - end, [], LeafRevs). - -make_att_readers([]) -> - []; -make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> - % % go through the attachments looking for 'follows' in the data, - % % replace with function that reads the data from MIME stream. - Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], - [Doc#doc{atts = Atts} | make_att_readers(Rest)]. - -make_att_reader({follows, Parser}) -> - fun() -> - Parser ! {get_bytes, self()}, - receive {bytes, Bytes} -> Bytes end - end; -make_att_reader(Else) -> - Else. - -clean_stack() -> - lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, - erlang:get_stacktrace()). - -set_io_priority(DbName, Options) -> - case lists:keyfind(io_priority, 1, Options) of - {io_priority, Pri} -> - erlang:put(io_priority, Pri); - false -> - erlang:put(io_priority, {interactive, DbName}) - end.