Add support for view changes in the _changes API. When the option `seq_indexed=true` is set in the design doc, the view filter in _changes uses the sequence-indexed view to retrieve the results. Compared to the current approach, retrieving changes through a view index is faster. It also makes it possible to filter changes by key or to get the changes in a key range. All the view options can be used.
Note 1: if someone is trying to filter a changes with view options when the views are not indexed by sequence, a 400 error will be returned. Note 2: The changes will only be returned when the view is updated if seq_indexed=true Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-httpd/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-httpd/commit/70409b20 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-httpd/tree/70409b20 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-httpd/diff/70409b20 Branch: refs/heads/1994-merge-rcouch Commit: 70409b20e5be8f0fa5d125cd1e9ec2697c82d909 Parents: c629342 Author: benoitc <[email protected]> Authored: Fri Feb 7 15:38:34 2014 +0100 Committer: Paul J. Davis <[email protected]> Committed: Wed Feb 12 21:05:52 2014 -0600 ---------------------------------------------------------------------- src/couch_httpd_changes.erl | 250 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 246 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-httpd/blob/70409b20/src/couch_httpd_changes.erl ---------------------------------------------------------------------- diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl index 1e431e9..56ce559 100644 --- a/src/couch_httpd_changes.erl +++ b/src/couch_httpd_changes.erl @@ -12,7 +12,9 @@ -module(couch_httpd_changes). --export([handle_changes_req/2]). +-export([handle_changes_req/2, + handle_changes/3, + handle_view_changes/3]). -include_lib("couch/include/couch_db.hrl"). @@ -34,9 +36,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> % on other databases, _changes is free for all. ok end, - handle_changes_req2(Req, Db). 
-handle_changes_req2(Req, Db) -> MakeCallback = fun(Resp) -> fun({change, {ChangeProp}=Change, _}, "eventsource") -> Seq = proplists:get_value(<<"seq">>, ChangeProp), @@ -72,7 +72,7 @@ handle_changes_req2(Req, Db) -> end end, ChangesArgs = parse_changes_query(Req, Db), - ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + ChangesFun = handle_changes(ChangesArgs, Req, Db), WrapperFun = case ChangesArgs#changes_args.feed of "normal" -> {ok, Info} = couch_db:get_db_info(Db), @@ -116,6 +116,164 @@ handle_changes_req2(Req, Db) -> ) end. + +handle_changes(ChangesArgs, Req, Db) -> + case ChangesArgs#changes_args.filter of + "_view" -> + handle_view_changes(ChangesArgs, Req, Db); + _ -> + couch_changes:handle_changes(ChangesArgs, Req, Db) + end. + +%% wrapper around couch_mrview_changes. +%% This wrapper mimic couch_changes:handle_changes/3 and return a +%% Changefun that can be used by the handle_changes_req function. Also +%% while couch_mrview_changes:handle_changes/6 is returning tha view +%% changes this function return docs corresponding to the changes +%% instead so it can be used to replace the _view filter. 
%% handle_view_changes/3
%%
%% Resolves the design document and view named by the `view` request
%% parameter, parses the request parameters as view options, then picks a
%% strategy from the `update_options` reported by couch_mrview:get_info/2:
%%
%%   * `seq_indexed` listed              -> changes are read from the view
%%                                          index (handle_view_changes/6);
%%   * not listed, view options given    -> logs an error and throws
%%                                          {bad_request, seqs_not_indexed};
%%   * not listed, no view options       -> logs a warning and delegates to
%%                                          couch_changes:handle_changes/3.
%%
%% Returns a fun taking the response callback, like
%% couch_changes:handle_changes/3 does.
handle_view_changes(ChangesArgs, Req, Db) ->
    {DDocId, VName} = parse_view_param(Req),
    ViewOptions = parse_view_options(view_query_props(Req), []),

    {ok, Infos} = couch_mrview:get_info(Db, DDocId),
    UpdateOptions = proplists:get_value(update_options, Infos, []),
    SeqIndexed = lists:member(<<"seq_indexed">>, UpdateOptions),

    case {SeqIndexed, ViewOptions} of
        {true, _} ->
            handle_view_changes(Db, DDocId, VName, ViewOptions,
                                ChangesArgs, Req);
        {false, []} ->
            %% No index by sequence and nothing to filter on: fall back to
            %% the regular changes feed, which is slower, so log it.
            ?LOG_WARN("Get view changes with seq_indexed=false.~n", []),
            couch_changes:handle_changes(ChangesArgs, Req, Db);
        {false, _} ->
            ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]),
            throw({bad_request, seqs_not_indexed})
    end.

