This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 4e8ab6420f0f8db818fab809fdfecebb1af78259 Author: Paul J. Davis <paul.joseph.da...@gmail.com> AuthorDate: Mon Mar 26 10:25:34 2018 -0500 WIP - couch_bt_engine stuff --- src/couch/src/couch_bt_engine.erl | 29 +++++++++++++++-------------- src/couch/src/couch_bt_engine_compactor.erl | 17 ++++++++--------- src/couch/src/couch_bt_engine_header.erl | 2 +- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 485d21d..899a915 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -47,7 +47,6 @@ set_revs_limit/2, set_purge_infos_limit/2, set_security/2, - set_purged_docs_limit/2, open_docs/2, open_local_docs/2, @@ -348,7 +347,7 @@ read_doc_body(#st{} = St, #doc{} = Doc) -> load_purge_infos(St, UUIDs) -> - Results = couch_btree:lookup(St#st.upurge_tree, UUIDs), + Results = couch_btree:lookup(St#st.purge_tree, UUIDs), lists:map(fun ({ok, Info}) -> Info; (not_found) -> not_found @@ -444,10 +443,10 @@ purge_docs(#st{} = St, Pairs, PurgeInfos) -> id_tree = IdTree, seq_tree = SeqTree, purge_tree = PurgeTree, - upurge_tree = UPurgeTree + purge_seq_tree = PurgeSeqTree } = St, - RemDocIds = [Old#full_doc_info.doc_id || {Old, not_found} <- Pairs], + RemDocIds = [Old#full_doc_info.id || {Old, not_found} <- Pairs], RemSeqs = [Old#full_doc_info.update_seq || {Old, _} <- Pairs], DocsToAdd = [New || {_, New} <- Pairs, New /= not_found], CurrSeq = couch_bt_engine_header:get(St#st.header, update_seq), @@ -458,8 +457,8 @@ purge_docs(#st{} = St, Pairs, PurgeInfos) -> % indexers see that they need to process the new purge % information. 
UpdateSeq = case NewSeq == CurrSeq of - true -> InitUpdateSeq + 1; - false -> NewUpdateSeq + true -> CurrSeq + 1; + false -> NewSeq end, Header = couch_bt_engine_header:set(St#st.header, [ {update_seq, UpdateSeq} @@ -470,7 +469,7 @@ purge_docs(#st{} = St, Pairs, PurgeInfos) -> {ok, PurgeTree2} = couch_btree:add(PurgeTree, PurgeInfos), {ok, PurgeSeqTree2} = couch_btree:add(PurgeSeqTree, PurgeInfos), {ok, St#st{ - header = Header2, + header = Header, id_tree = IdTree2, seq_tree = SeqTree2, purge_tree = PurgeTree2, @@ -544,7 +543,7 @@ fold_changes(St, SinceSeq, UserFun, UserAcc, Options) -> fold_purge_infos(St, StartSeq0, UserFun, UserAcc, Options) -> PurgeSeqTree = St#st.purge_seq_tree, StartSeq = StartSeq0 + 1, - MinSeq = load_oldest_purge_seq(PurgeSeqTree), + MinSeq = get_oldest_purge_seq(St), if MinSeq =< StartSeq -> ok; true -> throw({invalid_start_purge_seq, StartSeq0}) end, @@ -553,7 +552,7 @@ fold_purge_infos(St, StartSeq0, UserFun, UserAcc, Options) -> end, Opts = [{start_key, StartSeq}] ++ Options, {ok, _, OutAcc} = couch_btree:fold(PurgeSeqTree, Wrapper, UserAcc, Opts), - {ok, OutAcc}; + {ok, OutAcc}. count_changes_since(St, SinceSeq) -> @@ -714,7 +713,7 @@ purge_tree_split({PurgeSeq, UUID, DocId, Revs}) -> {UUID, {PurgeSeq, DocId, Revs}}. -purge_tree_join({UUID, {PurgeSeq, DocId, Revs}}) -> +purge_tree_join(UUID, {PurgeSeq, DocId, Revs}) -> {PurgeSeq, UUID, DocId, Revs}. @@ -722,7 +721,7 @@ purge_seq_tree_split({PurgeSeq, UUID, DocId, Revs}) -> {PurgeSeq, {UUID, DocId, Revs}}. -purge_seq_tree_join({PurgeSeq, {UUID, DocId, Revs}}) -> +purge_seq_tree_join(PurgeSeq, {UUID, DocId, Revs}) -> {PurgeSeq, UUID, DocId, Revs}. 
@@ -897,6 +896,8 @@ upgrade_purge_info(Fd, Header) -> Ptr when is_tuple(Ptr) -> Header; PurgeSeq when is_integer(PurgeSeq)-> + % Pointer to old purged ids/revs is in purge_seq_tree_state + Ptr = couch_bt_engine_header:get(Header, purge_seq_tree_state), {ok, PurgedIdsRevs} = couch_file:pread_term(Fd, Ptr), {Infos, NewSeq} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) -> @@ -921,8 +922,8 @@ upgrade_purge_info(Fd, Header) -> {ok, PurgeSeqTreeSt} = couch_btree:get_state(PurgeSeqTree2), couch_bt_engine_header:set(Header, [ - {purge_tree_state, PTreeState}, - {purge_seq_tree_state, UPTreeState} + {purge_tree_state, PurgeTreeSt}, + {purge_seq_tree_state, PurgeSeqTreeSt} ]) end. @@ -1098,7 +1099,7 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> header = couch_bt_engine_header:set(Header, [ {compacted_seq, get_update_seq(OldSt)}, {revs_limit, get_revs_limit(OldSt)}, - {purge_infos_limit, get_purged_docs_limit(OldSt)} + {purge_infos_limit, get_purge_infos_limit(OldSt)} ]), local_tree = NewLocal2 }), diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index f6e79b4..884f0fa 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -107,7 +107,6 @@ copy_purge_info(DbName, OldSt, NewSt, Retry) -> MinPurgeSeq = couch_util:with_db(DbName, fun(Db) -> couch_db:get_minimum_purge_seq(Db) end), - OldIdTree = OldSt#st.id_tree, OldPSTree = OldSt#st.purge_seq_tree, StartSeq = couch_bt_engine:get_purge_seq(NewSt) + 1, BufferSize = config:get_integer( @@ -115,7 +114,7 @@ copy_purge_info(DbName, OldSt, NewSt, Retry) -> CheckpointAfter = config:get( "database_compaction", "checkpoint_after", BufferSize * 10), - EnumFun = fun(Info, _Reds, {StAcc, InfosAcc0, InfosSize, CopiedSize}) -> + EnumFun = fun(Info, _Reds, {StAcc0, InfosAcc, InfosSize, CopiedSize}) -> NewInfosSize = InfosSize + ?term_size(Info), if NewInfosSize >= BufferSize -> StAcc1 = copy_purge_infos( @@ -125,11 +124,11 
@@ copy_purge_info(DbName, OldSt, NewSt, Retry) -> StAcc2 = commit_compaction_data(StAcc1), {ok, {StAcc2, [], 0, 0}}; true -> - {ok, {StAcc2, [], 0, NewCopiedSize}} + {ok, {StAcc1, [], 0, NewCopiedSize}} end; true -> - NewInfosAcc = [Info | InfosAcc] - {ok, {StAcc, NewInfosAcc, NewInfosSize, CopiedSize}} + NewInfosAcc = [Info | InfosAcc], + {ok, {StAcc0, NewInfosAcc, NewInfosSize, CopiedSize}} end end, @@ -140,7 +139,7 @@ copy_purge_info(DbName, OldSt, NewSt, Retry) -> copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry). -copy_purge_docs(OldSt, NewSt, Infos, MinPurgeSeq, Retry) -> +copy_purge_infos(OldSt, NewSt, Infos, MinPurgeSeq, Retry) -> #st{ id_tree = OldIdTree } = OldSt, @@ -170,13 +169,13 @@ copy_purge_docs(OldSt, NewSt, Infos, MinPurgeSeq, Retry) -> AllDocIds = [DocId || {_PurgeSeq, _UUID, DocId, _Revs} <- Infos], UniqDocIds = lists:usort(AllDocIds), {ok, OldIdResults} = couch_btree:lookup(OldIdTree, UniqDocIds), - OldZipped = lists:zip(UniqDocIds, Results), + OldZipped = lists:zip(UniqDocIds, OldIdResults), % The list of non-existant docs in the database being compacted - MaybeRemDocIds = [DocId || {DocId, not_found} <- Zipped], + MaybeRemDocIds = [DocId || {DocId, not_found} <- OldZipped], % Removing anything that exists in the partially compacted database - {ok, NewIdResults} = couch_btree:lookup(NewIdTree, MaybeRemDocIds), + {ok, NewIdResults} = couch_btree:lookup(NewIdTree0, MaybeRemDocIds), ToRemove = [Doc || Doc <- NewIdResults, Doc /= not_found], {RemIds, RemSeqs} = lists:unzip(lists:map(fun(FDI) -> diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl index 55246ac..467bb2f 100644 --- a/src/couch/src/couch_bt_engine_header.erl +++ b/src/couch/src/couch_bt_engine_header.erl @@ -33,7 +33,7 @@ local_tree_state/1, purge_tree_state/1, purge_seq_tree_state/1, - purged_docs_limit/1, + purge_infos_limit/1, security_ptr/1, revs_limit/1, uuid/1, -- To stop receiving notification emails like this one, please 
contact dav...@apache.org.