Is this really useful? couch_api_wrap_httpc already implements generic retry logic for all #httpdb operations.
On Sat, Mar 12, 2011 at 09:25, <[email protected]> wrote: > > Author: fdmanana > Date: Sat Mar 12 17:25:33 2011 > New Revision: 1080950 > > URL: http://svn.apache.org/viewvc?rev=1080950&view=rev > Log: > Replication manager: restart replications that end up in an error state > > Closes COUCHDB-1085 > > Modified: > couchdb/trunk/src/couchdb/couch_replication_manager.erl > couchdb/trunk/src/couchdb/couch_replicator.erl > > Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1080950&r1=1080949&r2=1080950&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original) > +++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Sat Mar 12 > 17:25:33 2011 > @@ -27,6 +27,7 @@ > -record(rep_state, { > rep, > starting, > + retries_left, > max_retries > }). > > @@ -113,14 +114,36 @@ handle_call({rep_complete, RepId}, _From > {reply, ok, State}; > > handle_call({rep_error, RepId, Error}, _From, State) -> > - #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId), > - couch_replicator:cancel_replication(RepId), > - true = ets:delete(?REP_TO_STATE, RepId), > - true = ets:delete(?DOC_TO_REP, DocId), > - ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s", > - [pp_rep_id(RepId), DocId, to_binary(error_reason(Error))]), > - update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]), > - {reply, ok, State}; > + #rep_state{ > + rep = #rep{doc_id = DocId} = Rep, > + retries_left = RetriesLeft, > + max_retries = MaxRetries > + } = RepState = rep_state(RepId), > + NewState = case RetriesLeft > 0 of > + false -> > + couch_replicator:cancel_replication(RepId), > + true = ets:delete(?REP_TO_STATE, RepId), > + true = ets:delete(?DOC_TO_REP, DocId), > + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): > ~s" > + "~nReached maximum 
retry attempts (~p).", > + [pp_rep_id(RepId), DocId, > + to_binary(error_reason(Error)), MaxRetries]), > + State; > + true -> > + NewRepState = RepState#rep_state{retries_left = RetriesLeft - 1}, > + true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}), > + Wait = wait_period(NewRepState), > + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): > ~s" > + "~nRestarting replication in ~p seconds.", > + [pp_rep_id(RepId), DocId, > + to_binary(error_reason(Error)), Wait]), > + Server = self(), > + Pid = spawn_link(fun() -> start_replication(Server, Rep, Wait) end), > + State#state{rep_start_pids = [Pid | State#state.rep_start_pids]} > + end, > + % TODO: maybe add error reason to replication document > + update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]), > + {reply, ok, NewState}; > > handle_call(Msg, From, State) -> > ?LOG_ERROR("Replication manager received unexpected call ~p from ~p", > @@ -332,6 +355,7 @@ maybe_start_replication(State, DocId, Re > RepState = #rep_state{ > rep = Rep, > starting = true, > + retries_left = State#state.max_retries, > max_retries = State#state.max_retries > }, > true = ets:insert(?REP_TO_STATE, {RepId, RepState}), > @@ -339,9 +363,7 @@ maybe_start_replication(State, DocId, Re > ?LOG_INFO("Attempting to start replication `~s` (document `~s`).", > [pp_rep_id(RepId), DocId]), > Server = self(), > - Pid = spawn_link(fun() -> > - start_replication(Server, Rep, State#state.max_retries) > - end), > + Pid = spawn_link(fun() -> start_replication(Server, Rep, 0) end), > State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; > #rep_state{rep = #rep{doc_id = DocId}} -> > State; > @@ -367,32 +389,15 @@ maybe_tag_rep_doc(DocId, {RepProps}, Rep > end. 
> > > -start_replication(Server, #rep{id = RepId, doc_id = DocId} = Rep, > MaxRetries) -> > +start_replication(Server, #rep{id = RepId, doc_id = DocId} = Rep, Wait) -> > + ok = timer:sleep(Wait * 1000), > case (catch couch_replicator:async_replicate(Rep)) of > {ok, _} -> > ok = gen_server:call(Server, {rep_started, RepId}, infinity), > ?LOG_INFO("Document `~s` triggered replication `~s`", > [DocId, pp_rep_id(RepId)]); > Error -> > - keep_retrying(Server, Rep, Error, ?INITIAL_WAIT, MaxRetries) > - end. > - > - > -keep_retrying(Server, Rep, Error, _Wait, 0) -> > - ok = gen_server:call(Server, {rep_start_failure, Rep, Error}, infinity); > - > -keep_retrying(Server, #rep{doc_id = DocId} = Rep, Error, Wait, RetriesLeft) > -> > - ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. " > - "Retrying in ~p seconds", [pp_rep_id(Rep), DocId, Error, Wait]), > - ok = timer:sleep(Wait * 1000), > - case (catch couch_replicator:async_replicate(Rep)) of > - {ok, _} -> > - ok = gen_server:call(Server, {rep_started, Rep#rep.id}, infinity), > - #rep_state{max_retries = MaxRetries} = rep_state(Rep#rep.id), > - ?LOG_INFO("Document `~s` triggered replication `~s` after ~p > attempts", > - [DocId, pp_rep_id(Rep), MaxRetries - RetriesLeft + 1]); > - NewError -> > - keep_retrying(Server, Rep, NewError, Wait * 2, RetriesLeft - 1) > + ok = gen_server:call(Server, {rep_error, RepId, Error}, infinity) > end. > > > @@ -502,3 +507,12 @@ error_reason({error, Reason}) -> > Reason; > error_reason(Reason) -> > Reason. > + > + > +wait_period(#rep_state{max_retries = Max, retries_left = Left}) -> > + wait_period(Max - Left, ?INITIAL_WAIT). > + > +wait_period(1, T) -> > + T; > +wait_period(N, T) when N > 1 -> > + wait_period(N - 1, 2 * T). 
> > Modified: couchdb/trunk/src/couchdb/couch_replicator.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1080950&r1=1080949&r2=1080950&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_replicator.erl (original) > +++ couchdb/trunk/src/couchdb/couch_replicator.erl Sat Mar 12 17:25:33 2011 > @@ -105,7 +105,7 @@ async_replicate(#rep{id = {BaseId, Ext}, > ChildSpec = { > RepChildId, > {gen_server, start_link, [?MODULE, Rep, []]}, > - transient, > + temporary, > 1, > worker, > [?MODULE] > >
