Author: fdmanana
Date: Sat Sep 17 05:45:20 2011
New Revision: 1171900
URL: http://svn.apache.org/viewvc?rev=1171900&view=rev
Log:
Simpler replication cancelation
In some scenarios it's impossible to cancel a replication by
posting to /_replicate, namely:
1) A filtered replication is started, the filter's code is
updated in the source database, therefore's a subsequent
cancel request will not generate the old replication ID
anymore, has it got a different filter code;
2) Dynamically changing the httpd port will also result in the
impossibility of computing the right replication ID
Finally, it's also nicer for users to not need to remember the
exact replication object posted before to /_replicate.
The new approach, in addition to the current approach, allows
something as simple as:
POST /_replicate
{"replication_id": "0a81b645497e6270611ec3419767a584+continuous+create_target",
"cancel": true}
The replication ID can be obtained from a continuous replication
request's response (field "_local_id"), _active_tasks (field
"replication_id") or from the log. Aliases "_local_id" and "id" are
allowed instead of "replication_id".
Closes COUCHDB-1271.
This is a backport of revision 1171899 from trunk.
Modified:
couchdb/branches/1.2.x/share/www/script/test/replication.js
couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl
couchdb/branches/1.2.x/src/couchdb/couch_replicator_utils.erl
Modified: couchdb/branches/1.2.x/share/www/script/test/replication.js
URL:
http://svn.apache.org/viewvc/couchdb/branches/1.2.x/share/www/script/test/replication.js?rev=1171900&r1=1171899&r2=1171900&view=diff
==============================================================================
--- couchdb/branches/1.2.x/share/www/script/test/replication.js (original)
+++ couchdb/branches/1.2.x/share/www/script/test/replication.js Sat Sep 17
05:45:20 2011
@@ -1663,6 +1663,114 @@ couchTests.replication = function(debug)
TEquals("bad_request", e.error);
}
+
+ // Test that we can cancel a replication just by POSTing an object
+ // like {"replication_id": Id, "cancel": true}. The replication ID
+ // can be obtained from a continuous replication request response
+ // (_local_id field), from _active_tasks or from the log
+ populateDb(sourceDb, makeDocs(1, 6));
+ populateDb(targetDb, []);
+
+ repResult = CouchDB.replicate(
+ CouchDB.protocol + host + "/" + sourceDb.name,
+ targetDb.name,
+ {
+ body: {
+ continuous: true,
+ create_target: true
+ }
+ }
+ );
+ TEquals(true, repResult.ok);
+ TEquals('string', typeof repResult._local_id);
+
+ xhr = CouchDB.request("GET", "/_active_tasks");
+ tasks = JSON.parse(xhr.responseText);
+
+ var repId;
+ for (j = 0; j < tasks.length; j++) {
+ if (tasks[j].replication_id === repResult._local_id) {
+ repId = tasks[j].replication_id;
+ }
+ }
+
+ TEquals(repResult._local_id, repId, "Replication found in _active_tasks");
+ xhr = CouchDB.request(
+ "POST", "/_replicate", {
+ body: JSON.stringify({"replication_id": repId, "cancel": true}),
+ headers: {"Content-Type": "application/json"}
+ });
+ TEquals(200, xhr.status, "Replication cancel request success");
+
+ xhr = CouchDB.request("GET", "/_active_tasks");
+ tasks = JSON.parse(xhr.responseText);
+ repId = null;
+ for (j = 0; j < tasks.length; j++) {
+ if (tasks[j].replication_id === repResult._local_id) {
+ repId = tasks[j].replication_id;
+ }
+ }
+ TEquals(null, repId, "Replication was canceled");
+
+ xhr = CouchDB.request(
+ "POST", "/_replicate", {
+ body: JSON.stringify({"replication_id": repResult._local_id, "cancel":
true}),
+ headers: {"Content-Type": "application/json"}
+ });
+ TEquals(404, xhr.status, "2nd replication cancel failed");
+
+ // Non-admin user can not cancel replications triggered by other users
+ var userDoc = CouchDB.prepareUserDoc({
+ name: "tony",
+ roles: ["mafia"]
+ }, "soprano");
+ usersDb = new CouchDB("test_suite_auth", {"X-Couch-Full-Commit":"false"});
+ server_config = [
+ {
+ section: "couch_httpd_auth",
+ key: "authentication_db",
+ value: usersDb.name
+ }
+ ];
+
+ run_on_modified_server(server_config, function() {
+ populateDb(sourceDb, makeDocs(1, 6));
+ populateDb(targetDb, []);
+ TEquals(true, usersDb.save(userDoc).ok);
+
+ repResult = CouchDB.replicate(
+ CouchDB.protocol + host + "/" + sourceDb.name,
+ targetDb.name,
+ {
+ body: {
+ continuous: true
+ }
+ }
+ );
+ TEquals(true, repResult.ok);
+ TEquals('string', typeof repResult._local_id);
+
+ TEquals(true, CouchDB.login("tony", "soprano").ok);
+ TEquals('tony', CouchDB.session().userCtx.name);
+
+ xhr = CouchDB.request(
+ "POST", "/_replicate", {
+ body: JSON.stringify({"replication_id": repResult._local_id, "cancel":
true}),
+ headers: {"Content-Type": "application/json"}
+ });
+ TEquals(401, xhr.status, "Unauthorized to cancel replication");
+ TEquals("unauthorized", JSON.parse(xhr.responseText).error);
+
+ TEquals(true, CouchDB.logout().ok);
+
+ xhr = CouchDB.request(
+ "POST", "/_replicate", {
+ body: JSON.stringify({"replication_id": repResult._local_id, "cancel":
true}),
+ headers: {"Content-Type": "application/json"}
+ });
+ TEquals(200, xhr.status, "Authorized to cancel replication");
+ });
+
// cleanup
usersDb.deleteDb();
sourceDb.deleteDb();
Modified: couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl
URL:
http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl?rev=1171900&r1=1171899&r2=1171900&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl Sat Sep 17 05:45:20
2011
@@ -72,10 +72,15 @@
}).
-replicate(#rep{id = RepId, options = Options} = Rep) ->
+replicate(#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep) ->
case get_value(cancel, Options, false) of
true ->
- cancel_replication(RepId);
+ case get_value(id, Options, nil) of
+ nil ->
+ cancel_replication(RepId);
+ RepId2 ->
+ cancel_replication(RepId2, UserCtx)
+ end;
false ->
{ok, Listener} = rep_result_listener(RepId),
Result = do_replication_loop(Rep),
@@ -84,12 +89,12 @@ replicate(#rep{id = RepId, options = Opt
end.
-do_replication_loop(#rep{id = {BaseId,_} = Id, options = Options} = Rep) ->
+do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
case async_replicate(Rep) of
{ok, _Pid} ->
case get_value(continuous, Options, false) of
true ->
- {ok, {continuous, ?l2b(BaseId)}};
+ {ok, {continuous, ?l2b(BaseId ++ Ext)}};
false ->
wait_for_result(Id)
end;
@@ -176,20 +181,47 @@ wait_for_result(RepId) ->
cancel_replication({BaseId, Extension}) ->
FullRepId = BaseId ++ Extension,
+ ?LOG_INFO("Canceling replication `~s`...", [FullRepId]),
case supervisor:terminate_child(couch_rep_sup, FullRepId) of
ok ->
+ ?LOG_INFO("Replication `~s` canceled.", [FullRepId]),
case supervisor:delete_child(couch_rep_sup, FullRepId) of
ok ->
- {ok, {cancelled, ?l2b(BaseId)}};
+ {ok, {cancelled, ?l2b(FullRepId)}};
{error, not_found} ->
- {ok, {cancelled, ?l2b(BaseId)}};
+ {ok, {cancelled, ?l2b(FullRepId)}};
Error ->
Error
end;
Error ->
+ ?LOG_ERROR("Error canceling replication `~s`: ~p", [FullRepId, Error]),
Error
end.
+cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
+ case lists:member(<<"_admin">>, Roles) of
+ true ->
+ cancel_replication(RepId);
+ false ->
+ {BaseId, Ext} = RepId,
+ case lists:keysearch(
+ BaseId ++ Ext, 1, supervisor:which_children(couch_rep_sup)) of
+ {value, {_, Pid, _, _}} when is_pid(Pid) ->
+ case (catch gen_server:call(Pid, get_details, infinity)) of
+ {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
+ cancel_replication(RepId);
+ {ok, _} ->
+ throw({unauthorized,
+ <<"Can't cancel a replication triggered by another
user">>});
+ {'EXIT', {noproc, {gen_server, call, _}}} ->
+ {error, not_found};
+ Error ->
+ throw(Error)
+ end;
+ _ ->
+ {error, not_found}
+ end
+ end.
init(InitArgs) ->
try
@@ -243,6 +275,7 @@ do_init(#rep{options = Options, id = {Ba
couch_task_status:add_task([
{type, replication},
+ {replication_id, ?l2b(BaseId ++ Ext)},
{source, ?l2b(SourceName)},
{target, ?l2b(TargetName)},
{continuous, get_value(continuous, Options, false)},
@@ -348,6 +381,9 @@ handle_info({'EXIT', Pid, Reason}, #rep_
end.
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
+ {reply, {ok, Rep}, State};
+
handle_call({report_seq_done, Seq, StatsInc}, From,
#rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done =
HighestDone,
current_through_seq = ThroughSeq, stats = Stats} = State) ->
Modified: couchdb/branches/1.2.x/src/couchdb/couch_replicator_utils.erl
URL:
http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_replicator_utils.erl?rev=1171900&r1=1171899&r2=1171900&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_replicator_utils.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_replicator_utils.erl Sat Sep 17
05:45:20 2011
@@ -31,16 +31,22 @@
parse_rep_doc({Props}, UserCtx) ->
ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
Options = make_options(Props),
- Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams,
Options),
- Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams,
Options),
- Rep = #rep{
- source = Source,
- target = Target,
- options = Options,
- user_ctx = UserCtx,
- doc_id = get_value(<<"_id">>, Props)
- },
- {ok, Rep#rep{id = replication_id(Rep)}}.
+ case get_value(cancel, Options, false) andalso
+ (get_value(id, Options, nil) =/= nil) of
+ true ->
+ {ok, #rep{options = Options, user_ctx = UserCtx}};
+ false ->
+ Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams,
Options),
+ Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams,
Options),
+ Rep = #rep{
+ source = Source,
+ target = Target,
+ options = Options,
+ user_ctx = UserCtx,
+ doc_id = get_value(<<"_id">>, Props)
+ },
+ {ok, Rep#rep{id = replication_id(Rep)}}
+ end.
replication_id(#rep{options = Options} = Rep) ->
@@ -229,6 +235,10 @@ convert_options([])->
[];
convert_options([{<<"cancel">>, V} | R]) ->
[{cancel, V} | convert_options(R)];
+convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
+ IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
+ Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
+ [{id, Id} | convert_options(R)];
convert_options([{<<"create_target">>, V} | R]) ->
[{create_target, V} | convert_options(R)];
convert_options([{<<"continuous">>, V} | R]) ->