Updated Branches: refs/heads/master ffd7112c1 -> 7cd04e8ce
Failing etap for heartbeats skipped Jira-1289 Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/7cd04e8c Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/7cd04e8c Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/7cd04e8c Branch: refs/heads/master Commit: 7cd04e8ce4af1e3bd69068affb47e8b89c1615fa Parents: bcbcb42 Author: Bob Dionne <[email protected]> Authored: Fri Nov 25 10:44:46 2011 -0500 Committer: Bob Dionne <[email protected]> Committed: Mon Nov 28 06:15:11 2011 -0500 ---------------------------------------------------------------------- test/etap/073-changes.t | 96 ++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 92 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/7cd04e8c/test/etap/073-changes.t ---------------------------------------------------------------------- diff --git a/test/etap/073-changes.t b/test/etap/073-changes.t index efefa5d..97f6860 100755 --- a/test/etap/073-changes.t +++ b/test/etap/073-changes.t @@ -51,7 +51,7 @@ test_db_name() -> <<"couch_test_changes">>. main(_) -> test_util:init_code_path(), - etap:plan(39), + etap:plan(43), case (catch test()) of ok -> etap:end_tests(); @@ -69,6 +69,7 @@ test() -> test_by_doc_ids_with_since(), test_by_doc_ids_continuous(), test_design_docs_only(), + test_heartbeat(), couch_server_sup:stop(), ok. @@ -278,7 +279,7 @@ test_design_docs_only() -> ChangesArgs = #changes_args{ filter = "_design" }, - Consumer = spawn_consumer(test_db_name(), ChangesArgs, {json, null}), + Consumer = spawn_consumer(test_db_name(), ChangesArgs, {json_req, null}), {Rows, LastSeq} = wait_finished(Consumer), {ok, Db2} = couch_db:open_int(test_db_name(), []), @@ -298,7 +299,7 @@ test_design_docs_only() -> {[{<<"_id">>, <<"_design/foo">>}, {<<"_rev">>, Rev3}, {<<"_deleted">>, true}]}), - Consumer2 = spawn_consumer(test_db_name(), ChangesArgs, {json, null}), + Consumer2 = spawn_consumer(test_db_name(), ChangesArgs, {json_req, null}), {Rows2, LastSeq2} = wait_finished(Consumer2), UpSeq2 = UpSeq + 1, @@ -314,6 +315,70 @@ test_design_docs_only() -> stop(Consumer2), delete_db(Db). +test_heartbeat() -> + {ok, Db} = create_db(test_db_name()), + + {ok, Rev3} = save_doc(Db, {[ + {<<"_id">>, <<"_design/foo">>}, + {<<"language">>, <<"javascript">>}, + {<<"filters">>, {[ + {<<"foo">>, <<"function(doc) { if ((doc._id == 'doc10') || + (doc._id == 'doc11') || + (doc._id == 'doc12')) { + return true; + } else { + return false; + }}">> + }]}} + ]}), + + ChangesArgs = #changes_args{ + filter = "foo/foo", + feed = "continuous", + timeout = 10000, + heartbeat = 1000 + }, + Consumer = spawn_consumer(test_db_name(), ChangesArgs, {json_req, null}), + + {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), + timer:sleep(200), + {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), + timer:sleep(200), + {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), + timer:sleep(200), + {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), + timer:sleep(200), + {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), + timer:sleep(200), + {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), + timer:sleep(200), + {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), + timer:sleep(200), + {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), + timer:sleep(200), + {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), + Heartbeats = get_heartbeats(Consumer), + etap:is(Heartbeats, 2, "Received 2 heartbeats now"), + {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), + timer:sleep(200), + {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), + timer:sleep(200), + {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), + Heartbeats2 = get_heartbeats(Consumer), + etap:is(Heartbeats2, 3, "Received 3 heartbeats now"), + Rows = get_rows(Consumer), + etap:is(length(Rows), 3, "Received 3 changes rows"), + + {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}), + timer:sleep(200), + {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}), + timer:sleep(200), + Heartbeats3 = get_heartbeats(Consumer), + etap:is(Heartbeats3, 6, "Received 6 heartbeats now"), + stop(Consumer), + couch_db:close(Db), + delete_db(Db). + save_doc(Db, Json) -> Doc = couch_doc:from_json_obj(Json), @@ -331,6 +396,16 @@ get_rows(Consumer) -> etap:bail("Timeout getting rows from consumer") end. +get_heartbeats(Consumer) -> + Ref = make_ref(), + Consumer ! {get_heartbeats, Ref}, + receive + {hearthbeats, Ref, HeartBeats} -> + HeartBeats + after 3000 -> + etap:bail("Timeout getting heartbeats from consumer") + end. + clear_rows(Consumer) -> Ref = make_ref(), @@ -388,6 +463,7 @@ wait_finished(_Consumer) -> spawn_consumer(DbName, ChangesArgs0, Req) -> Parent = self(), spawn(fun() -> + put(heartbeat_count, 0), Callback = fun({change, {Change}, _}, _, Acc) -> Id = couch_util:get_value(<<"id">>, Change), Seq = couch_util:get_value(<<"seq">>, Change), @@ -396,11 +472,20 @@ spawn_consumer(DbName, ChangesArgs0, Req) -> ({stop, LastSeq}, _, Acc) -> Parent ! {consumer_finished, lists:reverse(Acc), LastSeq}, stop_loop(Parent, Acc); + (timeout, _, Acc) -> + put(heartbeat_count, get(heartbeat_count) + 1), + maybe_pause(Parent, Acc); (_, _, Acc) -> maybe_pause(Parent, Acc) end, {ok, Db} = couch_db:open_int(DbName, []), - ChangesArgs = ChangesArgs0#changes_args{timeout = 10, heartbeat = 10}, + ChangesArgs = case (ChangesArgs0#changes_args.timeout =:= undefined) + andalso (ChangesArgs0#changes_args.heartbeat =:= undefined) of + true -> + ChangesArgs0#changes_args{timeout = 10, heartbeat = 10}; + false -> + ChangesArgs0 + end, FeedFun = couch_changes:handle_changes(ChangesArgs, Req, Db), try FeedFun({Callback, []}) @@ -416,6 +501,9 @@ maybe_pause(Parent, Acc) -> {get_rows, Ref} -> Parent ! {rows, Ref, lists:reverse(Acc)}, maybe_pause(Parent, Acc); + {get_heartbeats, Ref} -> + Parent ! {hearthbeats, Ref, get(heartbeat_count)}, + maybe_pause(Parent, Acc); {reset, Ref} -> Parent ! {ok, Ref}, maybe_pause(Parent, []);
