Ok, so here's a start at reworking some of the memory management and buffering calculations. It fixes the regression where attachment memory wasn't being included in the memory utilization numbers, and it also includes ibrowse memory utilization for attachments (which is larger than Couch's).

The decision to flush the buffer (to disk or to the remote target server) is dependent on the number of docs in the buffer, the approximate number of attachments, and the memory utilization. I estimate the number of attachments as 0.5*nlinks, since every attachment download spawns two processes: one dedicated ibrowse worker and the attachment receiver. The dedicated ibrowse workers get the attachments out of the connection pool and let us keep a better eye on their memory usage.

Each of the thresholds is currently just defined as a macro at the top of the module. I haven't done any work on adjusting these thresholds dynamically or checkpointing as a function of elapsed time.

The replication module is getting pretty hairy again; in my opinion it's probably time to refactor out the attachment stuff into its own module. I may get around to that tomorrow if no one objects.

Best, Adam

On May 16, 2009, at 2:58 PM, [email protected] wrote:

Author: kocolosk
Date: Sat May 16 18:58:18 2009
New Revision: 775507

URL: http://svn.apache.org/viewvc?rev=775507&view=rev
Log:
replicator memory management and buffer flush calculation updates

* new should_flush fun considers ndocs, nattachments, memory in making decision
* memory utilized by attachment receivers is accounted for
* download attachments using standalone connections instead of conn pool. This prevents a document request from getting stuck behind a huge attachment, which would prevent us from triggering a buffer flush in time. We also consider the memory utilization of the standalone ibrowse connection in should_flush

Modified:
   couchdb/trunk/src/couchdb/couch_rep.erl

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=775507&r1=775506&r2=775507&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Sat May 16 18:58:18 2009
@@ -17,7 +17,12 @@

-export([replicate/2]).

+-define(BUFFER_NDOCS, 1000).
+-define(BUFFER_NATTACHMENTS, 50).
+-define(BUFFER_MEMORY, 10000000). %% bytes
+
-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").

%% @spec replicate(Source::binary(), Target::binary()) ->
%%      {ok, Stats} | {error, Reason}
@@ -202,7 +207,8 @@
    ets:update_counter(Stats, docs_read, length(Docs)),

    %% save them (maybe in a buffer)
-    {NewBuffer, NewContext} = case couch_util:should_flush() of
+    {NewBuffer, NewContext} =
+    case should_flush(lists:flatlength([Docs|Buffer])) of
        true ->
            Docs2 = lists:flatten([Docs|Buffer]),
{ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
@@ -222,7 +228,7 @@
    ets:update_counter(State#state.stats, total_revs, RevsCount),
    case State#state.listeners of
    [] ->
-        % still waiting for the first listener to sen a request
+        % still waiting for the first listener to send a request
        {noreply, State#state{current_seq=LastSeq,done=true}};
    _ ->
        {stop, normal, ok, State#state{current_seq=LastSeq}}
@@ -327,13 +333,13 @@
        [Id, couch_doc:rev_to_str(Rev), Error]),
    dump_update_errors(Rest).

-attachment_loop(ReqId) ->
+attachment_loop(ReqId, Conn) ->
    couch_util:should_flush(),
    receive
        {From, {set_req_id, NewId}} ->
            %% we learn the ReqId to listen for
            From ! {self(), {ok, NewId}},
-            attachment_loop(NewId);
+            attachment_loop(NewId, Conn);
        {ibrowse_async_headers, ReqId, Status, Headers} ->
%% we got header, give the controlling process a chance to react
            receive
@@ -343,37 +349,42 @@
                    receive
                        {From, continue} ->
                            %% normal case
-                            attachment_loop(ReqId);
+                            attachment_loop(ReqId, Conn);
                        {From, fail} ->
                            %% error, failure code
                            ?LOG_ERROR(
"streaming attachment failed with status ~p",
                                [Status]),
+                            catch ibrowse:stop_worker_process(Conn),
                            exit(attachment_request_failed);
                        {From, stop_ok} ->
%% stop looping, controller will start a new loop
+                            catch ibrowse:stop_worker_process(Conn),
                            stop_ok
                    end
            end,
-            attachment_loop(ReqId);
+            attachment_loop(ReqId, Conn);
        {ibrowse_async_response, ReqId, {chunk_start,_}} ->
-            attachment_loop(ReqId);
+            attachment_loop(ReqId, Conn);
        {ibrowse_async_response, ReqId, chunk_end} ->
-            attachment_loop(ReqId);
+            attachment_loop(ReqId, Conn);
        {ibrowse_async_response, ReqId, {error, Err}} ->
            ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
+            catch ibrowse:stop_worker_process(Conn),
            exit(attachment_request_failed);
        {ibrowse_async_response, ReqId, Data} ->
            receive {From, gimme_data} -> From ! {self(), Data} end,
-            attachment_loop(ReqId);
-        {ibrowse_async_response_end, ReqId} -> ok
+            attachment_loop(ReqId, Conn);
+        {ibrowse_async_response_end, ReqId} ->
+            catch ibrowse:stop_worker_process(Conn),
+            exit(normal)
    end.

attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) ->
    #http_db{uri=DbUrl, headers=Headers} = DbS,
    {Pos, [RevId|_]} = Rev,
Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)),
-        "?rev=", couch_doc:rev_to_str({Pos,RevId})]),
+        "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
    ?LOG_DEBUG("Attachment URL ~p", [Url]),
    {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name,
        Type, Length),
@@ -389,11 +400,14 @@

make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
    %% start the process that receives attachment data from ibrowse
-    Pid = spawn_link(fun() -> attachment_loop(nil) end),
+    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
+    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
+    Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),

    %% make the async request
-    Options = [{stream_to, Pid}, {response_format, binary}],
- ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
+    Opts = [{stream_to, Pid}, {response_format, binary}],
+    ReqId =
+ case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
        {ibrowse_req_id, X} -> X;
        {error, _Reason} -> exit(attachment_request_failed)
    end,
@@ -717,6 +731,46 @@
open_doc_revs(Db, DocId, Revs, Options) ->
    couch_db:open_doc_revs(Db, DocId, Revs, Options).

+%% @spec should_flush() -> true | false
+%% @doc Calculates whether it's time to flush the document buffer. Considers
+%%        - memory utilization
+%%        - number of pending document writes
+%%        - approximate number of pending attachment writes
+should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
+    true;
+should_flush(_DocCount) ->
+    MeAndMyLinks = [self()|element(2,process_info(self(),links))],
+
+    case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
+    true -> true;
+    false ->
+        case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
+        true ->
+            [garbage_collect(Pid) || Pid <- MeAndMyLinks],
+            memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
+        false -> false
+        end
+    end.
+
+%% @spec memory_footprint([pid()]) -> integer()
+%% @doc Sum of process and binary memory utilization for all processes in list
+memory_footprint(PidList) ->
+    ProcessMemory = lists:foldl(fun(Pid, Acc) ->
+        Acc + element(2,process_info(Pid, memory))
+    end, 0, PidList),
+
+    BinaryMemory = lists:foldl(fun(Pid, Acc) ->
+        Acc + binary_memory(Pid)
+    end, 0, PidList),
+
+ ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
+    ProcessMemory + BinaryMemory.
+
+%% @spec binary_memory(pid()) -> integer()
+%% @doc Memory utilization of all binaries referenced by this process.
+binary_memory(Pid) ->
+    lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+        0, element(2,process_info(Pid, binary))).

update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
    [] = Options,



Reply via email to