Ensure heartbeats are not skipped. In both the normal and continuous feeds, a filter function may return false for a document, in which case no change row is sent for it. This patch ensures heartbeats are still sent while changes are being filtered out.
Jira-1289 Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/bcbcb427 Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/bcbcb427 Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/bcbcb427 Branch: refs/heads/master Commit: bcbcb4270665c2bfb620435b1fa736bc337c619d Parents: ffd7112 Author: Bob Dionne <[email protected]> Authored: Fri Nov 25 10:44:15 2011 -0500 Committer: Bob Dionne <[email protected]> Committed: Mon Nov 28 06:13:14 2011 -0500 ---------------------------------------------------------------------- src/couchdb/couch_changes.erl | 139 ++++++++++++++++++++++++------------ 1 files changed, 92 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/bcbcb427/src/couchdb/couch_changes.erl ---------------------------------------------------------------------- diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index 267f3d7..6858d77 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -29,7 +29,9 @@ resp_type, limit, include_docs, - conflicts + conflicts, + timeout, + timeout_fun }). 
%% @type Req -> #httpd{} | {json_req, JsonObj()} @@ -49,6 +51,14 @@ handle_changes(Args1, Req, Db) -> fwd -> Since end, + % begin timer to deal with heartbeat when filter function fails + case Args#changes_args.heartbeat of + undefined -> + erlang:erase(last_changes_heartbeat); + Val when is_integer(Val); Val =:= true -> + put(last_changes_heartbeat, now()) + end, + if Feed == "continuous" orelse Feed == "longpoll" -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), @@ -62,16 +72,12 @@ handle_changes(Args1, Req, Db) -> ), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), + Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, + <<"">>, Timeout, TimeoutFun), try keep_sending_changes( - Args, - Callback, - UserAcc2, - Db, - StartSeq, - <<"">>, - Timeout, - TimeoutFun, + Args#changes_args{dir=fwd}, + Acc0, true) after couch_db_update_notifier:stop(Notify), @@ -82,14 +88,13 @@ handle_changes(Args1, Req, Db) -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), + {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), + Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback, + UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun), {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} = send_changes( Args#changes_args{feed="normal"}, - Callback, - UserAcc2, - Db, - StartSeq, - <<>>, + Acc0, true), end_sending_changes(Callback, UserAcc3, LastSeq, Feed) end @@ -255,18 +260,15 @@ start_sending_changes(_Callback, UserAcc, "continuous") -> start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). 
-send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, FirstRound) -> +build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ include_docs = IncludeDocs, conflicts = Conflicts, limit = Limit, feed = ResponseType, - dir = Dir, - filter = FilterName, - filter_args = FilterArgs, filter_fun = FilterFun } = Args, - Acc0 = #changes_acc{ + #changes_acc{ db = Db, seq = StartSeq, prepend = Prepend, @@ -276,8 +278,21 @@ send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, FirstRound) -> resp_type = ResponseType, limit = Limit, include_docs = IncludeDocs, - conflicts = Conflicts - }, + conflicts = Conflicts, + timeout = Timeout, + timeout_fun = TimeoutFun + }. + +send_changes(Args, Acc0, FirstRound) -> + #changes_args{ + dir = Dir, + filter = FilterName, + filter_args = FilterArgs + } = Args, + #changes_acc{ + db = Db, + seq = StartSeq + } = Acc0, case FirstRound of true -> case FilterName of @@ -367,8 +382,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) -> end. 
-keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, - TimeoutFun, FirstRound) -> +keep_sending_changes(Args, Acc0, FirstRound) -> #changes_args{ feed = ResponseType, limit = Limit, @@ -377,13 +391,10 @@ keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, {ok, ChangesAcc} = send_changes( Args#changes_args{dir=fwd}, - Callback, - UserAcc, - Db, - StartSeq, - Prepend, + Acc0, FirstRound), #changes_acc{ + db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit } = ChangesAcc, @@ -392,28 +403,25 @@ keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); true -> case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of - {updated, UserAcc3} -> - % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]), + {updated, UserAcc4} -> DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], case couch_db:open(Db#db.name, DbOptions1) of {ok, Db2} -> keep_sending_changes( - Args#changes_args{limit=NewLimit}, - Callback, - UserAcc3, - Db2, - EndSeq, - Prepend2, - Timeout, - TimeoutFun, - false - ); + Args#changes_args{limit=NewLimit}, + ChangesAcc#changes_acc{ + db = Db2, + user_acc = UserAcc4, + seq = EndSeq, + prepend = Prepend2, + timeout = Timeout, + timeout_fun = TimeoutFun}, + false); _Else -> end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) end; - {stop, UserAcc3} -> - % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]), - end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType) + {stop, UserAcc4} -> + end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType) end end. 
@@ -423,24 +431,34 @@ end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> #changes_acc{ filter = FilterFun, callback = Callback, - user_acc = UserAcc, limit = Limit, db = Db + user_acc = UserAcc, limit = Limit, db = Db, + timeout = Timeout, timeout_fun = TimeoutFun } = Acc, #doc_info{high_seq = Seq} = DocInfo, Results0 = FilterFun(Db, DocInfo), Results = [Result || Result <- Results0, Result /= null], + %% TODO: I'm thinking this should be < 1 and not =< 1 Go = if Limit =< 1 -> stop; true -> ok end, case Results of [] -> - {Go, Acc#changes_acc{seq = Seq}}; + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + case Done of + stop -> + {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; + ok -> + {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} + end; _ -> ChangesRow = changes_row(Results, DocInfo, Acc), UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc), + reset_heartbeat(), {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}} end; changes_enumerator(DocInfo, Acc) -> #changes_acc{ filter = FilterFun, callback = Callback, prepend = Prepend, - user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db + user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, + timeout = Timeout, timeout_fun = TimeoutFun } = Acc, #doc_info{high_seq = Seq} = DocInfo, Results0 = FilterFun(Db, DocInfo), @@ -448,10 +466,17 @@ changes_enumerator(DocInfo, Acc) -> Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of [] -> - {Go, Acc#changes_acc{seq = Seq}}; + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + case Done of + stop -> + {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; + ok -> + {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} + end; _ -> ChangesRow = changes_row(Results, DocInfo, Acc), UserAcc2 = Callback({change, ChangesRow, Prepend}, 
ResponseType, UserAcc), + reset_heartbeat(), {Go, Acc#changes_acc{ seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2, limit = Limit - 1}} @@ -504,3 +529,23 @@ get_rest_db_updated(UserAcc) -> after 0 -> {updated, UserAcc} end. + +reset_heartbeat() -> + put(last_changes_heartbeat,now()). + +maybe_heartbeat(Timeout, TimeoutFun, Acc) -> + Now = now(), + Before = get(last_changes_heartbeat), + case Before of + undefined -> + {ok, Acc}; + _ -> + case timer:now_diff(Now, Before) div 1000 >= Timeout of + true -> + Acc2 = TimeoutFun(Acc), + put(last_changes_heartbeat, Now), + Acc2; + false -> + {ok, Acc} + end + end.
