Add basic buffering support for other feed types. With this code, it is possible that changes are buffered for a long period of time and not sent out; I will work on addressing that next.
COUCHDB-2724 Project: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/commit/b9261343 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/tree/b9261343 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/diff/b9261343 Branch: refs/heads/2724-chunked-buffering Commit: b92613432e9278b144176ddf427c389b9cd5d694 Parents: b18805f Author: Adam Kocoloski <[email protected]> Authored: Wed Jun 24 13:47:01 2015 -0400 Committer: Adam Kocoloski <[email protected]> Committed: Wed Jul 22 16:57:49 2015 -0400 ---------------------------------------------------------------------- src/chttpd_db.erl | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/blob/b9261343/src/chttpd_db.erl ---------------------------------------------------------------------- diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl index 3247d83..27e1795 100644 --- a/src/chttpd_db.erl +++ b/src/chttpd_db.erl @@ -126,17 +126,18 @@ changes_callback(start, #cacc{feed = continuous} = Acc) -> {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200), {ok, Acc#cacc{mochi = Resp, responding = true}}; changes_callback({change, Change}, #cacc{feed = continuous} = Acc) -> - #cacc{mochi = Resp} = Acc, - {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]), - {ok, Acc#cacc{mochi = Resp1}}; + Data = [?JSON_ENCODE(Change) | "\n"], + Len = iolist_size(Data), + maybe_flush_changes_feed(Acc, Data, Len); changes_callback({stop, EndSeq0, Pending}, #cacc{feed = continuous} = Acc) -> - #cacc{mochi = Resp} = Acc, + #cacc{mochi = Resp, buffer = Buf} = Acc, EndSeq = case is_old_couch(Resp) of true -> 0; false -> EndSeq0 end, Row = {[ {<<"last_seq">>, EndSeq}, {<<"pending">>, Pending} ]}, - {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [?JSON_ENCODE(Row) | "\n"]), + Data 
= [Buf, ?JSON_ENCODE(Row) | "\n"], + {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Data), chttpd:end_delayed_json_response(Resp1); % callbacks for eventsource feed (newline-delimited eventsource Objects) @@ -149,23 +150,23 @@ changes_callback(start, #cacc{feed = eventsource} = Acc) -> {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers), {ok, Acc#cacc{mochi = Resp, responding = true}}; changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) -> - #cacc{mochi = Resp} = Acc, Seq = proplists:get_value(seq, ChangeProp), Chunk = [ "data: ", ?JSON_ENCODE(Change), "\n", "id: ", ?JSON_ENCODE(Seq), "\n\n" ], - {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk), - {ok, Acc#cacc{mochi = Resp1}}; + Len = iolist_size(Chunk), + maybe_flush_changes_feed(Acc, Chunk, Len); changes_callback(timeout, #cacc{feed = eventsource} = Acc) -> #cacc{mochi = Resp} = Acc, Chunk = "event: heartbeat\ndata: \n\n", {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk), {ok, {"eventsource", Resp1}}; changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) -> - Resp = Acc#cacc.mochi, - chttpd:end_delayed_json_response(Resp); + #cacc{mochi = Resp, buffer = Buf} = Acc, + {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf), + chttpd:end_delayed_json_response(Resp1); % callbacks for longpoll and normal (single JSON Object) changes_callback(start, #cacc{feed = normal} = Acc) ->
