This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3226-downgrade-clustered-purge
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 568209f54a9835af87ea3085ad181756eae53ccb
Author: jiangph <[email protected]>
AuthorDate: Thu Sep 20 21:01:26 2018 +0800

    Add downgrade function to downgrade database with clustered purge
    
    COUCHDB-3226
---
 src/couch/src/couch_bt_engine.erl        | 97 ++++++++++++++++++++++++++++++--
 src/couch/src/couch_bt_engine_header.erl |  5 ++
 2 files changed, 98 insertions(+), 4 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl 
b/src/couch/src/couch_bt_engine.erl
index ee0d6d8..40736b1 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -85,7 +85,11 @@
     seq_tree_reduce/2,
 
     local_tree_split/1,
-    local_tree_join/2
+    local_tree_join/2,
+
+    purge_tree_reduce/2,
+    purge_seq_tree_split/1,
+    purge_seq_tree_join/2
 ]).
 
 
@@ -101,6 +105,9 @@
 -include("couch_bt_engine.hrl").
 
 
+-define(CLUSTERED_PURGE_DISK_VERSION, 7).
+
+
 exists(FilePath) ->
     case filelib:is_file(FilePath) of
         true ->
@@ -627,6 +634,21 @@ local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) 
->
     }.
 
 
+purge_seq_tree_split({PurgeSeq, UUID, DocId, Revs}) ->
+    {PurgeSeq, {UUID, DocId, Revs}}.
+
+
+purge_seq_tree_join(PurgeSeq, {UUID, DocId, Revs}) ->
+    {PurgeSeq, UUID, DocId, Revs}.
+
+
+purge_tree_reduce(reduce, IdRevs) ->
+    % count the number of purge requests
+    length(IdRevs);
+purge_tree_reduce(rereduce, Reds) ->
+    lists:sum(Reds).
+
+
 set_update_seq(#st{header = Header} = St, UpdateSeq) ->
     {ok, St#st{
         header = couch_bt_engine_header:set(Header, [
@@ -681,8 +703,17 @@ init_state(FilePath, Fd, Header0, Options) ->
 
     Compression = couch_compress:get_compression_method(),
 
-    Header1 = couch_bt_engine_header:upgrade(Header0),
-    Header = set_default_security_object(Fd, Header1, Compression, Options),
+    DiskVersion = couch_bt_engine_header:disk_version(Header0),
+    Latest = couch_bt_engine_header:latset_disk_version(),
+    Header1 = case DiskVersion of
+        N when N =< Latest ->
+            Header0;
+        ?CLUSTERED_PURGE_DISK_VERSION ->
+            downgrade_purge_info(Fd, Header0)
+    end,
+
+    Header2 = couch_bt_engine_header:upgrade(Header1),
+    Header = set_default_security_object(Fd, Header2, Compression, Options),
 
     IdTreeState = couch_bt_engine_header:id_tree_state(Header),
     {ok, IdTree} = couch_btree:open(IdTreeState, Fd, [
@@ -727,7 +758,7 @@ init_state(FilePath, Fd, Header0, Options) ->
     % to be written to disk.
     case Header /= Header0 of
         true ->
-            {ok, NewSt} = commit_data(St),
+            {ok, NewSt} = commit_data(St#st{needs_commit = true}),
             NewSt;
         false ->
             St
@@ -763,6 +794,64 @@ set_default_security_object(Fd, Header, Compression, 
Options) ->
     end.
 
 
+% Rollback for clustered purge. It replaces PurgeTreeState and 
PurgeSeqTreeState
+% with purged_docs disk pointer that stores only the last purge request
+downgrade_purge_info(Fd, Header) ->
+    {
+        db_header,
+        _DiskVer,
+        UpSeq,
+        _Unused,
+        IdTreeState,
+        SeqTreeState,
+        LocalTreeState,
+        PurgeTreeState,
+        PurgeSeqTreeState,
+        SecurityPtr,
+        RevsLimit,
+        Uuid,
+        Epochs,
+        CompactedSeq,
+        _PDocsLimit
+    } = Header,
+
+    {PSeq, PurgedDocsPtr} = case PurgeTreeState of
+        nil ->
+            {0, nil};
+        PurgeSeqInOldVer when is_integer(PurgeSeqInOldVer)->
+            {PurgeSeqInOldVer, nil};
+        _ when is_tuple(PurgeTreeState) ->
+            {ok, PSTree} = couch_btree:open(PurgeSeqTreeState, Fd, [
+                {split, fun ?MODULE:purge_seq_tree_split/1},
+                {join, fun ?MODULE:purge_seq_tree_join/2},
+                {reduce, fun ?MODULE:purge_tree_reduce/2}
+            ]),
+            Fun = fun({PurgeSeq, _, _, _}, _Reds, _Acc) ->
+                {stop, PurgeSeq}
+            end,
+            {ok, _, PurgeSeq} = couch_btree:fold(PSTree, Fun, 0, [{dir, rev}]),
+            Compression = couch_compress:get_compression_method(),
+            AppendOps = [{compression, Compression}],
+            {ok, Ptr, _} = couch_file:append_term(Fd, [], AppendOps),
+            if PurgeSeq > 0 -> {PurgeSeq + 2, Ptr}; true -> {0, Ptr} end
+    end,
+
+    NewHeader = couch_bt_engine_header:new(),
+    couch_bt_engine_header:set(NewHeader, [
+        {update_seq, UpSeq},
+        {id_tree_state, IdTreeState},
+        {seq_tree_state, SeqTreeState},
+        {local_tree_state, LocalTreeState},
+        {purge_seq, PSeq},
+        {purged_docs, PurgedDocsPtr},
+        {security_ptr, SecurityPtr},
+        {revs_limit, RevsLimit},
+        {uuid, Uuid},
+        {epochs, Epochs},
+        {compacted_seq, CompactedSeq}
+    ]).
+
+
 delete_compaction_files(FilePath) ->
     RootDir = config:get("couchdb", "database_dir", "."),
     DelOpts = [{context, compaction}],
diff --git a/src/couch/src/couch_bt_engine_header.erl 
b/src/couch/src/couch_bt_engine_header.erl
index 3d24f31..2eaa6ff 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -26,6 +26,7 @@
 
 -export([
     disk_version/1,
+    latest_disk_version/0,
     update_seq/1,
     id_tree_state/1,
     seq_tree_state/1,
@@ -134,6 +135,10 @@ disk_version(Header) ->
     get_field(Header, disk_version).
 
 
+latest_disk_version() ->
+    ?LATEST_DISK_VERSION.
+
+
 update_seq(Header) ->
     get_field(Header, update_seq).
 

Reply via email to