This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch fix-purge-checkpoint-creation-race-condition in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 70e863bcc12960fad4a661af45237cd0b6768e7d Author: Nick Vatamaniuc <[email protected]> AuthorDate: Thu Dec 18 03:36:41 2025 -0500 Fix race condition during purge checkpoint creation Previously, when the purge checkpoints were first created concurrently with compaction running, it was possible for compaction to finish first and remove too many purge infos before the internal replicator checkpointed. In that case we could end up with a "hole" between a minimum (checkpointed) purge sequence, and the oldest purge sequence. Subsequently, internal replicator would start crashing since when fetching the minimum purge sequence it will correctly detect that one of the purge clients is asking for a sequence that's too low (that is, it "skipped" and hasn't processed intermediate purge sequences). The tell-tale sign of this in production is repeated `invalid_start_purge_seq` errors emitted in the logs. One way to get out of this would be to delete the checkpoint docs and let them be re-created. To fix the race condition, when compaction starts check if all the expected checkpoints from the other shard copies are created first, and only then use the minimum version, otherwise use the oldest purge sequence version. --- src/couch/src/couch_bt_engine_compactor.erl | 15 ++- src/mem3/src/mem3_rep.erl | 165 +++++++++++++++++++++++++++- 2 files changed, 177 insertions(+), 3 deletions(-) diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index 85d33cf95..2414285b4 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -163,7 +163,20 @@ copy_purge_info(#comp_st{} = CompSt) -> % stale or deprecated internal replicator checkpoints beforehand. 
ok = mem3_rep:cleanup_purge_checkpoints(DbName), MinPurgeSeq = couch_util:with_db(DbName, fun(Db) -> - couch_db:get_minimum_purge_seq(Db) + % If we don't (yet) have all the expected internal replicator purge + % checkpoints, use the oldest purge sequence instead of the minimum. + % This is to avoid the removing some purge infos too early before the + % checkpoint is created. For example, if the oldest sequence = 1, + % minimum sequence = 1000, and current purge sequence = 2000, we can + % compact and remove all the purge infos from 1 to 1000. While + % compaction happens, a checkpoint is created with sequence = 500. In + % that case we'd end up with a "hole" between 500 and 1001 -- a new + % minimum purge sequence of 500, but the oldest checkpoint is would be + % 1001. + case mem3_rep:have_all_purge_checkpoints(Db) of + true -> couch_db:get_minimum_purge_seq(Db); + false -> couch_db:get_oldest_purge_seq(Db) + end end), OldPSTree = OldSt#st.purge_seq_tree, StartSeq = couch_bt_engine:get_purge_seq(NewSt) + 1, diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 38a940e49..4345d792d 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -20,6 +20,7 @@ make_purge_id/2, verify_purge_checkpoint/2, cleanup_purge_checkpoints/1, + have_all_purge_checkpoints/1, find_source_seq/4, find_split_target_seq/4, local_id_hash/1 @@ -207,11 +208,59 @@ cleanup_purge_checkpoints(Db) -> {stop, Acc} end end, - Opts = [{start_key, list_to_binary(?PURGE_PREFIX)}], - {ok, ToDelete} = couch_db:fold_local_docs(Db, FoldFun, [], Opts), + ToDelete = fold_purge_checkpoints(Db, FoldFun, []), DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end, lists:foreach(DeleteFun, ToDelete). +% Check if we have all the internal replicator purge checkpoints. Call this +% before compaction starts to avoid removing purge infos before the internal +% replicator has managed to create the first checkpoint. 
+% +have_all_purge_checkpoints(ShardName) when is_binary(ShardName) -> + couch_util:with_db(ShardName, fun(Db) -> have_all_purge_checkpoints(Db) end); +have_all_purge_checkpoints(Db) -> + Shards = shards(couch_db:name(Db)), + ReplicatePurges = config:get_boolean("mem3", "replicate_purges", true), + have_all_purge_checkpoints(ReplicatePurges, Db, Shards). + +have_all_purge_checkpoints(true, Db, [_ | _] = Shards) -> + ShardName = couch_db:name(Db), + UUID = couch_db:get_uuid(Db), + FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) -> + case Id of + <<?PURGE_PREFIX, UUID:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> -> + case verify_checkpoint_shard(Shards, Props) of + true -> + Range = couch_util:get_value(<<"range">>, Props), + TBin = couch_util:get_value(<<"target">>, Props), + TNode = binary_to_existing_atom(TBin, latin1), + {ok, sets:add_element({TNode, Range}, Acc)}; + false -> + {ok, Acc} + end; + _ -> + {stop, Acc} + end + end, + Checkpoints = fold_purge_checkpoints(Db, FoldFun, couch_util:new_set()), + % Keep only shard copies. These are not necessarily ones with a matching + % ranges but also overlapping ranges, since the shards may have been split. + SrcRange = mem3:range(ShardName), + IsCopy = fun(#shard{name = Name, node = Node, range = Range}) -> + Name =/= ShardName andalso + Node =/= config:node_name() andalso + mem3_util:range_overlap(SrcRange, Range) + end, + Copies = [{T, R} || #shard{node = T, range = R} = S <- Shards, IsCopy(S)], + Copies1 = couch_util:set_from_list(Copies), + sets:size(sets:subtract(Copies1, Checkpoints)) == 0; +have_all_purge_checkpoints(false, _Db, _Shards) -> + % If purges are not replicated then we assume we have all (0) checkpoints. + true; +have_all_purge_checkpoints(_, _Db, []) -> + % For a unsharded db we also assume we have all (0) checkpoints. + true. 
+ delete_checkpoint(Db, DocId) -> DbName = couch_db:name(Db), LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s", @@ -229,6 +278,11 @@ delete_checkpoint(Db, DocId) -> ok end. +fold_purge_checkpoints(Db, FoldFun, Acc0) -> + Opts = [{start_key, list_to_binary(?PURGE_PREFIX)}], + {ok, Acc1} = couch_db:fold_local_docs(Db, FoldFun, Acc0, Opts), + Acc1. + verify_purge_checkpoint(DbName, Props) -> try case couch_util:get_value(<<"type">>, Props) of @@ -1232,4 +1286,111 @@ target_not_in_shard_map(_) -> ?assertEqual(1, map_size(Map)), ?assertMatch(#{R0f := #shard{name = Name, node = 'n3'}}, Map). +purge_checkpoints_test_() -> + { + foreach, + fun() -> + Ctx = test_util:start_couch([mem3, fabric]), + config:set("mem3", "replicate_purges", "true", false), + meck:new(mem3, [passthrough]), + meck:expect(mem3, nodes, 0, [node(), n2, n3]), + Ctx + end, + fun(Ctx) -> + meck:unload(), + config:delete("mem3", "replicate_purges", false), + test_util:stop_couch(Ctx) + end, + [ + ?TDEF_FE(t_not_sharded), + ?TDEF_FE(t_purges_not_replicated), + ?TDEF_FE(t_have_all_checkpoints) + ] + }. + +t_not_sharded(_) -> + meck:expect(mem3, shards, 1, meck:raise(error, database_does_not_exist)), + Name = <<"mem3_rep_test", (couch_uuids:random())/binary>>, + {ok, Db} = couch_server:create(Name, [?ADMIN_CTX]), + couch_db:close(Db), + ?assert(have_all_purge_checkpoints(Name)), + ok = couch_server:delete(Name, [?ADMIN_CTX]). 
+ +t_purges_not_replicated(_) -> + R07 = [16#00000000, 16#7fffffff], + R8f = [16#80000000, 16#ffffffff], + R0f = [16#00000000, 16#ffffffff], + + Shards = [ + #shard{node = node(), range = R07}, + #shard{node = node(), range = R8f}, + #shard{node = 'n2', range = R07}, + #shard{node = 'n2', range = R8f}, + #shard{node = 'n3', range = R0f} + ], + meck:expect(mem3, shards, 1, Shards), + + SrcName = <<"shards/00000000-7fffffff/d.1551893550">>, + {ok, Db} = couch_server:create(SrcName, [?ADMIN_CTX]), + couch_db:close(Db), + ?assert(not have_all_purge_checkpoints(SrcName)), + config:set("mem3", "replicate_purges", "false", false), + ?assert(have_all_purge_checkpoints(SrcName)), + ok = couch_server:delete(SrcName, [?ADMIN_CTX]). + +t_have_all_checkpoints(_) -> + R07 = [16#00000000, 16#7fffffff], + R8f = [16#80000000, 16#ffffffff], + R0f = [16#00000000, 16#ffffffff], + + Shards = [ + #shard{node = node(), range = R07}, + #shard{node = node(), range = R8f}, + #shard{node = 'n2', range = R07}, + #shard{node = 'n2', range = R8f}, + #shard{node = 'n3', range = R0f} + ], + meck:expect(mem3, shards, 1, Shards), + + SrcName = <<"shards/00000000-7fffffff/d.1551893551">>, + TgtName1 = <<"shards/00000000-7fffffff/d.1551893551">>, + TgtName2 = <<"shards/80000000-ffffffff/d.1551893551">>, + TgtName3 = <<"shards/00000000-ffffffff/d.1551893551">>, + + Src1 = #shard{name = SrcName, node = node(), range = R07}, + Tgt1 = #shard{name = TgtName1, node = 'n2', range = R07}, + Tgt2 = #shard{name = TgtName2, node = 'n2', range = R8f}, + Tgt3 = #shard{name = TgtName3, node = 'n3', range = R0f}, + + {ok, Db} = couch_server:create(SrcName, [?ADMIN_CTX]), + SrcUuid = couch_db:get_uuid(Db), + + TgtUuid1 = couch_uuids:random(), + % <<"875ce187a5c0f36ee75896d74d10300c">>, + Body1 = purge_cp_body(Src1, Tgt1, 42), + DocId1 = make_purge_id(SrcUuid, TgtUuid1), + Doc1 = #doc{id = DocId1, body = Body1}, + {ok, _} = couch_db:update_doc(Db, Doc1, [?ADMIN_CTX]), + % Not enough checkpoints + ?assert(not 
have_all_purge_checkpoints(SrcName)), + + Body2 = purge_cp_body(Src1, Tgt2, 43), + TgtUuid2 = couch_uuids:random(), + DocId2 = make_purge_id(SrcUuid, TgtUuid2), + Doc2 = #doc{id = DocId2, body = Body2}, + {ok, _} = couch_db:update_doc(Db, Doc2, [?ADMIN_CTX]), + % Still not enough checkpoints + ?assert(not have_all_purge_checkpoints(SrcName)), + + Body3 = purge_cp_body(Src1, Tgt3, 44), + TgtUuid3 = couch_uuids:random(), + DocId3 = make_purge_id(SrcUuid, TgtUuid3), + Doc3 = #doc{id = DocId3, body = Body3}, + {ok, _} = couch_db:update_doc(Db, Doc3, [?ADMIN_CTX]), + % Now should have all the checkpoints + ?assert(have_all_purge_checkpoints(SrcName)), + + couch_db:close(Db), + ok = couch_server:delete(SrcName, [?ADMIN_CTX]). + -endif.
