This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch auto-delete-tseq in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f742f64730652fd8436ad5411793ba6cfb411bf1
Author: Robert Newson <rnew...@apache.org>
AuthorDate: Tue Sep 2 15:32:21 2025 +0100

    purge tombstones that exceed TTL
---
 src/couch/src/couch_tombstone_remover.erl          | 169 +++++++++++++++++++++
 .../test/eunit/couch_tombstone_remover_tests.erl   |  98 ++++++++++++
 src/docs/src/config/scanner.rst                    |   6 +
 3 files changed, 273 insertions(+)

diff --git a/src/couch/src/couch_tombstone_remover.erl b/src/couch/src/couch_tombstone_remover.erl
new file mode 100644
index 000000000..d21cbb94d
--- /dev/null
+++ b/src/couch/src/couch_tombstone_remover.erl
@@ -0,0 +1,169 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Scanner plugin that purges tombstones (deleted documents) older than a
+% per-database TTL. The TTL is the integer "ttl" field of the database's
+% `_local/_tombstone_ttl` document; databases without that document are
+% skipped. Every leaf revision of an expired tombstone is purged.
+
+-module(couch_tombstone_remover).
+-behaviour(couch_scanner_plugin).
+
+-export([
+    start/2,
+    resume/2,
+    complete/1,
+    checkpoint/1,
+    db/2,
+    db_opened/2,
+    db_closing/2,
+    doc_fdi/3
+]).
+
+-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
+
+% Scanner callback: begin a new scan with fresh state, or return `skip`
+% when should_run/0 is false. Logs with meta(St), like every other callback,
+% rather than the whole state (which also carries the rate limiter).
+start(ScanId, #{}) ->
+    St = init_config(ScanId),
+    case should_run() of
+        true ->
+            ?INFO("Starting.", [], meta(St)),
+            {ok, St};
+        false ->
+            ?INFO("Not starting.", [], meta(St)),
+            skip
+    end.
+
+% Scanner callback: resume a scan. No checkpoint data is kept (see
+% checkpoint/1), so state is rebuilt exactly as in start/2.
+resume(ScanId, #{}) ->
+    St = init_config(ScanId),
+    case should_run() of
+        true ->
+            ?INFO("Resuming.", [], meta(St)),
+            {ok, St};
+        false ->
+            ?INFO("Not resuming.", [], meta(St)),
+            skip
+    end.
+
+% Scanner callback: the scan finished; nothing to carry forward.
+complete(St) ->
+    ?INFO("Completed", [], meta(St)),
+    {ok, #{}}.
+
+% Scanner callback: nothing is persisted between checkpoints.
+checkpoint(_St) ->
+    {ok, #{}}.
+
+% Scanner callback for database DbName: read the tombstone TTL from its
+% `_local/_tombstone_ttl` document. Returns {ok, St} with `ttl` set when the
+% document has an integer "ttl" field. Returns {skip, St} when the document
+% is missing, when "ttl" is not an integer, or when reading the document
+% fails. The last two cases are logged as warnings.
+db(St, DbName) ->
+    Timeout = fabric_util:request_timeout(),
+    GetFun = fun() -> fabric:open_doc(DbName, <<"_local/_tombstone_ttl">>, [?ADMIN_CTX]) end,
+    try fabric_util:isolate(GetFun, Timeout) of
+        {ok, #doc{} = Doc} ->
+            {Props} = couch_doc:to_json_obj(Doc, []),
+            case couch_util:get_value(<<"ttl">>, Props) of
+                TTL when is_integer(TTL) ->
+                    {ok, St#{ttl => TTL}};
+                Else ->
+                    ?WARN(
+                        "Failed to purge tombstones in ~s as ttl was '~p', not integer",
+                        [DbName, Else],
+                        meta(St)
+                    ),
+                    {skip, St}
+            end;
+        {not_found, _Reason} ->
+            {skip, St};
+        {error, Reason} ->
+            ?WARN("Failed to purge tombstones in ~s for reason ~p", [DbName, Reason], meta(St)),
+            {skip, St}
+    catch
+        Class:Reason ->
+            ?WARN(
+                "Failed to purge tombstones in ~s for reason ~p:~p",
+                [DbName, Class, Reason],
+                meta(St)
+            ),
+            {skip, St}
+    end.
+
+% Scanner callback for an opened database: convert the cutoff time
+% (timestamp() - TTL) into an end sequence via the database's time-seq index.
+% The changes scan is limited to that sequence. When couch_time_seq:since/2
+% returns `now`, no end key is set and the whole changes feed is scanned.
+% Resets the per-database purge count.
+db_opened(#{} = St, Db) ->
+    #{ttl := TTL} = St,
+    Cutoff = couch_time_seq:timestamp() - TTL,
+    EndSeq = couch_time_seq:since(couch_db:get_time_seq(Db), Cutoff),
+    ChangeOpts =
+        case EndSeq of
+            now -> [];
+            _ -> [{end_key, EndSeq}]
+        end,
+    ?INFO("scanning for tombstones in ~s up to ~p", [couch_db:name(Db), EndSeq], meta(St)),
+    {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
+
+% Scanner callback before a database is closed: log the purge count
+% accumulated for it.
+db_closing(#{} = St, Db) ->
+    #{count := Count} = St,
+    ?INFO("purged ~B tombstones from ~s", [Count, couch_db:name(Db)], meta(St)),
+    {ok, St}.
+
+% Scanner callback for a document's #full_doc_info{}: purge it if it is deleted
+% and its update sequence is at or below the end sequence from db_opened/2.
+% When that end sequence is `now`, every deleted document qualifies. Live
+% documents are left alone.
+doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
+    #{end_seq := EndSeq} = St,
+    % Handle `now` explicitly rather than relying on every number sorting
+    % before every atom in Erlang term order.
+    case EndSeq =:= now orelse FDI#full_doc_info.update_seq =< EndSeq of
+        true -> {ok, purge(St, FDI, Db)};
+        false -> {ok, St}
+    end;
+doc_fdi(#{} = St, #full_doc_info{}, _Db) ->
+    {ok, St}.
+
+% Purge every leaf revision of FDI, with at most `max_batch_size` revisions
+% per purge request.
+purge(#{} = St, #full_doc_info{} = FDI, Db) ->
+    {Id, Revs} = fdi_to_idrevs(FDI),
+    Configured = config:get_integer("couch_tombstone_remover", "max_batch_size", 100),
+    % purge/5 makes no progress with a batch size of 0 (lists:split(0, Revs)
+    % returns {[], Revs}) and crashes with badarg on a negative size.
+    % Clamp the configured value to at least 1.
+    MaxBatchSize = max(1, Configured),
+    purge(St, Id, Revs, MaxBatchSize, Db).
+
+% Purge revisions Revs of document Id, sending at most MaxBatchSize
+% revisions per fabric:purge_docs/3 request, in order. Returns the updated
+% scan state.
+purge(#{} = St, Id, Revs, MaxBatchSize, Db) ->
+    DbName = mem3:dbname(couch_db:name(Db)),
+    % A batch size of 0 would never shrink Revs and a negative one makes
+    % lists:split/2 raise badarg, so clamp to at least 1.
+    Batches = rev_batches(Revs, max(1, MaxBatchSize)),
+    lists:foldl(
+        fun(Batch, StAcc) -> purge_batch(StAcc, DbName, Id, Batch) end,
+        St,
+        Batches
+    ).
+
+% Split Revs into consecutive chunks of at most Size elements (Size >= 1).
+% A list that already fits, including [], is returned as a single chunk.
+rev_batches(Revs, Size) when length(Revs) =< Size ->
+    [Revs];
+rev_batches(Revs, Size) ->
+    {Batch, Rest} = lists:split(Size, Revs),
+    [Batch | rev_batches(Rest, Size)].
+
+% Send one purge request for Id/Revs. On an ok/accepted reply, add the
+% number of results to `count` and charge them to the doc_write rate limiter,
+% sleeping for the wait it returns. Any other reply, or an exception from the
+% request, is logged and the state is returned unchanged.
+purge_batch(#{} = St, DbName, Id, Revs) ->
+    PurgeFun = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
+    Timeout = fabric_util:request_timeout(),
+    try fabric_util:isolate(PurgeFun, Timeout) of
+        {Health, Results} when Health == ok; Health == accepted ->
+            % NOTE(review): Results presumably has one entry per {Id, Revs}
+            % pair sent, so `count` grows per request, not per tombstone. A
+            % document split over several batches is counted more than
+            % once. Confirm this is intended.
+            #{count := Count, limiter := Limiter0} = St,
+            {Wait, Limiter1} = couch_scanner_rate_limiter:update(
+                Limiter0, doc_write, length(Results)
+            ),
+            timer:sleep(Wait),
+            St#{count => Count + length(Results), limiter => Limiter1};
+        Else ->
+            ?WARN(
+                "Failed to purge tombstones in ~s/~s for reason ~p",
+                [DbName, Id, Else],
+                meta(St)
+            ),
+            St
+    catch
+        Class:Reason ->
+            ?WARN(
+                "Failed to purge tombstones in ~s/~s for reason ~p:~p",
+                [DbName, Id, Class, Reason],
+                meta(St)
+            ),
+            St
+    end.
+
+% Return FDI's document id and its leaf revisions, converted with
+% couch_doc:rev_to_str/1. The generator pattern skips leaf entries whose
+% value is not a #leaf{} record.
+fdi_to_idrevs(#full_doc_info{} = FDI) ->
+    Revs = [
+        couch_doc:rev_to_str({Pos, RevId})
+     || {#leaf{}, {Pos, [RevId | _]}} <- couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree)
+    ],
+    {FDI#full_doc_info.id, Revs}.
+
+% Fresh scan state: the scan id plus a rate limiter.
+init_config(ScanId) ->
+    #{sid => ScanId, limiter => couch_scanner_rate_limiter:get()}.
+
+% Gate checked by start/2 and resume/2; currently always enabled.
+should_run() ->
+    true.
+
+% Log metadata containing only the scan id.
+meta(#{sid := ScanId}) ->
+    #{sid => ScanId}.
diff --git a/src/couch/test/eunit/couch_tombstone_remover_tests.erl b/src/couch/test/eunit/couch_tombstone_remover_tests.erl
new file mode 100644
index 000000000..4be9fffd5
--- /dev/null
+++ b/src/couch/test/eunit/couch_tombstone_remover_tests.erl
@@ -0,0 +1,98 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.
+% You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_tombstone_remover_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PLUGIN, couch_tombstone_remover).
+
+% Test generator, named after the module under test. EUnit picks up any
+% *_test_/0 function.
+couch_tombstone_remover_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_removes_tombstone, 10)
+        ]
+    }.
+
+% Start couch with fabric and the scanner, then create a 2-shard temporary
+% database with a tombstone TTL document. Returns {Ctx, DbName} for
+% teardown/1.
+setup() ->
+    {module, _} = code:ensure_loaded(?PLUGIN),
+    meck:new(?PLUGIN, [passthrough]),
+    meck:new(couch_scanner_server, [passthrough]),
+    meck:new(couch_scanner_util, [passthrough]),
+    Ctx = test_util:start_couch([fabric, couch_scanner]),
+    DbName = ?tempdb(),
+    ok = fabric:create_db(DbName, [{q, "2"}, {n, "1"}]),
+    % A negative TTL moves the cutoff (timestamp() - TTL) into the future, so
+    % any existing tombstone should already count as expired.
+    ok = add_doc(DbName, <<"_local/_tombstone_ttl">>, #{<<"ttl">> => -1_000_000}),
+    % The plugin reads "max_batch_size"; purge one revision per request.
+    config:set(atom_to_list(?PLUGIN), "max_batch_size", "1", false),
+    reset_stats(),
+    {Ctx, DbName}.
+
+% Undo setup/0: clear scanner and plugin config, reset scanner checkpoints,
+% delete the database, stop couch and unload all mocks.
+teardown({Ctx, DbName}) ->
+    config_delete_section("couch_scanner"),
+    config_delete_section("couch_scanner_plugins"),
+    config_delete_section(atom_to_list(?PLUGIN)),
+    couch_scanner:reset_checkpoints(),
+    couch_scanner:resume(),
+    fabric:delete_db(DbName),
+    test_util:stop_couch(Ctx),
+    meck:unload().
+
+% Create one tombstone and enable the plugin. Wait for couch_scanner_server
+% to handle an 'EXIT' message (presumably the plugin run ending), then check
+% that the tombstone is gone.
+t_removes_tombstone({_, DbName}) ->
+    ok = add_doc(DbName, <<"doc1">>, #{<<"_deleted">> => true}),
+    ?assertEqual(1, doc_del_count(DbName)),
+    meck:reset(couch_scanner_server),
+    meck:reset(?PLUGIN),
+    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
+    wait_exit(10000),
+    ?assertEqual(0, doc_del_count(DbName)),
+    ok.
+
+% Reset the query-server counters below to zero.
+% NOTE(review): these counters look unrelated to the tombstone remover,
+% probably carried over from another scanner test. Consider dropping them
+% along with the reset_stats/0 call in setup/0.
+reset_stats() ->
+    Counters = [
+        [couchdb, query_server, process_error_exits],
+        [couchdb, query_server, process_errors],
+        [couchdb, query_server, process_exits]
+    ],
+    [reset_counter(C) || C <- Counters].
+
+% Bring Counter back to zero by decrementing it by its current value.
+reset_counter(Counter) ->
+    case couch_stats:sample(Counter) of
+        0 ->
+            ok;
+        N when is_integer(N), N > 0 ->
+            couch_stats:decrement_counter(Counter, N)
+    end.
+
+% Delete every key of Section from the in-memory (non-persisted) config.
+% config:delete/3 takes (Section, Key, Persist). The previous version passed
+% each key as the section and each value as the key, so it deleted nothing.
+config_delete_section(Section) ->
+    [config:delete(Section, Key, false) || {Key, _Value} <- config:get(Section)].
+
+% Write a document with id DocId and JSON body Body (a map) as admin.
+add_doc(DbName, DocId, Body) ->
+    {ok, _} = fabric:update_doc(DbName, mkdoc(DocId, Body), [?ADMIN_CTX]),
+    ok.
+
+% Build an EJSON document from a map by round-tripping it through jiffy.
+mkdoc(Id, #{} = Body) ->
+    Body1 = Body#{<<"_id">> => Id},
+    jiffy:decode(jiffy:encode(Body1)).
+
+% Block until couch_scanner_server handles an 'EXIT' message, or until
+% MSec elapses.
+wait_exit(MSec) ->
+    meck:wait(couch_scanner_server, handle_info, [{'EXIT', '_', '_'}, '_'], MSec).
+
+% Number of deleted documents reported by the database info.
+doc_del_count(DbName) ->
+    {ok, DbInfo} = fabric:get_db_info(DbName),
+    couch_util:get_value(doc_del_count, DbInfo).
diff --git a/src/docs/src/config/scanner.rst b/src/docs/src/config/scanner.rst
index f36619f4a..6f399f817 100644
--- a/src/docs/src/config/scanner.rst
+++ b/src/docs/src/config/scanner.rst
@@ -249,3 +249,20 @@ settings in their ``[{plugin}]`` section.
 
         [couch_scanner_plugin_ddoc_features]
         ddoc_report = false
+
+.. config:section:: couch_tombstone_remover :: Configure the "Tombstone Remover" plugin
+
+    The plugin purges deleted documents whose last update is older than a
+    per-database TTL. The TTL is read from the integer ``ttl`` field of the
+    database's ``_local/_tombstone_ttl`` document; databases without that
+    document are skipped.
+
+    .. config:option:: max_batch_size :: Maximum revisions per purge request
+
+        The leaf revisions of an expired tombstone are purged in batches of
+        at most this many revisions per request. Default: ``100``.
+
+        .. code-block:: ini
+
+            [couch_tombstone_remover]
+            max_batch_size = 100