This is an automated email from the ASF dual-hosted git repository. jiangphcn pushed a commit to branch COUCHDB-3226-downgrade-clustered-purge in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 568209f54a9835af87ea3085ad181756eae53ccb Author: jiangph <[email protected]> AuthorDate: Thu Sep 20 21:01:26 2018 +0800 Add downgrade function to downgrade database with clustered purge COUCHDB-3226 --- src/couch/src/couch_bt_engine.erl | 97 ++++++++++++++++++++++++++++++-- src/couch/src/couch_bt_engine_header.erl | 5 ++ 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index ee0d6d8..40736b1 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -85,7 +85,11 @@ seq_tree_reduce/2, local_tree_split/1, - local_tree_join/2 + local_tree_join/2, + + purge_tree_reduce/2, + purge_seq_tree_split/1, + purge_seq_tree_join/2 ]). @@ -101,6 +105,9 @@ -include("couch_bt_engine.hrl"). +-define(CLUSTERED_PURGE_DISK_VERSION, 7). + + exists(FilePath) -> case filelib:is_file(FilePath) of true -> @@ -627,6 +634,21 @@ local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) -> }. +purge_seq_tree_split({PurgeSeq, UUID, DocId, Revs}) -> + {PurgeSeq, {UUID, DocId, Revs}}. + + +purge_seq_tree_join(PurgeSeq, {UUID, DocId, Revs}) -> + {PurgeSeq, UUID, DocId, Revs}. + + +purge_tree_reduce(reduce, IdRevs) -> + % count the number of purge requests + length(IdRevs); +purge_tree_reduce(rereduce, Reds) -> + lists:sum(Reds). 
+ + set_update_seq(#st{header = Header} = St, UpdateSeq) -> {ok, St#st{ header = couch_bt_engine_header:set(Header, [ @@ -681,8 +703,17 @@ init_state(FilePath, Fd, Header0, Options) -> Compression = couch_compress:get_compression_method(), - Header1 = couch_bt_engine_header:upgrade(Header0), - Header = set_default_security_object(Fd, Header1, Compression, Options), + DiskVersion = couch_bt_engine_header:disk_version(Header0), + Latest = couch_bt_engine_header:latest_disk_version(), + Header1 = case DiskVersion of + N when N =< Latest -> + Header0; + ?CLUSTERED_PURGE_DISK_VERSION -> + downgrade_purge_info(Fd, Header0) + end, + + Header2 = couch_bt_engine_header:upgrade(Header1), + Header = set_default_security_object(Fd, Header2, Compression, Options), IdTreeState = couch_bt_engine_header:id_tree_state(Header), {ok, IdTree} = couch_btree:open(IdTreeState, Fd, [ @@ -727,7 +758,7 @@ init_state(FilePath, Fd, Header0, Options) -> % to be written to disk. case Header /= Header0 of true -> - {ok, NewSt} = commit_data(St), + {ok, NewSt} = commit_data(St#st{needs_commit = true}), NewSt; false -> St @@ -763,6 +794,64 @@ set_default_security_object(Fd, Header, Compression, Options) -> end. +% Rollback for clustered purge. 
It replaces PurgeTreeState and PurgeSeqTreeState +% with purged_docs disk pointer that stores only the last purge request +downgrade_purge_info(Fd, Header) -> + { + db_header, + _DiskVer, + UpSeq, + _Unused, + IdTreeState, + SeqTreeState, + LocalTreeState, + PurgeTreeState, + PurgeSeqTreeState, + SecurityPtr, + RevsLimit, + Uuid, + Epochs, + CompactedSeq, + _PDocsLimit + } = Header, + + {PSeq, PurgedDocsPtr} = case PurgeTreeState of + nil -> + {0, nil}; + PurgeSeqInOldVer when is_integer(PurgeSeqInOldVer)-> + {PurgeSeqInOldVer, nil}; + _ when is_tuple(PurgeTreeState) -> + {ok, PSTree} = couch_btree:open(PurgeSeqTreeState, Fd, [ + {split, fun ?MODULE:purge_seq_tree_split/1}, + {join, fun ?MODULE:purge_seq_tree_join/2}, + {reduce, fun ?MODULE:purge_tree_reduce/2} + ]), + Fun = fun({PurgeSeq, _, _, _}, _Reds, _Acc) -> + {stop, PurgeSeq} + end, + {ok, _, PurgeSeq} = couch_btree:fold(PSTree, Fun, 0, [{dir, rev}]), + Compression = couch_compress:get_compression_method(), + AppendOps = [{compression, Compression}], + {ok, Ptr, _} = couch_file:append_term(Fd, [], AppendOps), + if PurgeSeq > 0 -> {PurgeSeq + 2, Ptr}; true -> {0, Ptr} end + end, + + NewHeader = couch_bt_engine_header:new(), + couch_bt_engine_header:set(NewHeader, [ + {update_seq, UpSeq}, + {id_tree_state, IdTreeState}, + {seq_tree_state, SeqTreeState}, + {local_tree_state, LocalTreeState}, + {purge_seq, PSeq}, + {purged_docs, PurgedDocsPtr}, + {security_ptr, SecurityPtr}, + {revs_limit, RevsLimit}, + {uuid, Uuid}, + {epochs, Epochs}, + {compacted_seq, CompactedSeq} + ]). 
+ + delete_compaction_files(FilePath) -> RootDir = config:get("couchdb", "database_dir", "."), DelOpts = [{context, compaction}], diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl index 3d24f31..2eaa6ff 100644 --- a/src/couch/src/couch_bt_engine_header.erl +++ b/src/couch/src/couch_bt_engine_header.erl @@ -26,6 +26,7 @@ -export([ disk_version/1, + latest_disk_version/0, update_seq/1, id_tree_state/1, seq_tree_state/1, @@ -134,6 +135,10 @@ disk_version(Header) -> get_field(Header, disk_version). +latest_disk_version() -> + ?LATEST_DISK_VERSION. + + update_seq(Header) -> get_field(Header, update_seq).
