This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch jobs-new-data
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d0c4f8e297eb3387895a2329a929deaee25f637e
Author: Garren Smith <[email protected]>
AuthorDate: Mon Mar 9 18:38:10 2020 +0200

    couch_jobs resubmit updates job data
    
    When a job that is either pending or finished is resubmitted with new
    data, the job data is updated.
---
 src/couch_jobs/src/couch_jobs.erl        |  8 +++++++
 src/couch_jobs/src/couch_jobs_fdb.erl    | 25 +++++++++++++++------
 src/couch_jobs/test/couch_jobs_tests.erl | 37 ++++++++++++++++++++++++++++++++
 3 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index c134f5a..bd9e353 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -27,6 +27,7 @@
     finish/3,
     resubmit/2,
     resubmit/3,
+    resubmit/4,
     is_resubmitted/1,
     update/2,
     update/3,
@@ -151,6 +152,13 @@ resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) ->
     end).
 
 
+-spec resubmit(jtx(), job(), job_data(), scheduled_time()) -> {ok, job()} | {error, any()}.
+resubmit(Tx, #{jlock := <<_/binary>>} = Job, Data, SchedTime) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Job, Data, SchedTime)
+    end).
+
+
 -spec is_resubmitted(job()) -> true | false.
 is_resubmitted(#{job := true} = Job) ->
     maps:get(resubmit, Job, false).
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index 8c1ab7a..7ff38fa 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -23,6 +23,7 @@
     accept/4,
     finish/3,
     resubmit/3,
+    resubmit/4,
     update/3,
 
     set_type_timeout/3,
@@ -98,7 +99,7 @@ add(#{jtx := true} = JTx0, Type, JobId, Data, STime) ->
             Key = job_key(JTx, Job),
             case erlfdb:wait(erlfdb:get(Tx, Key)) of
                 <<_/binary>> ->
-                    {ok, Job1} = resubmit(JTx, Job, STime),
+                    {ok, Job1} = resubmit(JTx, Job, Data, STime),
                     #{seq := Seq, state := State, data := Data1} = Job1,
                     {ok, State, Seq, Data1};
                 not_found ->
@@ -205,8 +206,11 @@ finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when
             {error, halt}
     end.
 
+resubmit(JTx0, Job, NewSTime) ->
+    resubmit(JTx0, Job, undefined, NewSTime).
 
-resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
+
+resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewData, NewSTime) ->
     #{tx := Tx} = JTx = get_jtx(JTx0),
     #{type := Type, id := JobId} = Job,
     Key = job_key(JTx, Job),
@@ -218,11 +222,12 @@ resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
             end,
             case job_state(JLock, Seq) of
                 finished ->
-                    ok = maybe_enqueue(JTx, Type, JobId, STime, true, Data),
+                    ok = maybe_enqueue(JTx, Type, JobId, STime, true, NewData),
+                    NewData1 = update_job_data(Data, NewData),
                     Job1 = Job#{
                         seq => ?PENDING_SEQ,
                         state => pending,
-                        data => Data
+                        data => NewData1
                     },
                     {ok, Job1};
                 pending when STime == OldSTime ->
@@ -237,15 +242,16 @@ resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
                     },
                     {ok, Job1};
                 pending ->
-                    JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime},
+                    JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime, data = NewData},
                     set_job_val(Tx, Key, JV1),
                     couch_jobs_pending:remove(JTx, Type, JobId, OldSTime),
                     couch_jobs_pending:enqueue(JTx, Type, STime, JobId),
+                    NewData1 = update_job_data(Data, NewData),
                     Job1 = Job#{
                         stime => STime,
                         seq => ?PENDING_SEQ,
                         state => pending,
-                        data => Data
+                        data => NewData1
                     },
                     {ok, Job1};
                 running ->
@@ -705,3 +711,10 @@ get_md_version_age(Version) ->
 update_md_version_timestamp(Version) ->
     Ts = erlang:system_time(second),
     ets:insert(?MODULE, {?MD_TIMESTAMP_ETS_KEY, Version, Ts}).
+
+
+update_job_data(Data, undefined) ->
+    Data;
+
+update_job_data(_Data, NewData) ->
+    NewData.
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index 9d8e2df..e22023e 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -47,6 +47,9 @@ couch_jobs_basic_test_() ->
                     fun accept_blocking/1,
                     fun job_processor_update/1,
                     fun resubmit_enqueues_job/1,
+                    fun resubmit_pending_updates_job_data/1,
+                    fun resubmit_finished_updates_job_data/1,
+                    fun resubmit_running_does_not_update_job_data/1,
                     fun resubmit_custom_schedtime/1,
                     fun accept_max_schedtime/1,
                     fun accept_no_schedule/1,
@@ -426,6 +429,40 @@ resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
     end).
 
 
+resubmit_pending_updates_job_data(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        Data1 = #{<<"test">> => 1},
+        Data2 = #{<<"test">> => 2},
+        ok = couch_jobs:add(?TX, T, J, Data1),
+        ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+        ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
+    end).
+
+
+resubmit_finished_updates_job_data(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        Data1 = #{<<"test">> => 1},
+        Data2 = #{<<"test">> => 2},
+        ok = couch_jobs:add(?TX, T, J, Data1),
+        {ok, Job1, #{}} = couch_jobs:accept(T),
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+        ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+        ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
+    end).
+
+
+resubmit_running_does_not_update_job_data(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        Data1 = #{<<"test">> => 1},
+        Data2 = #{<<"test">> => 2},
+        ok = couch_jobs:add(?TX, T, J, Data1),
+        {ok, Job1, #{}} = couch_jobs:accept(T),
+        ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+        ?assertMatch({ok, _, Data1}, couch_jobs:accept(T))
+    end).
+
+
 resubmit_custom_schedtime(#{t1 := T, j1 := J}) ->
     ?_test(begin
         ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)),

Reply via email to