Updated Branches: refs/heads/1994-merge-rcouch 3aca55450 -> 5f03520fa
couch_mrview: couch_mrview_changes:handle_changes Similar to couch_changes:handle_changes but for view changes. It add support for longpolling, normal and continuous stream The API differs from the one for doc by beeing independant from the transport: the support of HTTP will be added on top for example. This API will be also used to replace the view filter in the current _changes API. Also add unittests. Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/5f03520f Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/5f03520f Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/5f03520f Branch: refs/heads/1994-merge-rcouch Commit: 5f03520faa79159dbf48b3490cc01e8cbb6e8e6e Parents: 3aca554 Author: Benoit Chesneau <[email protected]> Authored: Fri Jan 31 13:13:23 2014 +0100 Committer: Benoit Chesneau <[email protected]> Committed: Fri Jan 31 18:41:38 2014 +0100 ---------------------------------------------------------------------- apps/couch_index/src/couch_index_server.erl | 7 + apps/couch_mrview/src/couch_mrview_changes.erl | 173 +++++++++++++++++ .../couch_mrview/src/couch_mrview_test_util.erl | 2 + apps/couch_mrview/test/09-index-events.t | 17 +- apps/couch_mrview/test/10-index-changes.t | 194 +++++++++++++++++++ 5 files changed, 392 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/5f03520f/apps/couch_index/src/couch_index_server.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_server.erl b/apps/couch_index/src/couch_index_server.erl index 86791db..facde14 100644 --- a/apps/couch_index/src/couch_index_server.erl +++ b/apps/couch_index/src/couch_index_server.erl @@ -159,6 +159,8 @@ reset_indexes(DbName, Root) -> MRef = erlang:monitor(process, Pid), gen_server:cast(Pid, delete), receive {'DOWN', MRef, _, _, _} -> ok end, + couch_index_event:notify({index_delete, + {DbName, DDocId, couch_mrview_index}}), rem_from_ets(DbName, Sig, DDocId, Pid) end, lists:foreach(Fun, ets:lookup(?BY_DB, DbName)), @@ -193,6 +195,11 @@ update_notify({ddoc_updated, {DbName, DDocId}}) -> fun({_DbName, {_DDocId, Sig}}) -> case ets:lookup(?BY_SIG, {DbName, Sig}) of [{_, IndexPid}] -> + %% notify to event listeners that the index has been + %% updated + couch_index_event:notify({index_update, + {DbName, DDocId, + couch_mrview_index}}), (catch gen_server:cast(IndexPid, ddoc_updated)); [] -> ok http://git-wip-us.apache.org/repos/asf/couchdb/blob/5f03520f/apps/couch_mrview/src/couch_mrview_changes.erl ---------------------------------------------------------------------- diff --git a/apps/couch_mrview/src/couch_mrview_changes.erl b/apps/couch_mrview/src/couch_mrview_changes.erl new file mode 100644 index 0000000..2b8f910 --- /dev/null +++ b/apps/couch_mrview/src/couch_mrview_changes.erl @@ -0,0 +1,173 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. +% +-module(couch_mrview_changes). + +-export([handle_changes/6]). + +-include_lib("couch/include/couch_db.hrl"). + +-record(vst, {dbname, + ddoc, + view, + view_options, + since, + callback, + acc, + user_timeout, + timeout, + heartbeat, + timeout_acc=0, + notifier, + stream}). + +-type changes_stream() :: true | false | once. +-type changes_options() :: [{stream, changes_stream()} | + {since, integer()} | + {view_options, list()} | + {timeout, integer()} | + {heartbeat, true | integer()}]. + +-export_type([changes_stream/0]). +-export_type([changes_options/0]). + +%% @doc function returning changes in a streaming fashion if needed. +-spec handle_changes(binary(), binary(), binary(), function(), term(), + changes_options()) -> ok | {error, term()}. +handle_changes(DbName, DDocId, View, Fun, Acc, Options) -> + Since = proplists:get_value(since, Options, 0), + Stream = proplists:get_value(stream, Options, false), + ViewOptions = proplists:get_value(view_options, Options, []), + + State0 = #vst{dbname=DbName, + ddoc=DDocId, + view=View, + view_options=ViewOptions, + since=Since, + callback=Fun, + acc=Acc}, + + case view_changes_since(State0) of + {ok, #vst{since=LastSeq, acc=Acc2}=State} -> + case Stream of + true -> + start_loop(State#vst{stream=true}, Options); + once when LastSeq =:= Since -> + start_loop(State#vst{stream=once}, Options); + _ -> + Fun(stop, {LastSeq, Acc2}) + end; + {stop, #vst{since=LastSeq, acc=Acc2}} -> + Fun(stop, {LastSeq, Acc2}); + Error -> + Error + end. + +start_loop(#vst{dbname=DbName, ddoc=DDocId}=State, Options) -> + {UserTimeout, Timeout, Heartbeat} = changes_timeout(Options), + Notifier = index_update_notifier(DbName, DDocId), + try + loop(State#vst{notifier=Notifier, + user_timeout=UserTimeout, + timeout=Timeout, + heartbeat=Heartbeat}) + after + couch_index_event:stop(Notifier) + end. + +loop(#vst{since=Since, callback=Callback, acc=Acc, + user_timeout=UserTimeout, timeout=Timeout, + heartbeat=Heartbeat, timeout_acc=TimeoutAcc, + stream=Stream}=State) -> + receive + index_update -> + case view_changes_since(State) of + {ok, State2} when Stream =:= true -> + loop(State2#vst{timeout_acc=0}); + {ok, #vst{since=LastSeq, acc=Acc2}} -> + Callback(stop, {LastSeq, Acc2}); + {stop, #vst{since=LastSeq, acc=Acc2}} -> + Callback(stop, {LastSeq, Acc2}) + end; + index_delete -> + Callback(stop, {Since, Acc}) + after Timeout -> + TimeoutAcc2 = TimeoutAcc + Timeout, + case UserTimeout =< TimeoutAcc2 of + true -> + Callback(stop, {Since, Acc}); + false when Heartbeat =:= true -> + case Callback(heartbeat, Acc) of + {ok, Acc2} -> + loop(State#vst{acc=Acc2, timeout_acc=TimeoutAcc2}); + {stop, Acc2} -> + Callback(stop, {Since, Acc2}) + end; + _ -> + Callback(stop, {Since, Acc}) + end + end. + +changes_timeout(Options) -> + DefaultTimeout = list_to_integer( + couch_config:get("httpd", "changes_timeout", "60000") + ), + UserTimeout = proplists:get_value(timeout, Options, DefaultTimeout), + {Timeout, Heartbeat} = case proplists:get_value(heartbeat, Options) of + undefined -> {UserTimeout, false}; + true -> + T = erlang:min(DefaultTimeout, UserTimeout), + {T, true}; + H -> + T = erlang:min(H, UserTimeout), + {T, true} + end, + {UserTimeout, Timeout, Heartbeat}. + +view_changes_since(#vst{dbname=DbName, ddoc=DDocId, view=View, + view_options=Options, since=Since, + callback=Callback, acc=UserAcc}=State) -> + Wrapper = fun ({{Seq, _Key, _DocId}, _Val}=KV, {Go, Acc2, OldSeq}) -> + LastSeq = if OldSeq < Seq -> Seq; + true -> OldSeq + end, + + case Callback(KV, Acc2) of + {ok, Acc3} -> {ok, {Go, Acc3, LastSeq}}; + {stop, Acc3} -> {stop, {stop, Acc3, LastSeq}} + end + end, + + Acc0 = {ok, UserAcc, Since}, + case couch_mrview:view_changes_since(DbName, DDocId, View, Since, + Wrapper, Options, Acc0) of + {ok, {Go, UserAcc2, Since2}}-> + {Go, State#vst{since=Since2, acc=UserAcc2}}; + Error -> + Error + end. + +index_update_notifier(#db{name=DbName}, DDocId) -> + index_update_notifier(DbName, DDocId); +index_update_notifier(DbName, DDocId) -> + Self = self(), + {ok, NotifierPid} = couch_index_event:start_link(fun + ({index_update, {Name, Id, couch_mrview_index}}) + when Name =:= DbName, Id =:= DDocId -> + Self ! index_update; + ({index_delete, {Name, Id, couch_mrview_index}}) + when Name =:= DbName, Id =:= DDocId -> + Self ! index_delete; + (_) -> + ok + end), + NotifierPid. http://git-wip-us.apache.org/repos/asf/couchdb/blob/5f03520f/apps/couch_mrview/src/couch_mrview_test_util.erl ---------------------------------------------------------------------- diff --git a/apps/couch_mrview/src/couch_mrview_test_util.erl b/apps/couch_mrview/src/couch_mrview_test_util.erl index c67f24f..bcf8a87 100644 --- a/apps/couch_mrview/src/couch_mrview_test_util.erl +++ b/apps/couch_mrview/src/couch_mrview_test_util.erl @@ -34,6 +34,8 @@ new_db(Name, Type) -> {ok, Db} = couch_db:create(Name, [{user_ctx, ?ADMIN}]), save_docs(Db, [ddoc(Type)]). +delete_db(Name) -> + couch_server:delete(Name, [{user_ctx, ?ADMIN}]). save_docs(Db, Docs) -> {ok, _} = couch_db:update_docs(Db, Docs, []), http://git-wip-us.apache.org/repos/asf/couchdb/blob/5f03520f/apps/couch_mrview/test/09-index-events.t ---------------------------------------------------------------------- diff --git a/apps/couch_mrview/test/09-index-events.t b/apps/couch_mrview/test/09-index-events.t index 90654b8..1489e4e 100644 --- a/apps/couch_mrview/test/09-index-events.t +++ b/apps/couch_mrview/test/09-index-events.t @@ -15,7 +15,7 @@ % the License. main(_) -> - etap:plan(2), + etap:plan(4), case (catch test()) of ok -> etap:end_tests(); @@ -30,6 +30,7 @@ test() -> test_util:start_couch(), {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes), test_update_event(Db), + test_delete_event(Db), test_util:stop_couch(), ok. @@ -44,3 +45,17 @@ test_update_event(Db) -> etap:is(Event, Expect, "index update events OK") end, couch_index_event:stop(Pid). + +test_delete_event(Db) -> + ok = couch_mrview:refresh(Db, <<"_design/bar">>), + {ok, Pid} = couch_index_event:start_link(self()), + + etap:ok(is_pid(Pid), "event handler added"), + couch_mrview_test_util:delete_db(<<"foo">>), + Expect = {index_delete, {<<"foo">>, <<"_design/bar">>, + couch_mrview_index}}, + receive + Event -> + etap:is(Event, Expect, "index delete events OK") + end, + couch_index_event:stop(Pid). http://git-wip-us.apache.org/repos/asf/couchdb/blob/5f03520f/apps/couch_mrview/test/10-index-changes.t ---------------------------------------------------------------------- diff --git a/apps/couch_mrview/test/10-index-changes.t b/apps/couch_mrview/test/10-index-changes.t new file mode 100644 index 0000000..627376f --- /dev/null +++ b/apps/couch_mrview/test/10-index-changes.t @@ -0,0 +1,194 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap + +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +main(_) -> + etap:plan(6), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + timer:sleep(300), + ok. + +test() -> + test_util:start_couch(), + {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes), + test_normal_changes(Db), + test_stream_once(Db), + test_stream_once_since(Db), + test_stream_once_timeout(Db), + test_stream_once_heartbeat(Db), + test_stream(Db), + test_util:stop_couch(), + ok. + +test_normal_changes(Db) -> + Result = run_query(Db, []), + Expect = {ok, 11, [ + {{2, 1, <<"1">>}, 1}, + {{3, 10, <<"10">>}, 10}, + {{4, 2, <<"2">>}, 2}, + {{5, 3, <<"3">>}, 3}, + {{6, 4, <<"4">>}, 4}, + {{7, 5, <<"5">>}, 5}, + {{8, 6, <<"6">>}, 6}, + {{9, 7, <<"7">>}, 7}, + {{10, 8, <<"8">>}, 8}, + {{11, 9, <<"9">>}, 9} + ]}, + etap:is(Result, Expect, "normal changes worked."). + +test_stream_once(Db) -> + Result = run_query(Db, [{stream, once}]), + Expect = {ok, 11, [ + {{2, 1, <<"1">>}, 1}, + {{3, 10, <<"10">>}, 10}, + {{4, 2, <<"2">>}, 2}, + {{5, 3, <<"3">>}, 3}, + {{6, 4, <<"4">>}, 4}, + {{7, 5, <<"5">>}, 5}, + {{8, 6, <<"6">>}, 6}, + {{9, 7, <<"7">>}, 7}, + {{10, 8, <<"8">>}, 8}, + {{11, 9, <<"9">>}, 9} + ]}, + etap:is(Result, Expect, "stream once since 0 worked."). + + +test_stream_once_since(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 11}, + {stream, once}]), + Self ! {result, Result} + end), + + spawn(fun() -> + timer:sleep(1000), + {ok, Db1} = save_doc(Db, 11), + couch_mrview:refresh(Db1, <<"_design/bar">>) + end), + + Expect = {ok,12,[{{12,11,<<"11">>},11}]}, + + receive + {result, Result} -> + etap:is(Result, Expect, "normal changes worked.") + after 5000 -> + io:format("never got the change", []) + end. + + +test_stream_once_timeout(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 12}, + {stream, once}, + {timeout, 3000}]), + Self ! {result, Result} + end), + + + + Expect = {ok, 12, []}, + + receive + {result, Result} -> + etap:is(Result, Expect, "got timeout.") + after 5000 -> + io:format("never got the change", []) + end. + +test_stream_once_heartbeat(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 12}, + {stream, once}, + {heartbeat, 1000}]), + Self ! {result, Result} + end), + + spawn(fun() -> + timer:sleep(3000), + {ok, Db1} = save_doc(Db, 12), + couch_mrview:refresh(Db1, <<"_design/bar">>) + end), + + Expect = {ok,13,[heartbeat, + heartbeat, + heartbeat, + {{13,12,<<"12">>},12}]}, + + + + receive + {result, Result} -> + etap:is(Result, Expect, "heartbeat OK.") + after 5000 -> + io:format("never got the change", []) + end. + + +test_stream(Db) -> + Self = self(), + spawn(fun() -> + Result = run_query(Db, [{since, 13}, + stream, + {timeout, 3000}]), + Self ! {result, Result} + end), + + spawn(fun() -> + timer:sleep(1000), + {ok, Db1} = save_doc(Db, 13), + couch_mrview:refresh(Db1, <<"_design/bar">>), + {ok, Db2} = save_doc(Db1, 14), + couch_mrview:refresh(Db2, <<"_design/bar">>) + end), + + Expect = {ok, 15,[{{14,13,<<"13">>},13}, + {{15,14,<<"14">>},14}]}, + + receive + {result, Result} -> + etap:is(Result, Expect, "stream OK.") + after 5000 -> + io:format("never got the change", []) + end. + + +save_doc(Db, Id) -> + Doc = couch_mrview_test_util:doc(Id), + {ok, _Rev} = couch_db:update_doc(Db, Doc, []), + {ok, _} = couch_db:ensure_full_commit(Db), + couch_db:reopen(Db). + +run_query(Db, Opts) -> + Fun = fun + (stop, {LastSeq, Acc}) -> + {ok, LastSeq, Acc}; + (heartbeat, Acc) -> + {ok, [heartbeat | Acc]}; + (Event, Acc) -> + {ok, [Event | Acc]} + end, + couch_mrview:refresh(Db, <<"_design/bar">>), + {ok, LastSeq, R} = couch_mrview_changes:handle_changes(Db, <<"_design/bar">>, + <<"baz">>, Fun, [], Opts), + {ok, LastSeq, lists:reverse(R)}.
