Enable changes for cluster access. This mostly involves rewriting filter functions so that they aren't anonymous closures. Anonymous closures break clusters when code is upgraded, which is particularly problematic for the _changes feed since it's a long-lived request.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/e09b8074 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/e09b8074 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/e09b8074 Branch: refs/heads/import Commit: e09b8074fec59a508905b700c5252df7eb5b5338 Parents: 15b84c0 Author: Paul J. Davis <paul.joseph.da...@gmail.com> Authored: Mon Mar 11 13:39:42 2013 -0500 Committer: Paul J. Davis <paul.joseph.da...@gmail.com> Committed: Tue Feb 4 17:03:25 2014 -0600 ---------------------------------------------------------------------- src/couch_changes.erl | 393 ++++++++++++++++++++++----------------------- 1 file changed, 196 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e09b8074/src/couch_changes.erl ---------------------------------------------------------------------- diff --git a/src/couch_changes.erl b/src/couch_changes.erl index d36b45f..4346109 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -18,11 +18,12 @@ get_changes_timeout/2, wait_db_updated/3, get_rest_db_updated/1, - make_filter_fun/4, - main_only_filter/1, - all_docs_filter/1 + configure_filter/4, + filter/3 ]). +-export([changes_enumerator/2]). + % For the builtin filter _docs_ids, this is the maximum number % of documents for which we trigger the optimized code path. -define(MAX_DOC_IDS, 100). 
@@ -51,8 +52,8 @@ handle_changes(Args1, Req, Db0) -> dir = Dir, since = Since } = Args1, - {FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0), - Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs}, + Filter = configure_filter(FilterName, Style, Req, Db0), + Args = Args1#changes_args{filter_fun = Filter}, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), StartSeq = case Dir of @@ -120,136 +121,153 @@ get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) -> get_callback_acc(Callback) when is_function(Callback, 2) -> {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}. -make_filter_fun([$_ | _] = FilterName, Style, Req, Db) -> - builtin_filter_fun(FilterName, Style, Req, Db); -make_filter_fun(_, main_only, _, _) -> - fun ?MODULE:main_only_filter/1; -make_filter_fun(_, all_docs, _, _) -> - fun ?MODULE:all_docs_filter/1; -make_filter_fun(FilterName, Style, Req, Db) -> - {os_filter_fun(FilterName, Style, Req, Db), []}. - -os_filter_fun(FilterName, Style, Req, Db) -> - case [list_to_binary(couch_httpd:unquote(Part)) - || Part <- string:tokens(FilterName, "/")] of - [] -> - fun(_Db2, #doc_info{revs=Revs}) -> - builtin_results(Style, Revs) - end; - [DName, FName] -> - DesignId = <<"_design/", DName/binary>>, - DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]), - % validate that the ddoc has the filter fun - #doc{body={Props}} = DDoc, - couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), - fun(Db2, DocInfo) -> - DocInfos = - case Style of - main_only -> - [DocInfo]; - all_docs -> - [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs] - end, - Docs = [Doc || {ok, Doc} <- [ - couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts]) - || DocInfo2 <- DocInfos]], - {ok, Passes} = couch_query_servers:filter_docs( - Req, Db2, DDoc, FName, Docs - ), - [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} - || {Pass, #doc{revs={RevPos,[RevId|_]}}} - <- lists:zip(Passes, Docs), 
Pass == true] - end; - _Else -> - throw({bad_request, - "filter parameter must be of the form `designname/filtername`"}) + +configure_filter("_doc_ids", Style, Req, _Db) -> + {doc_ids, Style, get_doc_ids(Req)}; +configure_filter("_design", Style, _Req, _Db) -> + {design_docs, Style}; +configure_filter("_view", Style, Req, Db) -> + ViewName = couch_httpd:qs_value(Req, "view", ""), + if ViewName /= "" -> ok; true -> + throw({bad_request, "`view` filter parameter is not provided."}) + end, + ViewNameParts = string:tokens(ViewName, "/"), + case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of + [DName, VName] -> + {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), + check_member_exists(DDoc, [<<"views">>, VName]), + {view, Style, DDoc, VName}; + [] -> + Msg = "`view` must be of the form `designname/viewname`", + throw({bad_request, Msg}) + end; +configure_filter([$_ | _], _Style, _Req, _Db) -> + throw({bad_request, "unknown builtin filter name"}); +configure_filter("", main_only, _Req, _Db) -> + {default, main_only}; +configure_filter("", all_docs, _Req, _Db) -> + {default, all_docs}; +configure_filter(FilterName, Style, Req, Db) -> + FilterNameParts = string:tokens(FilterName, "/"), + case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of + [DName, FName] -> + {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), + check_member_exists(DDoc, [<<"filters">>, FName]), + {custom, Style, Req, DDoc, FName}; + [] -> + {default, Style}; + _Else -> + Msg = "`filter` must be of the form `designname/filtername`", + throw({bad_request, Msg}) end. 
-builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) -> - DocIds = couch_util:get_value(<<"doc_ids">>, Props), - {filter_docids(DocIds, Style), DocIds}; -builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) -> + +filter(Db, #full_doc_info{}=FDI, Filter) -> + filter(Db, couch_doc:to_doc_info(FDI), Filter); +filter(_Db, DocInfo, {default, Style}) -> + apply_style(DocInfo, Style); +filter(_Db, DocInfo, {doc_ids, Style, DocIds}) -> + case lists:member(DocInfo#doc_info.id, DocIds) of + true -> + apply_style(DocInfo, Style); + false -> + [] + end; +filter(_Db, DocInfo, {design_docs, Style}) -> + case DocInfo#doc_info.id of + <<"_design", _/binary>> -> + apply_style(DocInfo, Style); + _ -> + [] + end; +filter(Db, DocInfo, {view, Style, DDoc, VName}) -> + Docs = open_revs(Db, DocInfo, Style), + {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs), + filter_revs(Passes, Docs); +filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> + Req = case Req0 of + {json_req, _} -> Req0; + #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)} + end, + Docs = open_revs(Db, DocInfo, Style), + {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), + filter_revs(Passes, Docs). 
+ + +get_doc_ids({json_req, {Props}}) -> + check_docids(couch_util:get_value(<<"doc_ids">>, Props)); +get_doc_ids(#httpd{method='POST'}=Req) -> {Props} = couch_httpd:json_body_obj(Req), - DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil), - {filter_docids(DocIds, Style), DocIds}; -builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) -> + check_docids(couch_util:get_value(<<"doc_ids">>, Props)); +get_doc_ids(#httpd{method='GET'}=Req) -> DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), - {filter_docids(DocIds, Style), DocIds}; -builtin_filter_fun("_design", Style, _Req, _Db) -> - {filter_designdoc(Style), []}; -builtin_filter_fun("_view", Style, Req, Db) -> - ViewName = couch_httpd:qs_value(Req, "view", ""), - {filter_view(ViewName, Style, Db), []}; -builtin_filter_fun(_FilterName, _Style, _Req, _Db) -> - throw({bad_request, "unknown builtin filter name"}). - -main_only_filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]. - -all_docs_filter(#doc_info{revs=Revs}) -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs]. - -filter_docids(DocIds, Style) when is_list(DocIds)-> - fun(_Db, #doc_info{id=DocId, revs=Revs}) -> - case lists:member(DocId, DocIds) of - true -> - builtin_results(Style, Revs); - _ -> [] - end + check_docids(DocIds); +get_doc_ids(_) -> + throw({bad_request, no_doc_ids_provided}). + + +check_docids(DocIds) when is_list(DocIds) -> + lists:foreach(fun + (DocId) when not is_binary(DocId) -> + Msg = "`doc_ids` filter parameter is not a list of binaries.", + throw({bad_request, Msg}); + (_) -> ok + end, DocIds), + DocIds; +check_docids(_) -> + Msg = "`doc_ids` filter parameter is not a list of binaries.", + throw({bad_request, Msg}). 
+ + +open_ddoc(#db{name= <<"shards/", _/binary>> =ShardName}, DDocId) -> + {_, Ref} = spawn_monitor(fun() -> + exit(fabric:open_doc(mem3:dbname(ShardName), DDocId, [])) + end), + receive + {'DOWN', Ref, _, _, {ok, _}=Response} -> + Response; + {'DOWN', Ref, _, _, Response} -> + throw(Response) end; -filter_docids(_, _) -> - throw({bad_request, "`doc_ids` filter parameter is not a list."}). - -filter_designdoc(Style) -> - fun(_Db, #doc_info{id=DocId, revs=Revs}) -> - case DocId of - <<"_design", _/binary>> -> - builtin_results(Style, Revs); - _ -> [] - end +open_ddoc(Db, DDocId) -> + case couch_db:open_doc(Db, DDocId, [ejson_body]) of + {ok, _} = Resp -> Resp; + Else -> throw(Else) end. -filter_view("", _Style, _Db) -> - throw({bad_request, "`view` filter parameter is not provided."}); -filter_view(ViewName, Style, Db) -> - case [list_to_binary(couch_httpd:unquote(Part)) - || Part <- string:tokens(ViewName, "/")] of - [] -> - throw({bad_request, "Invalid `view` parameter."}); - [DName, VName] -> - DesignId = <<"_design/", DName/binary>>, - DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]), - % validate that the ddoc has the filter fun - #doc{body={Props}} = DDoc, - couch_util:get_nested_json_value({Props}, [<<"views">>, VName]), - fun(Db2, DocInfo) -> - DocInfos = - case Style of - main_only -> - [DocInfo]; - all_docs -> - [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs] - end, - Docs = [Doc || {ok, Doc} <- [ - couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts]) - || DocInfo2 <- DocInfos]], - {ok, Passes} = couch_query_servers:filter_view( - DDoc, VName, Docs - ), - [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} - || {Pass, #doc{revs={RevPos,[RevId|_]}}} - <- lists:zip(Passes, Docs), Pass == true] - end - end. 
- -builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) -> - case Style of - main_only -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; - all_docs -> - [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} - || #rev_info{rev=R} <- Revs] - end. + +check_member_exists(#doc{body={Props}}, Path) -> + couch_util:get_nested_json_value({Props}, Path). + + +apply_style(#doc_info{revs=Revs}, main_only) -> + [#rev_info{rev=Rev} | _] = Revs, + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; +apply_style(#doc_info{revs=Revs}, all_docs) -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs]. + + +open_revs(Db, DocInfo, Style) -> + DocInfos = case Style of + main_only -> [DocInfo]; + all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs] + end, + OpenOpts = [deleted, conflicts], + % Relying on list comprehensions to silence errors + OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos], + [Doc || {ok, Doc} <- OpenResults]. + + +filter_revs(Passes, Docs) -> + lists:flatmap(fun + ({true, #doc{revs={RevPos, [RevId | _]}}}) -> + RevStr = couch_doc:rev_to_str({RevPos, RevId}), + Change = {[{<<"rev">>, RevStr}]}, + [Change]; + (_) -> + [] + end, lists:zip(Passes, Docs)). 
+ get_changes_timeout(Args, Callback) -> #changes_args{ @@ -292,13 +310,13 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) - conflicts = Conflicts, limit = Limit, feed = ResponseType, - filter_fun = FilterFun + filter_fun = Filter } = Args, #changes_acc{ db = Db, seq = StartSeq, prepend = Prepend, - filter = FilterFun, + filter = Filter, callback = Callback, user_acc = UserAcc, resp_type = ResponseType, @@ -311,100 +329,81 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) - send_changes(Args, Acc0, FirstRound) -> #changes_args{ - dir = Dir, - filter = FilterName, - filter_args = FilterArgs + dir = Dir } = Args, #changes_acc{ db = Db, - seq = StartSeq + seq = StartSeq, + filter = Filter } = Acc0, - case FirstRound of - true -> - case FilterName of - "_doc_ids" when length(FilterArgs) =< ?MAX_DOC_IDS -> - send_changes_doc_ids( - FilterArgs, Db, StartSeq, Dir, fun changes_enumerator/2, Acc0); - "_design" -> - send_changes_design_docs( - Db, StartSeq, Dir, fun changes_enumerator/2, Acc0); + EnumFun = fun ?MODULE:changes_enumerator/2, + case can_optimize(FirstRound, Filter) of + {true, Fun} -> + Fun(Db, StartSeq, Dir, EnumFun, Acc0, Filter); _ -> - couch_db:changes_since( - Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0) - end; - false -> - couch_db:changes_since( - Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0) + couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc0) end. -send_changes_doc_ids(DocIds, Db, StartSeq, Dir, Fun, Acc0) -> +can_optimize(true, {doc_ids, _Style, DocIds}) + when length(DocIds) =< ?MAX_DOC_IDS -> + {true, fun send_changes_doc_ids/6}; +can_optimize(true, {design_docs, _Style}) -> + {true, fun send_changes_design_docs/6}; +can_optimize(_, _) -> + false. 
+ + +send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> Lookups = couch_btree:lookup(Db#db.id_tree, DocIds), - FullDocInfos = lists:foldl( - fun({ok, FDI}, Acc) -> - [FDI | Acc]; - (not_found, Acc) -> - Acc - end, - [], Lookups), - send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0). + FullInfos = lists:foldl(fun + ({ok, FDI}, Acc) -> [FDI | Acc]; + (not_found, Acc) -> Acc + end, [], Lookups), + send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). -send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) -> +send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) -> FoldFun = fun(FullDocInfo, _, Acc) -> {ok, [FullDocInfo | Acc]} end, KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}], - {ok, _, FullDocInfos} = couch_btree:fold( - Db#db.id_tree, FoldFun, [], KeyOpts), - send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0). + {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts), + send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). 
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) -> FoldFun = case Dir of - fwd -> - fun lists:foldl/3; - rev -> - fun lists:foldr/3 + fwd -> fun lists:foldl/3; + rev -> fun lists:foldr/3 end, GreaterFun = case Dir of - fwd -> - fun(A, B) -> A > B end; - rev -> - fun(A, B) -> A =< B end + fwd -> fun(A, B) -> A > B end; + rev -> fun(A, B) -> A =< B end end, - DocInfos = lists:foldl( - fun(FDI, Acc) -> - DI = couch_doc:to_doc_info(FDI), - case GreaterFun(DI#doc_info.high_seq, StartSeq) of - true -> - [DI | Acc]; - false -> - Acc - end - end, - [], FullDocInfos), + DocInfos = lists:foldl(fun(FDI, Acc) -> + DI = couch_doc:to_doc_info(FDI), + case GreaterFun(DI#doc_info.high_seq, StartSeq) of + true -> [DI | Acc]; + false -> Acc + end + end, [], FullDocInfos), SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos), FinalAcc = try - FoldFun( - fun(DocInfo, Acc) -> - case Fun(DocInfo, Acc) of + FoldFun(fun(DocInfo, Acc) -> + case Fun(DocInfo, Acc) of {ok, NewAcc} -> NewAcc; {stop, NewAcc} -> throw({stop, NewAcc}) - end - end, - Acc0, SortedDocInfos) + end + end, Acc0, SortedDocInfos) catch - throw:{stop, Acc} -> - Acc + {stop, Acc} -> Acc end, case Dir of - fwd -> - {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}}; - rev -> - {ok, FinalAcc} + fwd -> {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}}; + rev -> {ok, FinalAcc} end. 
@@ -458,12 +457,12 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc) when ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> #changes_acc{ - filter = FilterFun, callback = Callback, + filter = Filter, callback = Callback, user_acc = UserAcc, limit = Limit, db = Db, timeout = Timeout, timeout_fun = TimeoutFun } = Acc, #doc_info{high_seq = Seq} = DocInfo, - Results0 = FilterFun(Db, DocInfo), + Results0 = filter(Db, DocInfo, Filter), Results = [Result || Result <- Results0, Result /= null], %% TODO: I'm thinking this should be < 1 and not =< 1 Go = if Limit =< 1 -> stop; true -> ok end, @@ -484,12 +483,12 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc) end; changes_enumerator(DocInfo, Acc) -> #changes_acc{ - filter = FilterFun, callback = Callback, prepend = Prepend, + filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, timeout = Timeout, timeout_fun = TimeoutFun } = Acc, #doc_info{high_seq = Seq} = DocInfo, - Results0 = FilterFun(Db, DocInfo), + Results0 = filter(Db, DocInfo, Filter), Results = [Result || Result <- Results0, Result /= null], Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of