This is an automated email from the ASF dual-hosted git repository.

jan pushed a commit to branch rebase/access-2023
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6226d65b6fe4d8bbe6cbcbbfb64e6e29aa2e889b
Author: Jan Lehnardt <[email protected]>
AuthorDate: Mon Jun 27 10:54:36 2022 +0200

    feat(access): add access handling to replicator
---
 src/couch_replicator/src/couch_replicator.erl      |  8 +++++-
 .../src/couch_replicator_scheduler_job.erl         | 31 +++++++++++++++++-----
 .../couch_replicator_error_reporting_tests.erl     |  6 ++---
 3 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 34c745c5d..24927f8a2 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -78,7 +78,13 @@ replicate(PostBody, Ctx) ->
         false ->
             check_authorization(RepId, UserCtx),
             {ok, Listener} = rep_result_listener(RepId),
-            Result = do_replication_loop(Rep),
+            Result = case do_replication_loop(Rep) of % TODO: review why we need this
+            {ok, {ResultJson}} ->
+                {PublicRepId, _} = couch_replicator_ids:replication_id(Rep), % TODO: check with options
+                {ok, {[{<<"replication_id">>, ?l2b(PublicRepId)} | ResultJson]}};
+            Else ->
+                Else
+        end,
             couch_replicator_notifier:stop(Listener),
             Result
     end.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index b211da85b..9f7e4814e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -66,6 +66,8 @@
     rep_starttime,
     src_starttime,
     tgt_starttime,
+    src_access,
+    tgt_access,
     % checkpoint timer
     timer,
     changes_queue,
@@ -682,6 +684,8 @@ init_state(Rep) ->
         rep_starttime = StartTime,
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+        src_access = get_value(<<"access">>, SourceInfo),
+        tgt_access = get_value(<<"access">>, TargetInfo),
         session_id = couch_uuids:random(),
         source_seq = SourceSeq,
         use_checkpoints = get_value(use_checkpoints, Options, true),
@@ -794,8 +798,10 @@ do_checkpoint(State) ->
         rep_starttime = ReplicationStartTime,
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
+        src_access = SrcAccess,
+        tgt_access = TgtAccess,
         stats = Stats,
-        rep_details = #rep{options = Options},
+        rep_details = #rep{options = Options, user_ctx = UserCtx},
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
@@ -867,11 +873,9 @@ do_checkpoint(State) ->
 
             try
                 {SrcRevPos, SrcRevId} = update_checkpoint(
-                    Source, SourceLog#doc{body = NewRepHistory}, source
-                ),
+                    Source, SourceLog#doc{body = NewRepHistory}, SrcAccess, UserCtx, source),
                 {TgtRevPos, TgtRevId} = update_checkpoint(
-                    Target, TargetLog#doc{body = NewRepHistory}, target
-                ),
+                    Target, TargetLog#doc{body = NewRepHistory}, TgtAccess, UserCtx, target),
                 NewState = State#rep_state{
                     checkpoint_history = NewRepHistory,
                     committed_seq = NewTsSeq,
@@ -899,8 +903,12 @@ do_checkpoint(State) ->
     end.
 
 update_checkpoint(Db, Doc, DbType) ->
+    update_checkpoint(Db, Doc, false, #user_ctx{}, DbType).
+update_checkpoint(Db, Doc) ->
+    update_checkpoint(Db, Doc, false, #user_ctx{}).
+update_checkpoint(Db, Doc, Access, UserCtx, DbType) ->
     try
-        update_checkpoint(Db, Doc)
+        update_checkpoint(Db, Doc, Access, UserCtx)
     catch
         throw:{checkpoint_commit_failure, Reason} ->
             throw(
@@ -910,7 +918,14 @@ update_checkpoint(Db, Doc, DbType) ->
             )
     end.
 
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+update_checkpoint(Db, #doc{id = LogId} = Doc0, Access, UserCtx) ->
+    % if db has _access, then:
+    %    get userCtx from replication and splice into doc _access
+    Doc = case Access of
+        true -> Doc0#doc{access = [UserCtx#user_ctx.name]};
+        _False -> Doc0
+    end,
+
     try
         case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
             {ok, PosRevId} ->
@@ -920,6 +935,8 @@ update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
         end
     catch
         throw:conflict ->
+            % TODO: An admin could have changed the access on the checkpoint doc.
+            %       However unlikely, we can handle this gracefully here.
             case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
                 {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
                     % This means that we were able to update successfully the
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index d9c6a1048..fb7bbe688 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -109,7 +109,7 @@ t_fail_changes_queue({_Ctx, {Source, Target}}) ->
 
     RepPid = couch_replicator_test_helper:get_pid(RepId),
     State = sys:get_state(RepPid),
-    ChangesQueue = element(20, State),
+    ChangesQueue = element(22, State),
     ?assert(is_process_alive(ChangesQueue)),
 
     {ok, Listener} = rep_result_listener(RepId),
@@ -126,7 +126,7 @@ t_fail_changes_manager({_Ctx, {Source, Target}}) ->
 
     RepPid = couch_replicator_test_helper:get_pid(RepId),
     State = sys:get_state(RepPid),
-    ChangesManager = element(21, State),
+    ChangesManager = element(23, State),
     ?assert(is_process_alive(ChangesManager)),
 
     {ok, Listener} = rep_result_listener(RepId),
@@ -143,7 +143,7 @@ t_fail_changes_reader_proc({_Ctx, {Source, Target}}) ->
 
     RepPid = couch_replicator_test_helper:get_pid(RepId),
     State = sys:get_state(RepPid),
-    ChangesReader = element(22, State),
+    ChangesReader = element(24, State),
     ?assert(is_process_alive(ChangesReader)),
 
     {ok, Listener} = rep_result_listener(RepId),

Reply via email to