Author: fdmanana Date: Sat Sep 17 03:31:12 2011 New Revision: 1171888 URL: http://svn.apache.org/viewvc?rev=1171888&view=rev Log: Improved _active_tasks API
Tasks are now free to set any properties they wish (as an Erlang proplist). Different tasks can have different properties, and the status string no longer exists; instead, client applications can build it from the more granular properties reported by _active_tasks. Some of these properties are: 1) "progress" (an integer percentage, for all tasks; replications report null when their sequence numbers are not numeric) 2) "database" (for compaction and indexer tasks) 3) "design_document" (for indexer and view compaction tasks) 4) "source" and "target" (for replications) 5) "docs_read", "docs_written", "doc_write_failures", "missing_revisions_found", "revisions_checked", "source_seq", "checkpointed_source_seq" and "continuous" (for replications) Modified: couchdb/trunk/share/www/status.html couchdb/trunk/src/couch_index/src/couch_index_api.erl couchdb/trunk/src/couch_index/src/couch_index_updater.erl couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl couchdb/trunk/src/couchdb/couch_db_updater.erl couchdb/trunk/src/couchdb/couch_replicator.erl couchdb/trunk/src/couchdb/couch_task_status.erl couchdb/trunk/test/etap/090-task-status.t Modified: couchdb/trunk/share/www/status.html URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/status.html?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/share/www/status.html (original) +++ couchdb/trunk/share/www/status.html Sat Sep 17 03:31:12 2011 @@ -76,15 +76,43 @@ specific language governing permissions .appendTo("#status tbody.content"); } else { $.each(tasks, function(idx, task) { + var status, type, object; + + switch (task.type) { + case "database_compaction": + type = "Database compaction"; + object = task.database + (task.retry ? 
" retry" : ""); + status = "Copied " + task.changes_done + " of " + + task.total_changes + " changes (" + task.progress + "%)"; + break; + case "view_compaction": + type = "View compaction"; + object = task.database + ", " + task.design_document; + status = "Progress " + task.progress + "%"; + break; + case "indexer": + type = "Indexer"; + object = task.database + ", " + task.design_document; + status = "Processed " + task.changes_done + " of " + + task.total_changes + " changes (" + task.progress + "%)"; + break; + case "replication": + type = "Replication"; + object = task.source + " to " + task.target; + status = "Checkpointed source sequence " + + task.checkpointed_source_seq + ", current source sequence " + + task.source_seq + ", progress " + task.progress + "%"; + } + $("<tr><th></th><td class='object'></td><td class='started'>" + "</td><td class='updated'></td><td class='pid'></td>" + "<td class='status'></td></tr>") - .find("th").text(task.type).end() - .find("td.object").text(task.task).end() + .find("th").text(type).end() + .find("td.object").text(object).end() .find("td.started").text(toTaskDate(task.started_on)).end() .find("td.updated").text(toTaskDate(task.updated_on)).end() .find("td.pid").text(task.pid).end() - .find("td.status").text(task.status).end() + .find("td.status").text(status).end() .appendTo("#status tbody.content"); }); } Modified: couchdb/trunk/src/couch_index/src/couch_index_api.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_api.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couch_index/src/couch_index_api.erl (original) +++ couchdb/trunk/src/couch_index/src/couch_index_api.erl Sat Sep 17 03:31:12 2011 @@ -29,7 +29,7 @@ reset(State) -> ok. -start_update(State) -> +start_update(State, PurgedState, NumChanges) -> {ok, State}. 
purge(PurgedIdRevs, State) -> Modified: couchdb/trunk/src/couch_index/src/couch_index_updater.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_updater.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couch_index/src/couch_index_updater.erl (original) +++ couchdb/trunk/src/couch_index/src/couch_index_updater.erl Sat Sep 17 03:31:12 2011 @@ -121,10 +121,6 @@ update(Idx, Mod, IdxState) -> _ -> [conflicts, deleted_conflicts] end, - TaskType = <<"Indexer">>, - Starting = <<"Starting index update.">>, - couch_task_status:add_task(TaskType, Mod:get(idx_name, IdxState), Starting), - couch_util:with_db(DbName, fun(Db) -> DbUpdateSeq = couch_db:get_update_seq(Db), DbCommittedSeq = couch_db:get_committed_update_seq(Db), @@ -134,7 +130,6 @@ update(Idx, Mod, IdxState) -> reset -> exit(reset) end, - couch_task_status:set_update_frequency(500), NumChanges = couch_db:count_changes_since(Db, CurrSeq), LoadDoc = fun(DocInfo) -> @@ -155,23 +150,22 @@ update(Idx, Mod, IdxState) -> end end, - Proc = fun(DocInfo, _, {IdxStateAcc, Count, _}) -> + Proc = fun(DocInfo, _, {IdxStateAcc, _}) -> HighSeq = DocInfo#doc_info.high_seq, case CommittedOnly and (HighSeq > DbCommittedSeq) of true -> - {stop, {IdxStateAcc, Count, false}}; + {stop, {IdxStateAcc, false}}; false -> - update_task_status(NumChanges, Count), {Doc, Seq} = LoadDoc(DocInfo), {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc), - {ok, {NewSt, Count+1, true}} + {ok, {NewSt, true}} end end, - {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState), - Acc0 = {InitIdxState, 0, true}, + {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges), + Acc0 = {InitIdxState, true}, {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []), - {ProcIdxSt, _, SendLast} = Acc, + {ProcIdxSt, SendLast} = Acc, % If we didn't bail due to hitting the last committed seq we need % to send 
our last update_seq through. @@ -182,9 +176,6 @@ update(Idx, Mod, IdxState) -> {ok, ProcIdxSt} end, - couch_task_status:set_update_frequency(0), - couch_task_status:update("Waiting for index writer to finish."), - {ok, FinalIdxState} = Mod:finish_update(LastIdxSt), exit({updated, FinalIdxState}) end). @@ -197,16 +188,8 @@ purge_index(Db, Mod, IdxState) -> DbPurgeSeq == IdxPurgeSeq -> {ok, IdxState}; DbPurgeSeq == IdxPurgeSeq + 1 -> - couch_task_status:update(<<"Purging index entries.">>), {ok, PurgedIdRevs} = couch_db:get_last_purged(Db), Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState); true -> - couch_task_status:update(<<"Resetting index due to purge state.">>), reset end. - - -update_task_status(Total, Count) -> - PercDone = (Count * 100) div Total, - Mesg = "Processed ~p of ~p changes (~p%)", - couch_task_status:update(Mesg, [Count, Total, PercDone]). Modified: couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl (original) +++ couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl Sat Sep 17 03:31:12 2011 @@ -17,6 +17,15 @@ -export([compact/2, swap_compacted/2]). +-record(acc, { + btree = nil, + last_id = nil, + kvs = [], + kvs_size = 0, + changes = 0, + total_changes +}). 
+ compact(State, Opts) -> case lists:member(recompact, Opts) of @@ -46,15 +55,26 @@ compact(State) -> } = EmptyState, {ok, Count} = couch_btree:full_reduce(IdBtree), - TaskName = <<DbName/binary, ":", IdxName/binary>>, - couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), + TotalChanges = lists:foldl( + fun(View, Acc) -> + {ok, Kvs} = couch_mrview_util:get_row_count(View), + Acc + Kvs + end, + Count, Views), + couch_task_status:add_task([ + {type, view_compaction}, + {database, DbName}, + {design_document, IdxName}, + {progress, 0} + ]), BufferSize0 = couch_config:get( "view_compaction", "keyvalue_buffer_size", "2097152" ), BufferSize = list_to_integer(BufferSize0), - FoldFun = fun({DocId, _} = KV, {Bt, Acc, AccSize, Copied, LastId}) -> + FoldFun = fun({DocId, _} = KV, Acc) -> + #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc, if DocId =:= LastId -> % COUCHDB-999 regression test ?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`" @@ -63,26 +83,28 @@ compact(State) -> ), exit({view_duplicate_id, DocId}); true -> ok end, - AccSize2 = AccSize + ?term_size(KV), - case AccSize2 >= BufferSize of + KvsSize2 = KvsSize + ?term_size(KV), + case KvsSize2 >= BufferSize of true -> - {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), - couch_task_status:update("Copied ~p of ~p Ids (~p%)", - [Copied, Count, (Copied * 100) div Count]), - {ok, {Bt2, [], 0, Copied+1+length(Acc), DocId}}; + {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])), + Acc2 = update_task(Acc, 1 + length(Kvs)), + {ok, Acc2#acc{ + btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}}; _ -> - {ok, {Bt, [KV | Acc], AccSize2, Copied, DocId}} + {ok, Acc#acc{ + kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}} end end, - InitAcc = {EmptyIdBtree, [], 0, 0, nil}, + InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree}, {ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc), - {Bt3, Uncopied, _, _, _} = FinalAcc, + 
#acc{btree = Bt3, kvs = Uncopied} = FinalAcc, {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), + FinalAcc2 = update_task(FinalAcc, length(Uncopied)), - NewViews = lists:map(fun({View, EmptyView}) -> - compact_view(View, EmptyView, BufferSize) - end, lists:zip(Views, EmptyViews)), + {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) -> + compact_view(View, EmptyView, BufferSize, Acc) + end, FinalAcc2, lists:zip(Views, EmptyViews)), unlink(EmptyState#mrst.fd), {ok, EmptyState#mrst{ @@ -109,27 +131,31 @@ recompact(State) -> end. -%% @spec compact_view(View, EmptyView, Retry) -> CompactView -compact_view(View, EmptyView, BufferSize) -> - {ok, Count} = couch_mrview_util:get_row_count(View), - Fun = fun(KV, {Bt, Acc, AccSize, Copied}) -> - AccSize2 = AccSize + ?term_size(KV), - if AccSize2 >= BufferSize -> - {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), - couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)", - [View#mrview.id_num, Copied, Count, (Copied*100) div Count]), - {ok, {Bt2, [], 0, Copied + 1 + length(Acc)}}; +%% @spec compact_view(View, EmptyView, Retry, Acc) -> {CompactView, NewAcc} +compact_view(View, EmptyView, BufferSize, Acc0) -> + Fun = fun(KV, #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc) -> + KvsSize2 = KvsSize + ?term_size(KV), + if KvsSize2 >= BufferSize -> + {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])), + Acc2 = update_task(Acc, 1 + length(Kvs)), + {ok, Acc2#acc{btree = Bt2, kvs = [], kvs_size = 0}}; true -> - {ok, {Bt, [KV|Acc], AccSize2, Copied}} + {ok, Acc#acc{kvs = [KV | Kvs], kvs_size = KvsSize2}} end end, - InitAcc = {EmptyView#mrview.btree, [], 0, 0}, + InitAcc = Acc0#acc{kvs = [], kvs_size = 0, btree = EmptyView#mrview.btree}, {ok, _, FinalAcc} = couch_btree:foldl(View#mrview.btree, Fun, InitAcc), - {Bt3, Uncopied, _, _} = FinalAcc, + #acc{btree = Bt3, kvs = Uncopied} = FinalAcc, {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)), + FinalAcc2 = 
update_task(FinalAcc, length(Uncopied)), + {EmptyView#mrview{btree=NewBt}, FinalAcc2}. + - EmptyView#mrview{btree=NewBt}. +update_task(#acc{changes = Changes, total_changes = Total} = Acc, ChangesInc) -> + Changes2 = Changes + ChangesInc, + couch_task_status:update([{progress, (Changes2 * 100) div Total}]), + Acc#acc{changes = Changes2}. swap_compacted(OldState, NewState) -> Modified: couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl (original) +++ couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl Sat Sep 17 03:31:12 2011 @@ -15,7 +15,7 @@ -export([get/2]). -export([init/2, open/2, close/1, reset/1, delete/1]). --export([start_update/2, purge/4, process_doc/3, finish_update/1, commit/1]). +-export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]). -export([compact/2, swap_compacted/2]). @@ -106,8 +106,8 @@ reset(State) -> end). -start_update(PartialDest, State) -> - couch_mrview_updater:start_update(PartialDest, State). +start_update(PartialDest, State, NumChanges) -> + couch_mrview_updater:start_update(PartialDest, State, NumChanges). purge(Db, PurgeSeq, PurgedIdRevs, State) -> Modified: couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl (original) +++ couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl Sat Sep 17 03:31:12 2011 @@ -12,13 +12,13 @@ -module(couch_mrview_updater). --export([start_update/2, purge/4, process_doc/3, finish_update/1]). 
+-export([start_update/3, purge/4, process_doc/3, finish_update/1]). -include("couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). -start_update(Partial, State) -> +start_update(Partial, State, NumChanges) -> QueueOpts = [{max_size, 100000}, {max_items, 500}], {ok, DocQueue} = couch_work_queue:new(QueueOpts), {ok, WriteQueue} = couch_work_queue:new(QueueOpts), @@ -32,7 +32,18 @@ start_update(Partial, State) -> }, Self = self(), - MapFun = fun() -> map_docs(Self, InitState) end, + MapFun = fun() -> + couch_task_status:add_task([ + {type, indexer}, + {database, State#mrst.db_name}, + {design_document, State#mrst.idx_name}, + {progress, 0}, + {changes_done, 0}, + {total_changes, NumChanges} + ]), + couch_task_status:set_update_frequency(500), + map_docs(Self, InitState) + end, WriteFun = fun() -> write_results(Self, InitState) end, spawn_link(MapFun), @@ -136,6 +147,7 @@ map_docs(Parent, State0) -> {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]} end, FoldFun = fun(Docs, Acc) -> + update_task(length(Docs)), lists:foldl(DocFun, Acc, Docs) end, Results = lists:foldl(FoldFun, {0, []}, Dequeued), @@ -255,3 +267,16 @@ send_partial(Pid, State) when is_pid(Pid gen_server:cast(Pid, {new_state, State}); send_partial(_, _) -> ok. + + +update_task(NumChanges) -> + [Changes, Total] = couch_task_status:get([changes_done, total_changes]), + Changes2 = Changes + NumChanges, + Progress = case Total of + 0 -> + % updater restart after compaction finishes + 0; + _ -> + (Changes2 * 100) div Total + end, + couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]). 
Modified: couchdb/trunk/src/couchdb/couch_db_updater.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_db_updater.erl (original) +++ couchdb/trunk/src/couchdb/couch_db_updater.erl Sat Sep 17 03:31:12 2011 @@ -879,6 +879,7 @@ copy_docs(Db, #db{updater_fd = DestFd} = NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs), {ok, FullDocInfoBTree} = couch_btree:add_remove( NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []), + update_compact_task(length(NewFullDocInfos)), NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}. @@ -896,40 +897,46 @@ copy_compact(Db, NewDb0, Retry) -> EnumBySeqFun = fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, - {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize, TotalCopied}) -> + {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) -> AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo), if AccUncopiedSize2 >= BufferSize -> NewDb2 = copy_docs( Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry), - TotalCopied2 = TotalCopied + 1 + length(AccUncopied), - couch_task_status:update("Copied ~p of ~p changes (~p%)", - [TotalCopied2, TotalChanges, (TotalCopied2 * 100) div TotalChanges]), AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2, if AccCopiedSize2 >= CheckpointAfter -> - {ok, {commit_data(NewDb2#db{update_seq = Seq}), [], - 0, 0, TotalCopied2}}; + {ok, {commit_data(NewDb2#db{update_seq = Seq}), [], 0, 0}}; true -> - {ok, {NewDb2#db{update_seq = Seq}, [], - 0, AccCopiedSize2, TotalCopied2}} + {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}} end; true -> {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2, - AccCopiedSize, TotalCopied}} + AccCopiedSize}} end end, - couch_task_status:set_update_frequency(500), + TaskProps0 = [ + {type, database_compaction}, + {database, 
Db#db.name}, + {progress, 0}, + {changes_done, 0}, + {total_changes, TotalChanges} + ], + case Retry of + true -> + couch_task_status:update([{retry, true}]); + false -> + couch_task_status:add_task(TaskProps0), + couch_task_status:set_update_frequency(500) + end, - {ok, _, {NewDb2, Uncopied, _, _, ChangesDone}} = + {ok, _, {NewDb2, Uncopied, _, _}} = couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun, - {NewDb, [], 0, 0, 0}, + {NewDb, [], 0, 0}, [{start_key, NewDb#db.update_seq + 1}]), - couch_task_status:update("Flushing"), - NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry), - TotalChanges = ChangesDone + length(Uncopied), + TotalChanges = (couch_task_status:get(changes_done) - NewDb#db.update_seq), % copy misc header values if NewDb3#db.security /= Db#db.security -> @@ -948,7 +955,6 @@ start_copy_compact(#db{name=Name,filepat ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of {ok, Fd} -> - couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>), Retry = true, case couch_file:read_header(Fd) of {ok, Header} -> @@ -957,7 +963,6 @@ start_copy_compact(#db{name=Name,filepat ok = couch_file:write_header(Fd, Header=#db_header{}) end; {error, enoent} -> - couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>), {ok, Fd} = couch_file:open(CompactFile, [create]), Retry = false, ok = couch_file:write_header(Fd, Header=#db_header{}) @@ -984,6 +989,17 @@ start_copy_compact(#db{name=Name,filepat start_copy_compact(CurrentDb) end. +update_compact_task(NumChanges) -> + [Changes, Total] = couch_task_status:get([changes_done, total_changes]), + Changes2 = Changes + NumChanges, + Progress = case Total of + 0 -> + 0; + _ -> + (Changes2 * 100) div Total + end, + couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]). 
+ make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) -> Body = case couch_compress:is_compressed(Body0) of true -> Modified: couchdb/trunk/src/couchdb/couch_replicator.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator.erl Sat Sep 17 03:31:12 2011 @@ -213,7 +213,8 @@ do_init(#rep{options = Options, id = {Ba source_name = SourceName, target_name = TargetName, start_seq = {_Ts, StartSeq}, - source_seq = SourceCurSeq + source_seq = SourceCurSeq, + committed_seq = {_, CommittedSeq} } = State = init_state(Rep), NumWorkers = get_value(worker_processes, Options), @@ -240,11 +241,21 @@ do_init(#rep{options = Options, id = {Ba end, lists:seq(1, NumWorkers)), - couch_task_status:add_task( - "Replication", - io_lib:format("`~s`: `~s` -> `~s`", - [BaseId ++ Ext, SourceName, TargetName]), - io_lib:format("Processed ~p / ~p changes", [StartSeq, SourceCurSeq])), + couch_task_status:add_task([ + {type, replication}, + {source, ?l2b(SourceName)}, + {target, ?l2b(TargetName)}, + {continuous, get_value(continuous, Options, false)}, + {revisions_checked, 0}, + {missing_revisions_found, 0}, + {docs_read, 0}, + {docs_written, 0}, + {doc_write_failures, 0}, + {source_seq, SourceCurSeq}, + {checkpointed_source_seq, CommittedSeq}, + {progress, 0} + ]), + couch_task_status:set_update_frequency(1000), % Until OTP R14B03: % @@ -337,9 +348,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_ end. 
-handle_call({report_seq_done, Seq, StatsInc}, _From, +handle_call({report_seq_done, Seq, StatsInc}, From, #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone, current_through_seq = ThroughSeq, stats = Stats} = State) -> + gen_server:reply(From, ok), {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of [Seq | Rest] -> {Seq, Rest}; @@ -360,8 +372,6 @@ handle_call({report_seq_done, Seq, Stats [Seq, ThroughSeq, NewThroughSeq, HighestDone, NewHighestDone, SeqsInProgress, NewSeqsInProgress]), SourceCurSeq = source_cur_seq(State), - couch_task_status:update( - "Processed ~p / ~p changes", [element(2, NewThroughSeq), SourceCurSeq]), NewState = State#rep_state{ stats = sum_stats([Stats, StatsInc]), current_through_seq = NewThroughSeq, @@ -369,7 +379,8 @@ handle_call({report_seq_done, Seq, Stats highest_seq_done = NewHighestDone, source_seq = SourceCurSeq }, - {reply, ok, NewState}. + update_task(NewState), + {noreply, NewState}. handle_cast({db_compacted, DbName}, @@ -425,7 +436,7 @@ terminate(Reason, State) -> terminate_cleanup(State) -> - couch_task_status:update("Finishing"), + update_task(State), stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier), stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier), couch_api_wrap:db_close(State#rep_state.source), @@ -684,6 +695,7 @@ do_checkpoint(State) -> source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} }, + update_task(NewState), {ok, NewState} catch throw:{checkpoint_commit_failure, _} = Failure -> Failure @@ -858,3 +870,32 @@ source_cur_seq(#rep_state{source = #http source_cur_seq(#rep_state{source = Db, source_seq = Seq}) -> {ok, Info} = couch_api_wrap:get_db_info(Db), get_value(<<"update_seq">>, Info, Seq). 
+ + +update_task(State) -> + #rep_state{ + current_through_seq = {_, CurSeq}, + committed_seq = {_, CommittedSeq}, + source_seq = SourceCurSeq, + stats = Stats + } = State, + couch_task_status:update([ + {revisions_checked, Stats#rep_stats.missing_checked}, + {missing_revisions_found, Stats#rep_stats.missing_found}, + {docs_read, Stats#rep_stats.docs_read}, + {docs_written, Stats#rep_stats.docs_written}, + {doc_write_failures, Stats#rep_stats.doc_write_failures}, + {source_seq, SourceCurSeq}, + {checkpointed_source_seq, CommittedSeq}, + case is_number(CurSeq) andalso is_number(SourceCurSeq) of + true -> + case SourceCurSeq of + 0 -> + {progress, 0}; + _ -> + {progress, (CurSeq * 100) div SourceCurSeq} + end; + false -> + {progress, null} + end + ]). Modified: couchdb/trunk/src/couchdb/couch_task_status.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_task_status.erl?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_task_status.erl (original) +++ couchdb/trunk/src/couchdb/couch_task_status.erl Sat Sep 17 03:31:12 2011 @@ -13,29 +13,26 @@ -module(couch_task_status). -behaviour(gen_server). -% This module allows is used to track the status of long running tasks. -% Long running tasks register (add_task/3) then update their status (update/1) -% and the task and status is added to tasks list. When the tracked task dies -% it will be automatically removed the tracking. To get the tasks list, use the -% all/0 function +% This module is used to track the status of long running tasks. +% Long running tasks register themselves, via a call to add_task/1, and then +% update their status properties via update/1. The status of a task is a +% list of properties. Each property is a tuple, with the first element being +% either an atom or a binary and the second element must be an EJSON value. 
When +% a task updates its status, it can override some or all of its properties. +% The properties {started_on, UnitTimestamp}, {updated_on, UnixTimestamp} and +% {pid, ErlangPid} are automatically added by this module. +% When a tracked task dies, its status will be automatically removed from +% memory. To get the tasks list, call the all/0 function. -export([start_link/0, stop/0]). --export([all/0, add_task/3, update/1, update/2, set_update_frequency/1]). +-export([all/0, add_task/1, update/1, get/1, set_update_frequency/1]). -export([init/1, terminate/2, code_change/3]). -export([handle_call/3, handle_cast/2, handle_info/2]). --import(couch_util, [to_binary/1]). - -include("couch_db.hrl"). --record(task_status, { - type, - name, - start_ts, - update_ts, - status -}). +-define(set(L, K, V), lists:keystore(K, 1, L, {K, V})). start_link() -> @@ -50,36 +47,41 @@ all() -> gen_server:call(?MODULE, all). -add_task(Type, TaskName, StatusText) -> +add_task(Props) -> put(task_status_update, {{0, 0, 0}, 0}), - Ts = now_ts(), - Msg = { - add_task, - #task_status{ - type = to_binary(Type), - name = to_binary(TaskName), - status = to_binary(StatusText), - start_ts = Ts, - update_ts = Ts - } - }, - gen_server:call(?MODULE, Msg). + Ts = timestamp(), + TaskProps = lists:ukeysort( + 1, [{started_on, Ts}, {updated_on, Ts} | Props]), + put(task_status_props, TaskProps), + gen_server:call(?MODULE, {add_task, TaskProps}). set_update_frequency(Msecs) -> put(task_status_update, {{0, 0, 0}, Msecs * 1000}). -update(StatusText) -> - update("~s", [StatusText]). +update(Props) -> + MergeProps = lists:ukeysort(1, Props), + TaskProps = lists:ukeymerge(1, MergeProps, erlang:get(task_status_props)), + put(task_status_props, TaskProps), + maybe_persist(TaskProps). 
+ + +get(Props) when is_list(Props) -> + TaskProps = erlang:get(task_status_props), + [couch_util:get_value(P, TaskProps) || P <- Props]; +get(Prop) -> + TaskProps = erlang:get(task_status_props), + couch_util:get_value(Prop, TaskProps). + -update(Format, Data) -> - {LastUpdateTime, Frequency} = get(task_status_update), +maybe_persist(TaskProps0) -> + {LastUpdateTime, Frequency} = erlang:get(task_status_update), case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of true -> put(task_status_update, {Now, Frequency}), - Msg = ?l2b(io_lib:format(Format, Data)), - gen_server:cast(?MODULE, {update_status, self(), Msg}); + TaskProps = ?set(TaskProps0, updated_on, timestamp(Now)), + gen_server:cast(?MODULE, {update_status, self(), TaskProps}); false -> ok end. @@ -95,10 +97,10 @@ terminate(_Reason,_State) -> ok. -handle_call({add_task, TaskStatus}, {From, _}, Server) -> +handle_call({add_task, TaskProps}, {From, _}, Server) -> case ets:lookup(?MODULE, From) of [] -> - true = ets:insert(?MODULE, {From, TaskStatus}), + true = ets:insert(?MODULE, {From, TaskProps}), erlang:monitor(process, From), {reply, ok, Server}; [_] -> @@ -106,25 +108,23 @@ handle_call({add_task, TaskStatus}, {Fro end; handle_call(all, _, Server) -> All = [ - [ - {type, Task#task_status.type}, - {task, Task#task_status.name}, - {started_on, Task#task_status.start_ts}, - {updated_on, Task#task_status.update_ts}, - {status, Task#task_status.status}, - {pid, ?l2b(pid_to_list(Pid))} - ] + [{pid, ?l2b(pid_to_list(Pid))} | TaskProps] || - {Pid, Task} <- ets:tab2list(?MODULE) + {Pid, TaskProps} <- ets:tab2list(?MODULE) ], {reply, All, Server}. 
-handle_cast({update_status, Pid, StatusText}, Server) -> - [{Pid, #task_status{name = TaskName} = Task}] = ets:lookup(?MODULE, Pid), - ?LOG_DEBUG("New task status for ~s: ~s",[TaskName, StatusText]), - NewTaskStatus = Task#task_status{status = StatusText, update_ts = now_ts()}, - true = ets:insert(?MODULE, {Pid, NewTaskStatus}), +handle_cast({update_status, Pid, NewProps}, Server) -> + case ets:lookup(?MODULE, Pid) of + [{Pid, _CurProps}] -> + ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]), + true = ets:insert(?MODULE, {Pid, NewProps}); + _ -> + % Task finished/died in the meanwhile and we must have received + % a monitor message before this call - ignore. + ok + end, {noreply, Server}; handle_cast(stop, State) -> {stop, normal, State}. @@ -139,6 +139,8 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -now_ts() -> - {Mega, Secs, _} = erlang:now(), +timestamp() -> + timestamp(now()). + +timestamp({Mega, Secs, _}) -> Mega * 1000000 + Secs. Modified: couchdb/trunk/test/etap/090-task-status.t URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/090-task-status.t?rev=1171888&r1=1171887&r2=1171888&view=diff ============================================================================== --- couchdb/trunk/test/etap/090-task-status.t (original) +++ couchdb/trunk/test/etap/090-task-status.t Sat Sep 17 03:31:12 2011 @@ -15,7 +15,7 @@ main(_) -> test_util:init_code_path(), - etap:plan(16), + etap:plan(28), case (catch test()) of ok -> etap:end_tests(); @@ -25,7 +25,7 @@ main(_) -> end, ok. -check_status(Pid,ListPropLists) -> +get_task_prop(Pid, Prop) -> From = list_to_binary(pid_to_list(Pid)), Element = lists:foldl( fun(PropList,Acc) -> @@ -36,18 +36,28 @@ check_status(Pid,ListPropLists) -> [] end end, - [], ListPropLists + [], couch_task_status:all() ), - couch_util:get_value(status,hd(Element)). 
+ case couch_util:get_value(Prop, hd(Element), nil) of + nil -> + etap:bail("Could not get property '" ++ couch_util:to_list(Prop) ++ + "' for task " ++ pid_to_list(Pid)); + Value -> + Value + end. + +now_ts() -> + {Mega, Secs, _} = erlang:now(), + Mega * 1000000 + Secs. loop() -> receive - {add, From} -> - Resp = couch_task_status:add_task("type", "task", "init"), + {add, Props, From} -> + Resp = couch_task_status:add_task(Props), From ! {ok, self(), Resp}, loop(); - {update, Status, From} -> - Resp = couch_task_status:update(Status), + {update, Props, From} -> + Resp = couch_task_status:update(Props), From ! {ok, self(), Resp}, loop(); {update_frequency, Msecs, From} -> @@ -82,96 +92,159 @@ test() -> Pid2 = spawn(TaskUpdater), Pid3 = spawn(TaskUpdater), - ok = call(Pid1, add), + ok = call(Pid1, add, [{type, replication}, {progress, 0}]), etap:is( length(couch_task_status:all()), 1, "Started a task" ), + Task1StartTime = get_task_prop(Pid1, started_on), + etap:is( + is_integer(Task1StartTime), + true, + "Task start time is defined." + ), + etap:is( + get_task_prop(Pid1, updated_on), + Task1StartTime, + "Task's start time is the same as the update time before an update." + ), etap:is( - call(Pid1, add), + call(Pid1, add, [{type, compaction}, {progress, 0}]), {add_task_error, already_registered}, "Unable to register multiple tasks for a single Pid." ), etap:is( - check_status(Pid1, couch_task_status:all()), - <<"init">>, - "Task status was set to 'init'." + get_task_prop(Pid1, type), + replication, + "Task type is 'replication'." ), - - call(Pid1,update,"running"), etap:is( - check_status(Pid1,couch_task_status:all()), - <<"running">>, - "Status updated to 'running'." + get_task_prop(Pid1, progress), + 0, + "Task progress is 0." ), + ok = timer:sleep(1000), + call(Pid1, update, [{progress, 25}]), + etap:is( + get_task_prop(Pid1, progress), + 25, + "Task progress is 25." 
+ ), + etap:is( + get_task_prop(Pid1, updated_on) > Task1StartTime, + true, + "Task's last update time has increased after an update." + ), - call(Pid2,add), + call(Pid2, add, [{type, compaction}, {progress, 0}]), etap:is( length(couch_task_status:all()), 2, "Started a second task." ), - + Task2StartTime = get_task_prop(Pid2, started_on), + etap:is( + is_integer(Task2StartTime), + true, + "Second task's start time is defined." + ), etap:is( - check_status(Pid2, couch_task_status:all()), - <<"init">>, - "Second tasks's status was set to 'init'." + get_task_prop(Pid2, updated_on), + Task2StartTime, + "Second task's start time is the same as the update time before an update." ), - call(Pid2, update, "running"), etap:is( - check_status(Pid2, couch_task_status:all()), - <<"running">>, - "Second task's status updated to 'running'." + get_task_prop(Pid2, type), + compaction, + "Second task's type is 'compaction'." + ), + etap:is( + get_task_prop(Pid2, progress), + 0, + "Second task's progress is 0." ), + ok = timer:sleep(1000), + call(Pid2, update, [{progress, 33}]), + etap:is( + get_task_prop(Pid2, progress), + 33, + "Second task's progress updated to 33." + ), + etap:is( + get_task_prop(Pid2, updated_on) > Task2StartTime, + true, + "Second task's last update time has increased after an update." + ), - call(Pid3, add), + call(Pid3, add, [{type, indexer}, {progress, 0}]), etap:is( length(couch_task_status:all()), 3, "Registered a third task." ), - + Task3StartTime = get_task_prop(Pid3, started_on), + etap:is( + is_integer(Task3StartTime), + true, + "Third task's start time is defined." + ), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"init">>, - "Third tasks's status was set to 'init'." + get_task_prop(Pid3, updated_on), + Task3StartTime, + "Third task's start time is the same as the update time before an update." 
), - call(Pid3, update, "running"), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"running">>, - "Third task's status updated to 'running'." + get_task_prop(Pid3, type), + indexer, + "Third task's type is 'indexer'." + ), + etap:is( + get_task_prop(Pid3, progress), + 0, + "Third task's progress is 0." ), + ok = timer:sleep(1000), + call(Pid3, update, [{progress, 50}]), + etap:is( + get_task_prop(Pid3, progress), + 50, + "Third task's progress updated to 50." + ), + etap:is( + get_task_prop(Pid3, updated_on) > Task3StartTime, + true, + "Third task's last update time has increased after an update." + ), call(Pid3, update_frequency, 500), - call(Pid3, update, "still running"), + call(Pid3, update, [{progress, 66}]), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"still running">>, - "Third task's status updated to 'still running'." + get_task_prop(Pid3, progress), + 66, + "Third task's progress updated to 66." ), - call(Pid3, update, "skip this update"), + call(Pid3, update, [{progress, 67}]), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"still running">>, - "Status update dropped because of frequency limit." + get_task_prop(Pid3, progress), + 66, + "Task update dropped because of frequency limit." ), call(Pid3, update_frequency, 0), - call(Pid3, update, "don't skip"), + call(Pid3, update, [{progress, 77}]), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"don't skip">>, - "Status updated after reseting frequency limit." + get_task_prop(Pid3, progress), + 77, + "Task updated after reseting frequency limit." ),