Repository: couchdb-chttpd Updated Branches: refs/heads/2724-chunked-buffering f7b6f62c1 -> 45bb53436 (forced update)
Buffer rows for normal/longpoll feeds

This patch causes the coordinator to accumulate data in its own buffer
and reduce the number of calls to write data on the socket. The size of
the buffer is configurable:

    [httpd]
    chunked_response_buffer = 1490

The default is chosen to approximately fill a standard Ethernet frame.

COUCHDB-2724

Project: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/commit/b18805f9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/tree/b18805f9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/diff/b18805f9

Branch: refs/heads/2724-chunked-buffering
Commit: b18805f921a7a6f587f1810949e27eb002cb942e
Parents: f45c8b2
Author: Adam Kocoloski <[email protected]>
Authored: Mon Jun 22 21:45:26 2015 -0400
Committer: Adam Kocoloski <[email protected]>
Committed: Wed Jul 22 16:57:48 2015 -0400

----------------------------------------------------------------------
 src/chttpd_db.erl | 45 +++++++++++++++++++++++++++++++++++++--------
 1 file changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/blob/b18805f9/src/chttpd_db.erl
----------------------------------------------------------------------
diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl
index 71c6cec..3247d83 100644
--- a/src/chttpd_db.erl
+++ b/src/chttpd_db.erl
@@ -40,7 +40,10 @@
     feed,
     mochi,
     prepend = "",
-    responding = false
+    responding = false,
+    buffer = [],
+    bufsize = 0,
+    threshold
 }).
-define(IS_ALL_DOCS(T), ( @@ -83,6 +86,8 @@ handle_changes_req1(#httpd{}=Req, Db) -> ChangesArgs = Args0#changes_args{ filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db) }, + % Default to ~filling the payload of a standard Ethernet frame + Max = config:get_integer("httpd", "chunked_response_buffer", 1490), case ChangesArgs#changes_args.feed of "normal" -> T0 = os:timestamp(), @@ -91,12 +96,21 @@ handle_changes_req1(#httpd{}=Req, Db) -> DeltaT = timer:now_diff(os:timestamp(), T0) / 1000, couch_stats:update_histogram([couchdb, dbinfo], DeltaT), chttpd:etag_respond(Req, Etag, fun() -> - Acc0 = #cacc{feed = normal, etag = Etag, mochi = Req}, + Acc0 = #cacc{ + feed = normal, + etag = Etag, + mochi = Req, + threshold = Max + }, fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs) end); Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" -> couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]), - Acc0 = #cacc{feed = list_to_atom(Feed), mochi = Req}, + Acc0 = #cacc{ + feed = list_to_atom(Feed), + mochi = Req, + threshold = Max + }, try fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs) after @@ -166,16 +180,17 @@ changes_callback(start, Acc) -> {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk), {ok, Acc#cacc{mochi = Resp, responding = true}}; changes_callback({change, Change}, Acc) -> - #cacc{prepend = Prepend, mochi = Resp} = Acc, - {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]), - {ok, Acc#cacc{prepend = ",\r\n", mochi = Resp1}}; + Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)], + Len = iolist_size(Data), + maybe_flush_changes_feed(Acc, Data, Len); changes_callback({stop, EndSeq, Pending}, Acc) -> - #cacc{mochi = Resp} = Acc, + #cacc{buffer = Buf, mochi = Resp} = Acc, {ok, Resp1} = case is_old_couch(Resp) of true -> - chttpd:send_delayed_chunk(Resp, "\n],\n\"last_seq\":0}\n"); + chttpd:send_delayed_chunk(Resp, [Buf | 
"\n],\n\"last_seq\":0}\n"]); false -> chttpd:send_delayed_chunk(Resp, [ + Buf, "\n],\n\"last_seq\":", ?JSON_ENCODE(EndSeq), ",\"pending\":", @@ -197,6 +212,20 @@ changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc changes_callback({error, Reason}, Acc) -> chttpd:send_delayed_error(Acc#cacc.mochi, Reason). +maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len) + when Size > 0 andalso (Size + Len) > Max -> + #cacc{buffer = Buffer, mochi = Resp} = Acc, + {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer), + {ok, Acc#cacc{prepend = ",\r\n", buffer = Data, bufsize=Len, mochi = R1}}; +maybe_flush_changes_feed(Acc0, Data, Len) -> + #cacc{buffer = Buf, bufsize = Size} = Acc0, + Acc = Acc0#cacc{ + prepend = ",\r\n", + buffer = [Buf | Data], + bufsize = Size + Len + }, + {ok, Acc}. + is_old_couch(Resp) -> MochiReq = chttpd:get_delayed_req(Resp), case MochiReq:get_header_value("user-agent") of
