This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/fdb-layer in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit cea9274c5801373f7be74f15df13848b66307ba7 Author: Paul J. Davis <[email protected]> AuthorDate: Fri Dec 6 14:40:55 2019 -0600 Delete attachments when no longer referenced This fixes attachment handling to properly remove attachment data when it is no longer referenced by the document's revision tree. As implemented this accounts for the possibility that multiple revisions may reference a given attachment id. However, due to how the current revision tree removes revisions aggressively it's not currently possible for multiple leaf revisions to share an underlying attachment. This is because when attempting to use a stub attachment when replicating in a conflict we will encounter the `missing_stub` error because the previous shared revision has already been removed. --- src/fabric/include/fabric2.hrl | 5 +- src/fabric/src/fabric2_db.erl | 16 +- src/fabric/src/fabric2_fdb.erl | 126 +++++++++++-- src/fabric/src/fabric2_util.erl | 16 ++ src/fabric/test/fabric2_doc_att_tests.erl | 285 ++++++++++++++++++++++++++++++ 5 files changed, 428 insertions(+), 20 deletions(-) diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 189995d..b4dd084 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -37,12 +37,15 @@ -define(DB_ATTS, 23). -define(DB_VIEWS, 24). -define(DB_LOCAL_DOC_BODIES, 25). +-define(DB_ATT_NAMES, 26). % Versions --define(CURR_REV_FORMAT, 0). +% 0 - Initial implementation +% 1 - Added attachment hash +-define(CURR_REV_FORMAT, 1). 
% Misc constants diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 88840e7..6d015df 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -1341,7 +1341,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) -> #doc{ deleted = NewDeleted, - revs = {NewRevPos, [NewRev | NewRevPath]} + revs = {NewRevPos, [NewRev | NewRevPath]}, + atts = Atts } = Doc4 = stem_revisions(Db, Doc3), NewRevInfo = #{ @@ -1350,7 +1351,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) -> rev_id => {NewRevPos, NewRev}, rev_path => NewRevPath, sequence => undefined, - branch_count => undefined + branch_count => undefined, + att_hash => fabric2_util:hash_atts(Atts) }, % Gather the list of possible winnig revisions @@ -1405,7 +1407,8 @@ update_doc_replicated(Db, Doc0, _Options) -> rev_id => {RevPos, Rev}, rev_path => RevPath, sequence => undefined, - branch_count => undefined + branch_count => undefined, + att_hash => <<>> }, AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId), @@ -1444,6 +1447,9 @@ update_doc_replicated(Db, Doc0, _Options) -> PrevRevInfo = find_prev_revinfo(RevPos, LeafPath), Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo), Doc3 = flush_doc_atts(Db, Doc2), + DocRevInfo2 = DocRevInfo1#{ + att_hash => fabric2_util:hash_atts(Doc3#doc.atts) + }, % Possible winners are the previous winner and % the new DocRevInfo @@ -1453,9 +1459,9 @@ update_doc_replicated(Db, Doc0, _Options) -> end, {NewWinner0, NonWinner} = case Winner == PrevRevInfo of true -> - {DocRevInfo1, not_found}; + {DocRevInfo2, not_found}; false -> - [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo1]), + [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo2]), {W, NW} end, diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index fb2891b..404460e 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -580,9 +580,40 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) -> #doc{ 
id = DocId, - deleted = Deleted + deleted = Deleted, + atts = Atts } = Doc, + % Doc body + + ok = write_doc_body(Db, Doc), + + % Attachment bookkeeping + + % If a document's attachments have changed we have to scan + % for any attachments that may need to be deleted. The check + % for `>= 2` is a bit subtle. The important point is that + % one of the revisions will be from the new document so we + % have to find at least one more beyond that to assert that + % the attachments have not changed. + AttHash = fabric2_util:hash_atts(Atts), + RevsToCheck = [NewWinner0] ++ ToUpdate ++ ToRemove, + AttHashCount = lists:foldl(fun(Att, Count) -> + #{att_hash := RevAttHash} = Att, + case RevAttHash == AttHash of + true -> Count + 1; + false -> Count + end + end, 0, RevsToCheck), + if + AttHashCount == length(RevsToCheck) -> + ok; + AttHashCount >= 2 -> + ok; + true -> + cleanup_attachments(Db, DocId, Doc, ToRemove) + end, + % Revision tree NewWinner = NewWinner0#{winner := true}, @@ -649,8 +680,6 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) -> % And all the rest... - ok = write_doc_body(Db, Doc), - IsDDoc = case Doc#doc.id of <<?DESIGN_DOC_PREFIX, _/binary>> -> true; _ -> false @@ -755,6 +784,9 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) -> AttId = fabric2_util:uuid(), Chunks = chunkify_binary(Data), + IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix), + ok = erlfdb:set(Tx, IdKey, <<>>), + lists:foldl(fun(Chunk, ChunkId) -> AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix), ok = erlfdb:set(Tx, AttKey, Chunk), @@ -1014,16 +1046,71 @@ clear_doc_body(#{} = Db, DocId, #{} = RevInfo) -> ok = erlfdb:clear_range(Tx, StartKey, EndKey). 
+% Delete stored attachment data for DocId that is no longer referenced.
+%
+% NewDoc is the revision being written and ToRemove lists the rev infos
+% being dropped. Every attachment id that has a ?DB_ATT_NAMES key for this
+% doc, but is not used by NewDoc or by a revision returned from
+% open_doc_revs (other than the dropped ones), has its name key and its
+% ?DB_ATTS chunk range cleared.
+cleanup_attachments(Db, DocId, NewDoc, ToRemove) ->
+    #{tx := Tx, db_prefix := DbPrefix} = Db,
+
+    DroppedRevIds = lists:map(fun(#{rev_id := RevId}) -> RevId end, ToRemove),
+
+    % NewDoc is added explicitly alongside the revisions read back
+    {ok, OnDisk} = fabric2_db:open_doc_revs(Db, DocId, all, []),
+    Surviving = lists:filter(fun({ok, #doc{revs = {Pos, [Rev | _]}}}) ->
+        not lists:member({Pos, Rev}, DroppedRevIds)
+    end, [{ok, NewDoc} | OnDisk]),
+
+    % Attachment ids still referenced by a surviving revision
+    ReferencedIds = lists:flatmap(fun({ok, #doc{atts = Atts}}) ->
+        lists:map(fun(Att) ->
+            {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att),
+            AttId
+        end, Atts)
+    end, Surviving),
+    ActiveIdSet = sets:from_list(ReferencedIds),
+
+    % Attachment ids that currently have a name key stored for this doc
+    NamePrefix = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId}, DbPrefix),
+    RangeOpts = [{streaming_mode, want_all}],
+    RangeFuture = erlfdb:get_range_startswith(Tx, NamePrefix, RangeOpts),
+    NameRows = erlfdb:wait(RangeFuture),
+    ExistingIdSet = sets:from_list(lists:map(fun({Key, _}) ->
+        {?DB_ATT_NAMES, DocId, AttId} = erlfdb_tuple:unpack(Key, DbPrefix),
+        AttId
+    end, NameRows)),
+
+    % Whatever is stored but not referenced gets both of its key ranges cleared
+    Orphans = sets:to_list(sets:subtract(ExistingIdSet, ActiveIdSet)),
+    lists:foreach(fun(AttId) ->
+        NameKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+        erlfdb:clear(Tx, NameKey),
+
+        ChunkPrefix = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+        erlfdb:clear_range_startswith(Tx, ChunkPrefix)
+    end, Orphans).
+ + revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) -> #{ deleted := Deleted, rev_id := {RevPos, Rev}, rev_path := RevPath, - branch_count := BranchCount + branch_count := BranchCount, + att_hash := AttHash } = RevId, VS = new_versionstamp(Tx), Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev}, - Val = {?CURR_REV_FORMAT, VS, BranchCount, list_to_tuple(RevPath)}, + Val = { + ?CURR_REV_FORMAT, + VS, + BranchCount, + list_to_tuple(RevPath), + AttHash + }, KBin = erlfdb_tuple:pack(Key, DbPrefix), VBin = erlfdb_tuple:pack_vs(Val), {KBin, VBin, VS}; @@ -1032,38 +1119,49 @@ revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevId) -> #{ deleted := Deleted, rev_id := {RevPos, Rev}, - rev_path := RevPath + rev_path := RevPath, + att_hash := AttHash } = RevId, Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev}, - Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath)}, + Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath), AttHash}, KBin = erlfdb_tuple:pack(Key, DbPrefix), VBin = erlfdb_tuple:pack(Val), {KBin, VBin, undefined}. -fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) -> +fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _, _} = Val) -> {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key, - {_RevFormat, Sequence, BranchCount, RevPath} = Val, + {_RevFormat, Sequence, BranchCount, RevPath, AttHash} = Val, #{ winner => true, deleted => not NotDeleted, rev_id => {RevPos, Rev}, rev_path => tuple_to_list(RevPath), sequence => Sequence, - branch_count => BranchCount + branch_count => BranchCount, + att_hash => AttHash }; -fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _} = Val) -> +fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _} = Val) -> {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key, - {_RevFormat, RevPath} = Val, + {_RevFormat, RevPath, AttHash} = Val, #{ winner => false, deleted => not NotDeleted, rev_id => {RevPos, Rev}, rev_path => tuple_to_list(RevPath), sequence => undefined, - branch_count => undefined - }. 
+ branch_count => undefined, + att_hash => AttHash + }; + +fdb_to_revinfo(Key, {0, Seq, BCount, RPath}) -> + Val = {?CURR_REV_FORMAT, Seq, BCount, RPath, <<>>}, + fdb_to_revinfo(Key, Val); + +fdb_to_revinfo(Key, {0, RPath}) -> + Val = {?CURR_REV_FORMAT, RPath, <<>>}, + fdb_to_revinfo(Key, Val). doc_to_fdb(Db, #doc{} = Doc) -> diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl index 2b8e49e..4e2e2d7 100644 --- a/src/fabric/src/fabric2_util.erl +++ b/src/fabric/src/fabric2_util.erl @@ -25,6 +25,8 @@ validate_security_object/1, + hash_atts/1, + dbname_ends_with/2, get_value/2, @@ -124,6 +126,20 @@ validate_json_list_of_strings(Member, Props) -> end. +hash_atts([]) -> + <<>>; + +hash_atts(Atts) -> + SortedAtts = lists:sort(fun(A, B) -> + couch_att:fetch(name, A) =< couch_att:fetch(name, B) + end, Atts), + Md5St = lists:foldl(fun(Att, Acc) -> + {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att), + couch_hash:md5_hash_update(Acc, AttId) + end, couch_hash:md5_hash_init(), SortedAtts), + couch_hash:md5_hash_final(Md5St). + + dbname_ends_with(#{} = Db, Suffix) -> dbname_ends_with(fabric2_db:name(Db), Suffix); diff --git a/src/fabric/test/fabric2_doc_att_tests.erl b/src/fabric/test/fabric2_doc_att_tests.erl new file mode 100644 index 0000000..331e1a4 --- /dev/null +++ b/src/fabric/test/fabric2_doc_att_tests.erl @@ -0,0 +1,285 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_doc_att_tests). 
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include("fabric2.hrl").
+-include("fabric2_test.hrl").
+
+
+% EUnit generator: every case below runs against the single {Db, Ctx}
+% fixture produced by setup/0 and released by cleanup/1.
+doc_crud_test_() ->
+    Cases = [
+        ?TDEF(create_att),
+        ?TDEF(delete_att),
+        ?TDEF(multiple_atts),
+        ?TDEF(delete_one_att),
+        ?TDEF(large_att),
+        ?TDEF(att_on_conflict_isolation)
+    ],
+    {
+        "Test document CRUD operations",
+        {setup, fun setup/0, fun cleanup/1, with(Cases)}
+    }.
+
+
+% Start couch with the fabric app and create a temporary database as admin.
+setup() ->
+    Ctx = test_util:start_couch([fabric]),
+    DbOpts = [{user_ctx, ?ADMIN_USER}],
+    {ok, Db} = fabric2_db:create(?tempdb(), DbOpts),
+    {Db, Ctx}.
+
+
+% Delete the temporary database, then stop couch.
+cleanup({Db, Ctx}) ->
+    DbName = fabric2_db:name(Db),
+    ok = fabric2_db:delete(DbName, []),
+    test_util:stop_couch(Ctx).
+
+
+% Write a doc with one attachment, read the attachment back through the API
+% and check that both raw keys (name marker and data chunk) exist.
+create_att({Db, _}) ->
+    DocId = fabric2_util:uuid(),
+    NewAtt = couch_att:new([
+        {name, <<"foo.txt">>},
+        {type, <<"application/octet-stream">>},
+        {att_len, 6},
+        {data, <<"foobar">>},
+        {encoding, identity},
+        {md5, <<>>}
+    ]),
+    {ok, _} = fabric2_db:update_doc(Db, #doc{id = DocId, atts = [NewAtt]}),
+
+    {ok, #doc{atts = [StoredAtt]}} = fabric2_db:open_doc(Db, DocId),
+    {loc, _Db, DocId, AttId} = couch_att:fetch(data, StoredAtt),
+    ?assertEqual(<<"foobar">>, fabric2_db:read_attachment(Db, DocId, AttId)),
+
+    % Check that the raw keys exist
+    #{db_prefix := DbPrefix} = Db,
+    IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+    AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+
+    fabric2_fdb:transactional(fun(Tx) ->
+        IdVal = erlfdb:wait(erlfdb:get(Tx, IdKey)),
+        AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)),
+
+        ?assertEqual(<<>>, IdVal),
+        ?assertMatch([{_, <<"foobar">>}], AttVals)
+    end).
+
+
+% Create a doc with one attachment, update it with no attachments, and check
+% that the name marker key and the data chunks are both gone.
+delete_att({Db, _}) ->
+    DocId = fabric2_util:uuid(),
+    Att = mk_att(<<"foo.txt">>, <<"foobar">>),
+    {ok, _} = create_doc(Db, DocId, [Att]),
+
+    {ok, WithAtt} = fabric2_db:open_doc(Db, DocId),
+    #doc{atts = [StoredAtt]} = WithAtt,
+    {loc, _Db, DocId, AttId} = couch_att:fetch(data, StoredAtt),
+
+    {ok, _} = fabric2_db:update_doc(Db, WithAtt#doc{atts = []}),
+
+    {ok, WithoutAtt} = fabric2_db:open_doc(Db, DocId),
+    ?assertEqual([], WithoutAtt#doc.atts),
+
+    % Check that the raw keys were removed
+    #{db_prefix := DbPrefix} = Db,
+    IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+    AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+
+    fabric2_fdb:transactional(fun(Tx) ->
+        IdVal = erlfdb:wait(erlfdb:get(Tx, IdKey)),
+        AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)),
+
+        ?assertEqual(not_found, IdVal),
+        ?assertMatch([], AttVals)
+    end).
+
+
+% A doc created with three attachments returns all three by name.
+multiple_atts({Db, _}) ->
+    Contents = [
+        {<<"foo.txt">>, <<"foobar">>},
+        {<<"bar.txt">>, <<"barfoo">>},
+        {<<"baz.png">>, <<"blargh">>}
+    ],
+    DocId = fabric2_util:uuid(),
+    Atts = lists:map(fun({Name, Data}) -> mk_att(Name, Data) end, Contents),
+    {ok, _} = create_doc(Db, DocId, Atts),
+    Expected = #{
+        <<"foo.txt">> => <<"foobar">>,
+        <<"bar.txt">> => <<"barfoo">>,
+        <<"baz.png">> => <<"blargh">>
+    },
+    ?assertEqual(Expected, read_atts(Db, DocId)).
+
+
+% Updating with stubs for all but the first attachment keeps the stubbed
+% attachments readable and drops the omitted one from the doc.
+delete_one_att({Db, _}) ->
+    DocId = fabric2_util:uuid(),
+    [_Dropped | KeptAtts] = AllAtts = [
+        mk_att(<<"foo.txt">>, <<"foobar">>),
+        mk_att(<<"bar.txt">>, <<"barfoo">>),
+        mk_att(<<"baz.png">>, <<"blargh">>)
+    ],
+    {ok, RevId} = create_doc(Db, DocId, AllAtts),
+    Stubs = stubify(RevId, KeptAtts),
+    {ok, _} = update_doc(Db, DocId, RevId, Stubs),
+    Expected = #{
+        <<"bar.txt">> => <<"barfoo">>,
+        <<"baz.png">> => <<"blargh">>
+    },
+    ?assertEqual(Expected, read_atts(Db, DocId)).
+
+
+% Store one 360,000 byte attachment, read it back, and check how many raw
+% ?DB_ATTS rows were written for it.
+large_att({Db, _}) ->
+    DocId = fabric2_util:uuid(),
+    % Total size ~360,000 bytes
+    AttData = iolist_to_binary([
+        <<"foobar">> || _ <- lists:seq(1, 60000)
+    ]),
+    % Attachment names are binaries in every other case of this suite
+    Att1 = mk_att(<<"long.txt">>, AttData),
+    {ok, _} = create_doc(Db, DocId, [Att1]),
+    ?assertEqual(#{<<"long.txt">> => AttData}, read_atts(Db, DocId)),
+
+    {ok, Doc} = fabric2_db:open_doc(Db, DocId),
+    #doc{atts = [Att2]} = Doc,
+    {loc, _Db, DocId, AttId} = couch_att:fetch(data, Att2),
+
+    #{db_prefix := DbPrefix} = Db,
+    AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+    fabric2_fdb:transactional(fun(Tx) ->
+        AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)),
+        % NOTE(review): 4 rows presumably reflects a 100,000 byte chunk
+        % size in chunkify_binary -- confirm against fabric2_fdb
+        ?assertEqual(4, length(AttVals))
+    end).
+
+
+% Adding an attachment on one conflict branch must not make it visible on
+% the other branch.
+att_on_conflict_isolation({Db, _}) ->
+    DocId = fabric2_util:uuid(),
+    [PosRevA1, PosRevB1] = create_conflicts(Db, DocId, []),
+    Att = mk_att(<<"happy_goat.tiff">>, <<":D>">>),
+    {ok, PosRevA2} = update_doc(Db, DocId, PosRevA1, [Att]),
+    ?assertEqual(
+        #{<<"happy_goat.tiff">> => <<":D>">>},
+        read_atts(Db, DocId, PosRevA2)
+    ),
+    ?assertEqual(#{}, read_atts(Db, DocId, PosRevB1)).
+
+
+% Build an identity-encoded octet-stream attachment named Name holding the
+% binary Data.
+mk_att(Name, Data) ->
+    couch_att:new([
+        {name, Name},
+        {type, <<"application/octet-stream">>},
+        {att_len, byte_size(Data)},
+        {data, Data},
+        {encoding, identity},
+        {md5, <<>>}
+    ]).
+
+
+% Turn an attachment, or each attachment in a list, into a stub that points
+% at revision position Pos.
+stubify(RevId, Atts) when is_list(Atts) ->
+    lists:map(fun(Att) ->
+        stubify(RevId, Att)
+    end, Atts);
+
+stubify({Pos, _Rev}, Att) ->
+    couch_att:store([
+        {data, stub},
+        {revpos, Pos}
+    ], Att).
+
+
+% Write a doc with the given id and attachments; returns whatever
+% fabric2_db:update_doc/2 returns.
+create_doc(Db, DocId, Atts) ->
+    Doc = #doc{
+        id = DocId,
+        atts = Atts
+    },
+    fabric2_db:update_doc(Db, Doc).
+
+
+% Write an update of DocId on top of revision {Pos, Rev} with the given
+% attachments; returns whatever fabric2_db:update_doc/2 returns.
+update_doc(Db, DocId, {Pos, Rev}, Atts) ->
+    Doc = #doc{
+        id = DocId,
+        revs = {Pos, [Rev]},
+        atts = Atts
+    },
+    fabric2_db:update_doc(Db, Doc).
+
+
+% Create a base revision of DocId, then write two position-2 revisions on
+% top of it with replicated_changes so that both become leaves. Returns the
+% two {2, Rev} ids sorted in descending order.
+create_conflicts(Db, DocId, Atts) ->
+    BaseDoc = #doc{id = DocId, atts = Atts},
+    {ok, {_, BaseRev} = BasePosRev} = fabric2_db:update_doc(Db, BaseDoc),
+    <<RevA:16/binary, RevB:16/binary>> = fabric2_util:uuid(),
+    lists:foreach(fun(LeafRev) ->
+        Leaf = #doc{
+            id = DocId,
+            revs = {2, [LeafRev, BaseRev]},
+            atts = stubify(BasePosRev, Atts)
+        },
+        {ok, _} = fabric2_db:update_doc(Db, Leaf, [replicated_changes])
+    end, [RevA, RevB]),
+    lists:reverse(lists:sort([{2, RevA}, {2, RevB}])).
+
+
+% Map of attachment name => attachment body for the doc returned by
+% fabric2_db:open_doc/2.
+read_atts(Db, DocId) ->
+    {ok, Doc} = fabric2_db:open_doc(Db, DocId),
+    #doc{atts = Atts} = Doc,
+    atts_to_map(Db, DocId, Atts).
+
+
+% Same as read_atts/2 but for the single revision PosRev.
+read_atts(Db, DocId, PosRev) ->
+    {ok, [{ok, Doc}]} = fabric2_db:open_doc_revs(Db, DocId, [PosRev], []),
+    #doc{atts = Atts} = Doc,
+    atts_to_map(Db, DocId, Atts).
+
+
+% Read every attachment body through fabric2_db:read_attachment/3 and key
+% it by attachment name. Asserts that each attachment location refers to
+% DocId.
+atts_to_map(Db, DocId, Atts) ->
+    Pairs = lists:map(fun(Att) ->
+        [Name, {loc, _Db, DocId, AttId}] = couch_att:fetch([name, data], Att),
+        {Name, fabric2_db:read_attachment(Db, DocId, AttId)}
+    end, Atts),
+    maps:from_list(Pairs).
