This is an automated email from the ASF dual-hosted git repository. jan pushed a commit to branch rebase/access-2023 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6226d65b6fe4d8bbe6cbcbbfb64e6e29aa2e889b Author: Jan Lehnardt <[email protected]> AuthorDate: Mon Jun 27 10:54:36 2022 +0200 feat(access): add access handling to replicator --- src/couch_replicator/src/couch_replicator.erl | 8 +++++- .../src/couch_replicator_scheduler_job.erl | 31 +++++++++++++++++----- .../couch_replicator_error_reporting_tests.erl | 6 ++--- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index 34c745c5d..24927f8a2 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -78,7 +78,13 @@ replicate(PostBody, Ctx) -> false -> check_authorization(RepId, UserCtx), {ok, Listener} = rep_result_listener(RepId), - Result = do_replication_loop(Rep), + Result = case do_replication_loop(Rep) of % TODO: review why we need this + {ok, {ResultJson}} -> + {PublicRepId, _} = couch_replicator_ids:replication_id(Rep), % TODO: check with options + {ok, {[{<<"replication_id">>, ?l2b(PublicRepId)} | ResultJson]}}; + Else -> + Else + end, couch_replicator_notifier:stop(Listener), Result end. diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index b211da85b..9f7e4814e 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -66,6 +66,8 @@ rep_starttime, src_starttime, tgt_starttime, + src_access, + tgt_access, % checkpoint timer timer, changes_queue, @@ -682,6 +684,8 @@ init_state(Rep) -> rep_starttime = StartTime, src_starttime = get_value(<<"instance_start_time">>, SourceInfo), tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), + src_access = get_value(<<"access">>, SourceInfo), + tgt_access = get_value(<<"access">>, TargetInfo), session_id = couch_uuids:random(), source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options, true), @@ -794,8 +798,10 @@ do_checkpoint(State) -> rep_starttime = ReplicationStartTime, src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, + src_access = SrcAccess, + tgt_access = TgtAccess, stats = Stats, - rep_details = #rep{options = Options}, + rep_details = #rep{options = Options, user_ctx = UserCtx}, session_id = SessionId } = State, case commit_to_both(Source, Target) of @@ -867,11 +873,9 @@ do_checkpoint(State) -> try {SrcRevPos, SrcRevId} = update_checkpoint( - Source, SourceLog#doc{body = NewRepHistory}, source - ), + Source, SourceLog#doc{body = NewRepHistory}, SrcAccess, UserCtx, source), {TgtRevPos, TgtRevId} = update_checkpoint( - Target, TargetLog#doc{body = NewRepHistory}, target - ), + Target, TargetLog#doc{body = NewRepHistory}, TgtAccess, UserCtx, target), NewState = State#rep_state{ checkpoint_history = NewRepHistory, committed_seq = NewTsSeq, @@ -899,8 +903,12 @@ do_checkpoint(State) -> end. update_checkpoint(Db, Doc, DbType) -> + update_checkpoint(Db, Doc, false, #user_ctx{}, DbType). +update_checkpoint(Db, Doc) -> + update_checkpoint(Db, Doc, false, #user_ctx{}). +update_checkpoint(Db, Doc, Access, UserCtx, DbType) -> try - update_checkpoint(Db, Doc) + update_checkpoint(Db, Doc, Access, UserCtx) catch throw:{checkpoint_commit_failure, Reason} -> throw( @@ -910,7 +918,14 @@ update_checkpoint(Db, Doc, DbType) -> ) end. -update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) -> +update_checkpoint(Db, #doc{id = LogId} = Doc0, Access, UserCtx) -> + % if db has _access, then: + % get userCtx from replication and splice into doc _access + Doc = case Access of + true -> Doc0#doc{access = [UserCtx#user_ctx.name]}; + _False -> Doc0 + end, + try case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of {ok, PosRevId} -> @@ -920,6 +935,8 @@ update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) -> end catch throw:conflict -> + % TODO: An admin could have changed the access on the checkpoint doc. + % However unlikely, we can handle this gracefully here. case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} -> % This means that we were able to update successfully the diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl index d9c6a1048..fb7bbe688 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -109,7 +109,7 @@ t_fail_changes_queue({_Ctx, {Source, Target}}) -> RepPid = couch_replicator_test_helper:get_pid(RepId), State = sys:get_state(RepPid), - ChangesQueue = element(20, State), + ChangesQueue = element(22, State), ?assert(is_process_alive(ChangesQueue)), {ok, Listener} = rep_result_listener(RepId), @@ -126,7 +126,7 @@ t_fail_changes_manager({_Ctx, {Source, Target}}) -> RepPid = couch_replicator_test_helper:get_pid(RepId), State = sys:get_state(RepPid), - ChangesManager = element(21, State), + ChangesManager = element(23, State), ?assert(is_process_alive(ChangesManager)), {ok, Listener} = rep_result_listener(RepId), @@ -143,7 +143,7 @@ t_fail_changes_reader_proc({_Ctx, {Source, Target}}) -> RepPid = couch_replicator_test_helper:get_pid(RepId), State = sys:get_state(RepPid), - ChangesReader = element(22, State), + ChangesReader = element(24, State), ?assert(is_process_alive(ChangesReader)), {ok, Listener} = rep_result_listener(RepId),