%% handle_view_changes/6
%%
%% Builds the changes fun for a sequence-indexed view. The database handle
%% received from the request is closed and the database is opened again
%% with the `db_open_options` carried by the changes args (plus the
%% original user context) before the fun is created.
handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
                    ChangesArgs, _Req) ->
    #changes_args{feed = Feed,
                  since = Since,
                  db_open_options = OpenOptions} = ChangesArgs,

    Options = feed_stream_options(Feed) ++
        [{since, Since}, {view_options, ViewOptions}],

    couch_db:close(Db0),
    {ok, Db} = couch_db:open(DbName,
                             [{user_ctx, Db0#db.user_ctx} | OpenOptions]),

    fun(Callback) ->
        Callback(start, Feed),
        %% accumulator: {Prepend, EmittedCount, Db, Callback, ChangesArgs}
        InitialAcc = {"", 0, Db, Callback, ChangesArgs},
        couch_mrview_changes:handle_changes(DbName, DDocId, VName,
                                            fun view_changes_cb/2,
                                            InitialAcc, Options)
    end.

%% Query parameters of the request as a key/value list: the `query` member
%% of a JSON request (empty when absent), the query string otherwise.
view_query_props({json_req, {Props}}) ->
    {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
    Query;
view_query_props(Req) ->
    couch_httpd:qs(Req).

%% Streaming option passed to couch_mrview_changes for a given feed type.
feed_stream_options("continuous") -> [stream];
feed_stream_options("eventsource") -> [stream];
feed_stream_options("longpoll") -> [{stream, once}];
feed_stream_options(_) -> [].
%% view_changes_cb/2
%%
%% Callback given to couch_mrview_changes:handle_changes/6. It turns each
%% view change into a document change row and forwards it to the response
%% callback.
%%
%% The accumulator is always {Prepend, Count, Db, Callback, ChangesArgs}:
%%   Prepend  - separator passed along with the next emitted row;
%%   Count    - number of rows emitted so far;
%%   Db       - database handle used to load doc infos;
%%   Callback - response callback, called as Callback(Event, Feed);
%%   ChangesArgs - the #changes_args{} of the request.
%%
%% Events handled:
%%   stop                      -> emits {stop, LastSeq} to the callback;
%%   heartbeat                 -> emits timeout, keeps the accumulator;
%%   {{Seq, Key, DocId}, Val}  -> emits {change, Row, Prepend} and returns
%%                                {ok, Acc} to continue or {stop, Acc} once
%%                                the limit is reached.
%%
%% A doc info lookup returning {error, not_found} skips the row; any other
%% unexpected lookup result is thrown.
view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) ->
    Callback({stop, LastSeq}, Args#changes_args.feed);

view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) ->
    Callback(timeout, Args#changes_args.feed),
    {ok, Acc};

view_changes_cb({{Seq, _Key, DocId}, _Val},
                {Prepend, Count, Db0, Callback, Args}=Acc) ->

    #changes_args{
        feed = ResponseType,
        limit = Limit} = Args,

    %% if the doc sequence is greater than the one in the db record, reopen
    %% the database since it means we don't have the latest db value.
    Db = case Db0#db.update_seq >= Seq of
        true -> Db0;
        false ->
            {ok, Db1} = couch_db:reopen_db(Db0),
            Db1
    end,

    case couch_db:get_doc_info(Db, DocId) of
        {ok, DocInfo} ->
            %% get change row
            ChangeRow = view_change_row(Db, DocInfo, Args),
            %% emit change row
            Callback({change, ChangeRow, Prepend}, ResponseType),

            %% The returned accumulator must keep the same element order as
            %% the one matched in the function head (counter second, db
            %% third); otherwise the next row would read the counter as the
            %% db record.
            NewCount = Count + 1,
            case Limit > NewCount of
                true ->
                    {ok, {<<",\n">>, NewCount, Db, Callback, Args}};
                false ->
                    %% limit reached, stop here
                    {stop, {<<"">>, NewCount, Db, Callback, Args}}
            end;
        {error, not_found} ->
            %% doc not found, continue
            {ok, Acc};
        Error ->
            throw(Error)
    end.
+ + +view_change_row(Db, DocInfo, Args) -> + #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo, + [#rev_info{rev=Rev, deleted=Del} | _] = Revs, + + #changes_args{style=Style, + include_docs=InDoc, + doc_options = DocOpts, + conflicts=Conflicts}=Args, + + Changes = case Style of + main_only -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; + all_docs -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} + || #rev_info{rev=R} <- Revs] + end, + + {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++ + deleted_item(Del) ++ case InDoc of + true -> + Opts = case Conflicts of + true -> [deleted, conflicts]; + false -> [deleted] + end, + Doc = couch_index_util:load_doc(Db, DocInfo, Opts), + case Doc of + null -> + [{doc, null}]; + _ -> + [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] + end; + false -> + [] + end}. + parse_changes_query(Req, Db) -> ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> case {string:to_lower(Key), Value} of @@ -172,3 +330,87 @@ parse_changes_query(Req, Db) -> _ -> ChangesArgs end. + +parse_view_param({json_req, {Props}}) -> + {Query} = couch_util:get_value(<<"query">>, Props), + parse_view_param1(couch_util:get_value(<<"view">>, Query, <<"">>)); +parse_view_param(Req) -> + parse_view_param1(list_to_binary(couch_httpd:qs_value(Req, "view", ""))). + +parse_view_param1(ViewParam) -> + case re:split(ViewParam, <<"/">>) of + [DName, ViewName] -> + {<< "_design/", DName/binary >>, ViewName}; + _ -> + throw({bad_request, "Invalid `view` parameter."}) + end. 
%% parse_view_options/2
%%
%% Folds a list of {Key, Value} request parameters into a list of view
%% options, prepending each recognised option to Acc. Keys are normalised
%% with couch_util:to_binary/1, so both string and binary keys are
%% accepted. Parameters that are not view options are ignored, since the
%% same parameter list also carries the _changes parameters.
%%
%% Throws {query_parse_error, Reason} for the `count` parameter and for an
%% invalid `stale` value.
parse_view_options(Query, Acc) ->
    lists:foldl(fun({K, V}, OptsAcc) ->
        parse_view_option(couch_util:to_binary(K), V, OptsAcc)
    end, Acc, Query).

%% Translates one request parameter; returns the option list, extended
%% when the key is a known view option and unchanged otherwise.
parse_view_option(<<"reduce">>, V, Acc) ->
    %% prepend to Acc so previously parsed options are kept
    [{reduce, couch_mrview_http:parse_boolean(V)} | Acc];
parse_view_option(<<"key">>, V, Acc) ->
    Key = parse_json(V),
    [{start_key, Key}, {end_key, Key} | Acc];
parse_view_option(<<"keys">>, V, Acc) ->
    [{keys, parse_json(V)} | Acc];
parse_view_option(K, V, Acc)
        when K =:= <<"startkey">>; K =:= <<"start_key">> ->
    [{start_key, parse_json(V)} | Acc];
parse_view_option(K, V, Acc)
        when K =:= <<"startkey_docid">>; K =:= <<"start_key_docid">> ->
    [{start_key_docid, couch_util:to_binary(V)} | Acc];
parse_view_option(K, V, Acc)
        when K =:= <<"endkey">>; K =:= <<"end_key">> ->
    [{end_key, parse_json(V)} | Acc];
parse_view_option(K, V, Acc)
        when K =:= <<"endkey_docid">>; K =:= <<"end_key_docid">> ->
    %% the end doc id bounds the end of the range, not its start
    [{end_key_docid, couch_util:to_binary(V)} | Acc];
parse_view_option(<<"limit">>, V, Acc) ->
    [{limit, couch_mrview_http:parse_pos_int(V)} | Acc];
parse_view_option(<<"count">>, _V, _Acc) ->
    throw({query_parse_error, <<"QS param `count` is not `limit`">>});
parse_view_option(<<"stale">>, V, Acc)
        when V =:= <<"ok">>; V =:= "ok" ->
    [{stale, ok} | Acc];
parse_view_option(<<"stale">>, V, Acc)
        when V =:= <<"update_after">>; V =:= "update_after" ->
    [{stale, update_after} | Acc];
parse_view_option(<<"stale">>, _V, _Acc) ->
    throw({query_parse_error, <<"Invalid value for `stale`.">>});
parse_view_option(<<"descending">>, V, Acc) ->
    case couch_mrview_http:parse_boolean(V) of
        true -> [{direction, rev} | Acc];
        _ -> [{direction, fwd} | Acc]
    end;
parse_view_option(<<"skip">>, V, Acc) ->
    [{skip, couch_mrview_http:parse_pos_int(V)} | Acc];
parse_view_option(<<"group">>, V, Acc) ->
    case couch_mrview_http:parse_boolean(V) of
        true -> [{group_level, exact} | Acc];
        _ -> [{group_level, 0} | Acc]
    end;
parse_view_option(<<"group_level">>, V, Acc) ->
    [{group_level, couch_mrview_http:parse_pos_int(V)} | Acc];
parse_view_option(<<"inclusive_end">>, V, Acc) ->
    %% prepend to Acc so previously parsed options are kept
    [{inclusive_end, couch_mrview_http:parse_boolean(V)} | Acc];
parse_view_option(_Other, _V, Acc) ->
    %% not a view option (e.g. a _changes parameter): ignore it
    Acc.

%% Decodes a value given as a string (query string parameter) as JSON;
%% any other value is returned unchanged.
parse_json(V) when is_list(V) ->
    ?JSON_DECODE(V);
parse_json(V) ->
    V.
%% Extra member for a change row: a one-element list holding the
%% `deleted` flag when the argument is exactly `true`, an empty list for
%% any other value.
deleted_item(Deleted) ->
    [{<<"deleted">>, true} || Deleted =:= true].
