Author: fdmanana
Date: Mon Jan 24 14:10:21 2011
New Revision: 1062784
URL: http://svn.apache.org/viewvc?rev=1062784&view=rev
Log:
Merge revision 1062783 from trunk
Replicator DB: on restart, make several attempts to restart the replications
Now on restart, the replicator database listener will make up to 10 attempts
to restart each replication. It waits before each attempt, using an exponential
backoff strategy: the wait starts at 5 seconds and doubles after every failure.
This is very useful because when one server restarts, other servers that are
endpoints of its replications may not be online yet.
Modified:
couchdb/branches/1.1.x/share/www/script/test/replicator_db.js
couchdb/branches/1.1.x/src/couchdb/couch_rep_db_listener.erl
Modified: couchdb/branches/1.1.x/share/www/script/test/replicator_db.js
URL:
http://svn.apache.org/viewvc/couchdb/branches/1.1.x/share/www/script/test/replicator_db.js?rev=1062784&r1=1062783&r2=1062784&view=diff
==============================================================================
--- couchdb/branches/1.1.x/share/www/script/test/replicator_db.js (original)
+++ couchdb/branches/1.1.x/share/www/script/test/replicator_db.js Mon Jan 24
14:10:21 2011
@@ -805,9 +805,16 @@ couchTests.replicator_db = function(debu
restartServer();
continuous_replication_survives_restart();
- repDb.deleteDb();
- restartServer();
- run_on_modified_server(server_config, error_state_replication);
+/*
+ * Disabled, since error state would be set on the document only after
+ * the exponential backoff retry done by the replicator database listener
+ * terminates, which takes too much time for a unit test.
+ */
+/*
+ * repDb.deleteDb();
+ * restartServer();
+ * run_on_modified_server(server_config, error_state_replication);
+ */
// cleanup
Modified: couchdb/branches/1.1.x/src/couchdb/couch_rep_db_listener.erl
URL:
http://svn.apache.org/viewvc/couchdb/branches/1.1.x/src/couchdb/couch_rep_db_listener.erl?rev=1062784&r1=1062783&r2=1062784&view=diff
==============================================================================
--- couchdb/branches/1.1.x/src/couchdb/couch_rep_db_listener.erl (original)
+++ couchdb/branches/1.1.x/src/couchdb/couch_rep_db_listener.erl Mon Jan 24
14:10:21 2011
@@ -20,6 +20,8 @@
-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
+-define(MAX_RETRIES, 10).
+-define(INITIAL_WAIT, 5).
-record(state, {
changes_feed_loop = nil,
@@ -58,6 +60,29 @@ init(_) ->
handle_call({rep_db_update, Change}, _From, State) ->
{reply, ok, process_update(State, Change)};
+handle_call({triggered, {BaseId, _}}, _From, State) ->
+ case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
+ [{BaseId, {DocId, true}}] ->
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
+ _ ->
+ ok
+ end,
+ {reply, ok, State};
+
+handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
+ DocId = get_value(<<"_id">>, Props),
+ [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+ ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
+ "the document `~s`. Last error reason was: ~p",
+ [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
+ couch_rep:update_rep_doc(
+ RepDoc,
+ [{<<"_replication_state">>, <<"error">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+ true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+ {reply, ok, State};
+
handle_call(Msg, From, State) ->
?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
[Msg, From]),
@@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, Js
{BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+ Server = self(),
Pid = spawn_link(fun() ->
- start_replication(JsonRepDoc, RepId, UserCtx)
+ start_replication(Server, JsonRepDoc, RepId, UserCtx)
end),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
- [{BaseId, DocId}] ->
+ [{BaseId, {DocId, _}}] ->
State;
- [{BaseId, OtherDocId}] ->
+ [{BaseId, {OtherDocId, false}}] ->
?LOG_INFO("The replication specified by the document `~s` was already"
" triggered by the document `~s`", [DocId, OtherDocId]),
maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ State;
+ [{BaseId, {OtherDocId, true}}] ->
+ ?LOG_INFO("The replication specified by the document `~s` is already"
+ " being triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
State
end.
@@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc,
end.
-start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
+start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
Pid when is_pid(Pid) ->
?LOG_INFO("Document `~s` triggered replication `~s`",
[get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+ ok = gen_server:call(Server, {triggered, RepId}, infinity),
couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
Error ->
- couch_rep:update_rep_doc(
- RepDoc,
- [
- {<<"_replication_state">>, <<"error">>},
- {<<"_replication_id">>, ?l2b(Base)}
- ]
- ),
- ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId),
Error])
+ keep_retrying(
+ Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
+ end.
+
+
+keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
+ ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
+
+keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
+ ?LOG_ERROR("Error starting replication `~s`: ~p. "
+ "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]),
+ ok = timer:sleep(Wait * 1000),
+ case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+ Pid when is_pid(Pid) ->
+ ok = gen_server:call(Server, {triggered, RepId}, infinity),
+ {RepProps} = RepDoc,
+ ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
+ [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
+ ?MAX_RETRIES - RetriesLeft + 1]),
+ couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+ NewError ->
+ keep_retrying(
+ Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft -
1)
end.