couch_replicator: add replication using changes in a view. Instead of following the changes of a whole database, the replicator can now filter the documents using a view index: any document that has a key emitted in the view can be replicated.
View parameters can be used, which means that you can replicate the results corresponding to a key or a key range in a view. Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/09d7a60c Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/09d7a60c Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/09d7a60c Branch: refs/heads/1994-merge-rcouch Commit: 09d7a60c29e4e800b6e80efd2337e9b744ae1229 Parents: 7999202 Author: benoitc <[email protected]> Authored: Sun Feb 9 00:43:23 2014 +0100 Committer: Paul J. Davis <[email protected]> Committed: Wed Feb 12 18:16:58 2014 -0600 ---------------------------------------------------------------------- src/couch_replicator.erl | 39 ++++++++++++++++++++++++++++++---- src/couch_replicator.hrl | 2 ++ src/couch_replicator_api_wrap.erl | 25 +++++++++++++++++++++- src/couch_replicator_utils.erl | 27 +++++++++++++++++++++-- 4 files changed, 86 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl index d470c8a..10aaf37 100644 --- a/src/couch_replicator.erl +++ b/src/couch_replicator.erl @@ -70,7 +70,9 @@ target_monitor = nil, source_seq = nil, use_checkpoints = true, - checkpoint_interval = 5000 + checkpoint_interval = 5000, + type = db, + view = nil }). 
@@ -533,7 +535,8 @@ cancel_timer(#rep_state{timer = Timer} = State) -> init_state(Rep) -> #rep{ source = Src, target = Tgt, - options = Options, user_ctx = UserCtx + options = Options, user_ctx = UserCtx, + type = Type, view = View } = Rep, {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]), {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}], @@ -547,6 +550,17 @@ init_state(Rep) -> {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), StartSeq1 = get_value(since_seq, Options, StartSeq0), StartSeq = {0, StartSeq1}, + + SourceSeq = case Type of + db -> get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ); + view -> + {DDoc, VName} = View, + {ok, VInfo} = couch_replicator_api_wrap:get_view_info(Source, DDoc, + VName), + get_value(<<"update_seq">>, VInfo, ?LOWEST_SEQ) + end, + + #doc{body={CheckpointHistory}} = SourceLog, State = #rep_state{ rep_details = Rep, @@ -571,9 +585,12 @@ init_state(Rep) -> start_db_compaction_notifier(Target, self()), source_monitor = db_monitor(Source), target_monitor = db_monitor(Target), - source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ), + source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options, true), - checkpoint_interval = get_value(checkpoint_interval, Options, 5000) + checkpoint_interval = get_value(checkpoint_interval, Options, + 5000), + type = Type, + view = View }, State#rep_state{timer = start_timer(State)}. @@ -914,6 +931,20 @@ db_monitor(#db{} = Db) -> db_monitor(_HttpDb) -> nil. 
+source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq, + type = view, view = {DDoc, VName}}) -> + case (catch couch_replicator_api_wrap:get_view_info( + Db#httpdb{retries = 3}, DDoc, VName)) of + {ok, Info} -> + get_value(<<"update_seq">>, Info, Seq); + _ -> + Seq + end; + +source_cur_seq(#rep_state{source = Db, source_seq = Seq, + type = view, view = {DDoc, VName}}) -> + {ok, Info} = couch_replicator_api_wrap:get_view_info(Db, DDoc, VName), + get_value(<<"update_seq">>, Info, Seq); source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) -> case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator.hrl ---------------------------------------------------------------------- diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl index 018aa4b..1eee88e 100644 --- a/src/couch_replicator.hrl +++ b/src/couch_replicator.hrl @@ -18,6 +18,8 @@ target, options, user_ctx, + type = db, + view = nil, doc_id }). http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator_api_wrap.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl index 311025b..1e0e660 100644 --- a/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator_api_wrap.erl @@ -19,6 +19,7 @@ % Many options and apis aren't yet supported here, they are added as needed. -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include("couch_replicator_api_wrap.hrl"). -export([ @@ -26,6 +27,7 @@ db_open/3, db_close/1, get_db_info/1, + get_view_info/3, update_doc/3, update_doc/4, update_docs/3, @@ -121,6 +123,16 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) -> {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}. 
+get_view_info(#httpdb{} = Db, DDocId, ViewName) -> + Path = iolist_to_binary([DDocId, "/_view/", ViewName, "/_info"]), + send_req(Db, [{path, Path}], + fun(200, _, {Props}) -> + {ok, Props} + end); +get_view_info(#db{name = DbName}, DDocId, ViewName) -> + couch_mrview:get_view_info(DbName, DDocId, ViewName). + + ensure_full_commit(#httpdb{} = Db) -> send_req( Db, @@ -439,7 +451,8 @@ changes_since(Db, Style, StartSeq, UserFun, Options) -> }, QueryParams = get_value(query_params, Options, {[]}), Req = changes_json_req(Db, Filter, QueryParams, Options), - ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db), + ChangesFeedFun = couch_httpd_changes:handle_changes(Args, {json_req, Req}, + Db), ChangesFeedFun(fun({change, Change, _}, _) -> UserFun(json_to_doc_info(Change)); (_, _) -> @@ -454,6 +467,10 @@ maybe_add_changes_filter_q_args(BaseQS, Options) -> undefined -> BaseQS; FilterName -> + %% get list of view attributes + ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)], + ViewFields = ["key" | ViewFields0], + {Params} = get_value(query_params, Options, {[]}), [{"filter", ?b2l(FilterName)} | lists:foldl( fun({K, V}, QSAcc) -> @@ -461,6 +478,12 @@ maybe_add_changes_filter_q_args(BaseQS, Options) -> case lists:keymember(Ks, 1, QSAcc) of true -> QSAcc; + false when FilterName =:= <<"_view">> -> + V1 = case lists:member(Ks, ViewFields) of + true -> ?JSON_ENCODE(V); + false -> couch_util:to_list(V) + end, + [{Ks, V1} | QSAcc]; false -> [{Ks, couch_util:to_list(V)} | QSAcc] end http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator_utils.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl index 0baddc2..085df43 100644 --- a/src/couch_replicator_utils.erl +++ b/src/couch_replicator_utils.erl @@ -37,13 +37,34 @@ parse_rep_doc({Props}, UserCtx) -> true -> {ok, #rep{options = Options, user_ctx = 
UserCtx}}; false -> - Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options), - Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options), + Source = parse_rep_db(get_value(<<"source">>, Props), + ProxyParams, Options), + Target = parse_rep_db(get_value(<<"target">>, Props), + ProxyParams, Options), + + + {RepType, View} = case get_value(<<"filter">>, Props) of + <<"_view">> -> + {QP} = get_value(query_params, Options, {[]}), + ViewParam = get_value(<<"view">>, QP), + View1 = case re:split(ViewParam, <<"/">>) of + [DName, ViewName] -> + {<< "_design/", DName/binary >>, ViewName}; + _ -> + throw({bad_request, "Invalid `view` parameter."}) + end, + {view, View1}; + _ -> + {db, nil} + end, + Rep = #rep{ source = Source, target = Target, options = Options, user_ctx = UserCtx, + type = RepType, + view = View, doc_id = get_value(<<"_id">>, Props, null) }, {ok, Rep#rep{id = replication_id(Rep)}} @@ -100,6 +121,8 @@ maybe_append_filters(Base, DocIds -> [DocIds] end; + <<"_", _/binary>> = Filter -> + [Filter, get_value(query_params, Options, {[]})]; Filter -> [filter_code(Filter, Source, UserCtx), get_value(query_params, Options, {[]})]
