On Mon, Jan 24, 2011 at 3:09 PM, <[email protected]> wrote: > Author: fdmanana > Date: Mon Jan 24 14:09:06 2011 > New Revision: 1062783 > > URL: http://svn.apache.org/viewvc?rev=1062783&view=rev > Log: > Replicator DB: on restart, make several attempts to restart the replications > > Now on restart, the replicator database listener will make up to 10 attempts > to restart each replication. Before each attempt, it waits, using an > exponential > backoff strategy, before doing the next attempt. > This is very useful because when one server restarts, other servers that are > endpoints of its replications, may not be online yet. > > > Modified: > couchdb/trunk/share/www/script/test/replicator_db.js > couchdb/trunk/src/couchdb/couch_rep_db_listener.erl > > Modified: couchdb/trunk/share/www/script/test/replicator_db.js > URL: > http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/replicator_db.js?rev=1062783&r1=1062782&r2=1062783&view=diff > ============================================================================== > --- couchdb/trunk/share/www/script/test/replicator_db.js (original) > +++ couchdb/trunk/share/www/script/test/replicator_db.js Mon Jan 24 14:09:06 > 2011 > @@ -805,9 +805,16 @@ couchTests.replicator_db = function(debu > restartServer(); > continuous_replication_survives_restart(); > > - repDb.deleteDb(); > - restartServer(); > - run_on_modified_server(server_config, error_state_replication); > +/* > + * Disabled, since error state would be set on the document only after > + * the exponential backoff retry done by the replicator database listener > + * terminates, which takes too much time for a unit test. 
> + */ > +/* > + * repDb.deleteDb(); > + * restartServer(); > + * run_on_modified_server(server_config, error_state_replication); > + */ > > > // cleanup > > Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1062783&r1=1062782&r2=1062783&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original) > +++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 14:09:06 > 2011 > @@ -20,6 +20,8 @@ > > -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). > -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). > +-define(MAX_RETRIES, 10). > +-define(INITIAL_WAIT, 5). > > -record(state, { > changes_feed_loop = nil, > @@ -58,6 +60,29 @@ init(_) -> > handle_call({rep_db_update, Change}, _From, State) -> > {reply, ok, process_update(State, Change)}; > > +handle_call({triggered, {BaseId, _}}, _From, State) -> > + case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of > + [{BaseId, {DocId, true}}] -> > + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}); > + _ -> > + ok > + end, > + {reply, ok, State}; > + > +handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> > + DocId = get_value(<<"_id">>, Props), > + [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), > + ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " > + "the document `~s`. 
Last error reason was: ~p", > + [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]), > + couch_rep:update_rep_doc( > + RepDoc, > + [{<<"_replication_state">>, <<"error">>}, > + {<<"_replication_id">>, ?l2b(BaseId)}]), > + true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), > + true = ets:delete(?DOC_ID_TO_REP_ID, DocId), > + {reply, ok, State}; > + > handle_call(Msg, From, State) -> > ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p", > [Msg, From]), > @@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, Js > {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), > case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of > [] -> > - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), > + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), > true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), > + Server = self(), > Pid = spawn_link(fun() -> > - start_replication(JsonRepDoc, RepId, UserCtx) > + start_replication(Server, JsonRepDoc, RepId, UserCtx) > end), > State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; > - [{BaseId, DocId}] -> > + [{BaseId, {DocId, _}}] -> > State; > - [{BaseId, OtherDocId}] -> > + [{BaseId, {OtherDocId, false}}] -> > ?LOG_INFO("The replication specified by the document `~s` was already" > " triggered by the document `~s`", [DocId, OtherDocId]), > maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), > + State; > + [{BaseId, {OtherDocId, true}}] -> > + ?LOG_INFO("The replication specified by the document `~s` is already" > + " being triggered by the document `~s`", [DocId, OtherDocId]), > + maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), > State > end. > > @@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, > end. 
> > > -start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) -> > +start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) -> > case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of > Pid when is_pid(Pid) -> > ?LOG_INFO("Document `~s` triggered replication `~s`", > [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), > + ok = gen_server:call(Server, {triggered, RepId}, infinity), > couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); > Error -> > - couch_rep:update_rep_doc( > - RepDoc, > - [ > - {<<"_replication_state">>, <<"error">>}, > - {<<"_replication_id">>, ?l2b(Base)} > - ] > - ), > - ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), > Error]) > + keep_retrying( > + Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, > ?MAX_RETRIES) > + end. > + > + > +keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) -> > + ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity); > + > +keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) -> > + ?LOG_ERROR("Error starting replication `~s`: ~p. " > + "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]), > + ok = timer:sleep(Wait * 1000), > + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of > + Pid when is_pid(Pid) -> > + ok = gen_server:call(Server, {triggered, RepId}, infinity), > + {RepProps} = RepDoc, > + ?LOG_INFO("Document `~s` triggered replication `~s` after ~p > attempts", > + [get_value(<<"_id">>, RepProps), pp_rep_id(RepId), > + ?MAX_RETRIES - RetriesLeft + 1]), > + couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); > + NewError -> > + keep_retrying( > + Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft > - 1) > end. > > > > >
Shouldn't MAX_RETRIES be a config setting?
