Author: fdmanana
Date: Tue Apr 12 11:06:52 2011
New Revision: 1091371
URL: http://svn.apache.org/viewvc?rev=1091371&view=rev
Log:
Replication manager refactoring
Update the state of replication documents outside the replication manager
gen_server. This allows for a faster transition of replication states without
adding substantial complexity, more or less similar to what is done in the
1.1.x branch.
Modified:
couchdb/trunk/src/couchdb/couch_replication_manager.erl
couchdb/trunk/src/couchdb/couch_replicator.erl
Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1091371&r1=1091370&r2=1091371&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Tue Apr 12 11:06:52 2011
@@ -13,6 +13,10 @@
-module(couch_replication_manager).
-behaviour(gen_server).
+% public API
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).
@@ -28,8 +32,7 @@
rep,
starting,
retries_left,
- max_retries,
- last_error
+ max_retries
}).
-import(couch_replicator_utils, [
@@ -44,7 +47,6 @@
-record(state, {
changes_feed_loop = nil,
db_notifier = nil,
- rep_notifier = nil,
rep_db_name = nil,
rep_start_pids = [],
max_retries
@@ -54,6 +56,44 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+replication_started(#rep{id = {BaseId, _} = RepId}) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ update_rep_doc(DocId, [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+ ?LOG_INFO("Document `~s` triggered replication `~s`",
+ [DocId, pp_rep_id(RepId)])
+ end.
+
+
+replication_completed(#rep{id = RepId}) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+ ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+ ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+ [pp_rep_id(RepId), DocId])
+ end.
+
+
+replication_error(#rep{id = RepId}, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ % TODO: maybe add error reason to replication document
+ update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
+ ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+ end.
+
+
init(_) ->
process_flag(trap_exit, true),
?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
@@ -72,7 +112,6 @@ init(_) ->
changes_feed_loop = Loop,
rep_db_name = RepDbName,
db_notifier = db_update_notifier(),
- rep_notifier = rep_notifier(),
max_retries = list_to_integer(
couch_config:get("replicator", "max_replication_retry_count",
"10"))
}}.
@@ -81,41 +120,22 @@ init(_) ->
handle_call({rep_db_update, Change}, _From, State) ->
{reply, ok, process_update(State, Change)};
-handle_call({rep_started, {BaseId, _Ext} = RepId}, _From, State) ->
+handle_call({rep_started, RepId}, _From, State) ->
case rep_state(RepId) of
nil ->
ok;
- #rep_state{rep = #rep{doc_id = DocId}} = RepState ->
+ RepState ->
true = ets:insert(
- ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}),
- update_rep_doc(DocId, [
- {<<"_replication_state">>, <<"triggered">>},
- {<<"_replication_id">>, ?l2b(BaseId)}]),
- ?LOG_INFO("Document `~s` triggered replication `~s`",
- [DocId, pp_rep_id(RepId)])
+ ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}})
end,
{reply, ok, State};
handle_call({rep_complete, RepId}, _From, State) ->
- case rep_state(RepId) of
- nil ->
- ok;
- #rep_state{rep = #rep{doc_id = DocId}} ->
- update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}])
- end,
+ true = ets:delete(?REP_TO_STATE, RepId),
{reply, ok, State};
handle_call({rep_error, RepId, Error}, _From, State) ->
- case rep_state(RepId) of
- nil ->
- {reply, ok, State};
- #rep_state{rep = #rep{doc_id = DocId}} = RepState ->
- NewRepState = RepState#rep_state{last_error = Error},
- true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
- % TODO: maybe add error reason to replication document
- update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
- {reply, ok, State}
- end;
+ {reply, ok, replication_error(State, RepId, Error)};
handle_call(Msg, From, State) ->
?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
@@ -151,10 +171,6 @@ handle_info({'EXIT', From, Reason}, #sta
?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
{stop, {db_update_notifier_died, Reason}, State};
-handle_info({'EXIT', From, Reason}, #state{rep_notifier = From} = State) ->
- ?LOG_ERROR("Replication notifier died. Reason: ~p", [Reason]),
- {stop, {rep_notifier_died, Reason}, State};
-
handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
% one of the replication start processes terminated successfully
{noreply, State#state{rep_start_pids = Pids -- [From]}};
@@ -168,8 +184,7 @@ terminate(_Reason, State) ->
#state{
rep_start_pids = StartPids,
changes_feed_loop = Loop,
- db_notifier = DbNotifier,
- rep_notifier = RepNotifier
+ db_notifier = DbNotifier
} = State,
stop_all_replications(),
lists:foreach(
@@ -180,7 +195,6 @@ terminate(_Reason, State) ->
[Loop | StartPids]),
true = ets:delete(?REP_TO_STATE),
true = ets:delete(?DOC_TO_REP),
- couch_replication_notifier:stop(RepNotifier),
couch_db_update_notifier:stop(DbNotifier).
@@ -249,31 +263,6 @@ db_update_notifier() ->
Notifier.
-rep_notifier() ->
- Server = self(),
- {ok, Notifier} = couch_replication_notifier:start_link(
- fun({finished, RepId, _CheckPointHistory}) ->
- case rep_state(RepId) of
- nil ->
- ok;
- #rep_state{} ->
- % TODO: maybe add replication stats to the doc
- ok = gen_server:call(Server, {rep_complete, RepId}, infinity)
- end;
- ({error, RepId, Error}) ->
- case rep_state(RepId) of
- nil ->
- ok;
- #rep_state{} ->
- ok = gen_server:call(
- Server, {rep_error, RepId, Error}, infinity)
- end;
- (_) ->
- ok
- end),
- Notifier.
-
-
restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
stop_all_replications(),
lists:foreach(
@@ -306,8 +295,8 @@ process_update(State, {Change}) ->
<<"completed">> ->
replication_complete(DocId),
State;
- <<"error">> ->
- replication_error(State, DocId)
+ _ ->
+ State
end
end.
@@ -379,11 +368,13 @@ start_replication(Server, #rep{id = RepI
replication_complete(DocId) ->
case ets:lookup(?DOC_TO_REP, DocId) of
[{DocId, RepId}] ->
- couch_replicator:cancel_replication(RepId),
- true = ets:delete(?REP_TO_STATE, RepId),
- true = ets:delete(?DOC_TO_REP, DocId),
- ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
- [pp_rep_id(RepId), DocId]);
+ case rep_state(RepId) of
+ nil ->
+ couch_replicator:cancel_replication(RepId);
+ #rep_state{} ->
+ ok
+ end,
+ true = ets:delete(?DOC_TO_REP, DocId);
_ ->
ok
end.
@@ -402,19 +393,18 @@ rep_doc_deleted(DocId) ->
end.
-replication_error(State, DocId) ->
- case ets:lookup(?DOC_TO_REP, DocId) of
- [{DocId, RepId}] ->
- maybe_retry_replication(rep_state(RepId), State);
- _ ->
- State
+replication_error(State, RepId, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ State;
+ RepState ->
+ maybe_retry_replication(RepState, Error, State)
end.
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, State) ->
+maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
#rep_state{
rep = #rep{id = RepId, doc_id = DocId},
- max_retries = MaxRetries,
- last_error = Error
+ max_retries = MaxRetries
} = RepState,
couch_replicator:cancel_replication(RepId),
true = ets:delete(?REP_TO_STATE, RepId),
@@ -424,11 +414,10 @@ maybe_retry_replication(#rep_state{retri
[pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
State;
-maybe_retry_replication(RepState, State) ->
+maybe_retry_replication(RepState, Error, State) ->
#rep_state{
rep = #rep{id = RepId, doc_id = DocId} = Rep,
- retries_left = RetriesLeft,
- last_error = Error
+ retries_left = RetriesLeft
} = RepState,
NewRepState = RepState#rep_state{retries_left = RetriesLeft - 1},
true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1091371&r1=1091370&r2=1091371&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Tue Apr 12 11:06:52 2011
@@ -278,6 +278,8 @@ do_init(#rep{options = Options, id = {Ba
?LOG_DEBUG("Missing rev finder pids are: ~p", [MissingRevFinders]),
?LOG_DEBUG("Worker pids are: ~p", [Workers]),
+ couch_replication_manager:replication_started(Rep),
+
{ok, State#rep_state{
missing_revs_queue = MissingRevsQueue,
changes_queue = ChangesQueue,
@@ -443,10 +445,11 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-terminate(normal, #rep_state{rep_details = #rep{id = RepId},
+terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
checkpoint_history = CheckpointHistory} = State) ->
terminate_cleanup(State),
- couch_replication_notifier:notify({finished, RepId, CheckpointHistory});
+ couch_replication_notifier:notify({finished, RepId, CheckpointHistory}),
+ couch_replication_manager:replication_completed(Rep);
terminate(shutdown, State) ->
% cancelled replication throught ?MODULE:cancel_replication/1
@@ -456,12 +459,13 @@ terminate(Reason, State) ->
#rep_state{
source_name = Source,
target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId}
+ rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
} = State,
?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
[BaseId ++ Ext, Source, Target, to_binary(Reason)]),
terminate_cleanup(State),
- couch_replication_notifier:notify({error, RepId, Reason}).
+ couch_replication_notifier:notify({error, RepId, Reason}),
+ couch_replication_manager:replication_error(Rep, Reason).
terminate_cleanup(State) ->