This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch auto-delete-tseq in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit fe71ac08392a04b40dc559018403fe523c52c654 Author: Robert Newson <rnew...@apache.org> AuthorDate: Tue Sep 2 15:32:21 2025 +0100 purge deleted documents that exceed TTL --- src/couch/src/couch_auto_purge_plugin.erl | 166 +++++++++++++++++++++ .../test/eunit/couch_auto_purge_plugin_tests.erl | 98 ++++++++++++ src/docs/src/config/scanner.rst | 9 ++ 3 files changed, 273 insertions(+) diff --git a/src/couch/src/couch_auto_purge_plugin.erl b/src/couch/src/couch_auto_purge_plugin.erl new file mode 100644 index 000000000..2e70504b7 --- /dev/null +++ b/src/couch/src/couch_auto_purge_plugin.erl @@ -0,0 +1,166 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% couch_scanner plugin that purges deleted documents once they are older
+%% than a TTL configured globally or per database.
+-module(couch_auto_purge_plugin).
+-behaviour(couch_scanner_plugin).
+
+-export([
+    start/2,
+    resume/2,
+    complete/1,
+    checkpoint/1,
+    db/2,
+    db_opened/2,
+    db_closing/2,
+    doc_fdi/3
+]).
+
+-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+%% Scanner callback: begin a new scan. No checkpointed state is used; the
+%% plugin state is built from the scan id alone.
+start(ScanId, #{}) ->
+    State = init_config(ScanId),
+    ?INFO("Starting.", [], State),
+    {ok, State}.
+
+%% Scanner callback: resume a scan. Like start/2, the checkpoint map is
+%% ignored and the state is rebuilt from the scan id.
+resume(ScanId, #{}) ->
+    State = init_config(ScanId),
+    ?INFO("Resuming.", [], State),
+    {ok, State}.
+
+%% Scanner callback: the scan finished; log it and return an empty map.
+complete(State) ->
+    ?INFO("Completed", [], State),
+    {ok, #{}}.
+
+%% Scanner callback: nothing needs to be persisted between runs.
+checkpoint(_State) ->
+    {ok, #{}}.
+
+%% Scanner callback: decide whether to scan DbName. Databases with no TTL
+%% are skipped; otherwise the TTL is stashed in the state for db_opened/2.
+db(State, DbName) ->
+    case ttl(State, DbName) of
+        undefined ->
+            {skip, State};
+        TTL when is_integer(TTL) ->
+            {ok, State#{ttl => TTL}}
+    end.
+
+%% Scanner callback for an opened database shard. Uses the shard's time-seq
+%% to map "now minus TTL" onto an update sequence (EndSeq) and limits the
+%% changes scan to it with an end_key. When couch_time_seq:since/2 returns
+%% the atom `now', no end_key is set.
+%% NOTE(review): presumably EndSeq is the newest sequence old enough to be
+%% purged -- confirm against couch_time_seq:since/2.
+db_opened(#{} = St, Db) ->
+    #{ttl := TTL} = St,
+    EndSeq = couch_time_seq:since(couch_db:get_time_seq(Db), couch_time_seq:timestamp() - TTL),
+    ChangeOpts =
+        if
+            EndSeq =:= now -> [];
+            true -> [{end_key, EndSeq}]
+        end,
+    ?INFO("scanning for deleted documents in ~s up to ~p", [couch_db:name(Db), EndSeq], meta(St)),
+    % Reset the per-database purge counter and remember EndSeq for doc_fdi/3.
+    {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
+
+%% Scanner callback for a closing database shard: report how many purges
+%% were counted while it was open.
+db_closing(#{} = St, Db) ->
+    #{count := Count} = St,
+    ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], meta(St)),
+    {ok, St}.
+
+%% Scanner callback per document. Deleted documents are purged; live ones
+%% are left alone. Because the changes scan was bounded by end_seq, a
+%% deleted document past it indicates a bug, hence the assertion.
+doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
+    #{end_seq := EndSeq} = St,
+    ?assert(
+        FDI#full_doc_info.update_seq =< EndSeq, "FDI update_seq should not be greater than end seq"
+    ),
+    {ok, purge(St, FDI, Db)};
+doc_fdi(#{} = St, #full_doc_info{}, _Db) ->
+    {ok, St}.
+
+%% Purge every leaf revision of a deleted document, sending at most
+%% max_batch_items revisions per purge request.
+purge(#{} = St, #full_doc_info{} = FDI, Db) ->
+    {Id, Revs} = fdi_to_idrevs(FDI),
+    % Scanner plugins are configured in their own [{plugin}] section; the
+    % former "couch_tombstone_remover" section is not this plugin's.
+    MaxBatchSize = config:get_integer(atom_to_list(?MODULE), "max_batch_items", 100),
+    purge(St, Id, Revs, MaxBatchSize, Db).
+
+%% Purge Revs of document Id via fabric:purge_docs/3, splitting them into
+%% requests of at most MaxBatchSize revisions.
+%%
+%% On an ok/accepted result the state's count grows by length(Results) and
+%% the process sleeps for the delay returned by the scanner rate limiter.
+%% Any other result or exception is logged as a warning and the state is
+%% returned unchanged, so the scan carries on with the next batch/document.
+%% NOTE(review): count is incremented per request, so a document split over
+%% several batches is presumably counted more than once -- confirm intent.
+%%
+%% A MaxBatchSize below 1 is treated as 1: lists:split(0, Revs) would never
+%% shrink Revs (infinite recursion) and a negative size raises badarg.
+purge(#{} = St, Id, Revs, MaxBatchSize, Db) when MaxBatchSize < 1 ->
+    purge(St, Id, Revs, 1, Db);
+purge(#{} = St, Id, Revs, MaxBatchSize, Db) when length(Revs) =< MaxBatchSize ->
+    % The scanner works on shards; purge through the clustered database name.
+    DbName = mem3:dbname(couch_db:name(Db)),
+    PurgeFun = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
+    Timeout = fabric_util:request_timeout(),
+    try fabric_util:isolate(PurgeFun, Timeout) of
+        {Health, Results} when Health == ok; Health == accepted ->
+            #{count := Count, limiter := Limiter0} = St,
+            {Wait, Limiter1} = couch_scanner_rate_limiter:update(
+                Limiter0, doc_write, length(Results)
+            ),
+            timer:sleep(Wait),
+            St#{count => Count + length(Results), limiter => Limiter1};
+        Else ->
+            ?WARN(
+                "Failed to purge deleted documents in ~s/~s for reason ~p",
+                [DbName, Id, Else],
+                meta(St)
+            ),
+            St
+    catch
+        Class:Reason ->
+            ?WARN(
+                "Failed to purge deleted documents in ~s/~s for reason ~p:~p",
+                [DbName, Id, Class, Reason],
+                meta(St)
+            ),
+            St
+    end;
+purge(#{} = St0, Id, Revs, MaxBatchSize, Db) ->
+    {RevBatch, RevRest} = lists:split(MaxBatchSize, Revs),
+    St1 = purge(St0, Id, RevBatch, MaxBatchSize, Db),
+    purge(St1, Id, RevRest, MaxBatchSize, Db).
+
+%% Return {DocId, LeafRevStrs}: the string form of every leaf revision in the
+%% document's rev tree whose leaf value is a #leaf{} record.
+fdi_to_idrevs(#full_doc_info{} = FDI) ->
+    Revs = [
+        couch_doc:rev_to_str({Pos, RevId})
+     || {#leaf{}, {Pos, [RevId | _]}} <- couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree)
+    ],
+    {FDI#full_doc_info.id, Revs}.
+
+%% Initial plugin state: the scan id and a rate limiter handle.
+init_config(ScanId) ->
+    #{sid => ScanId, limiter => couch_scanner_rate_limiter:get()}.
+
+%% Logging metadata: only the scan id is passed to the log macros.
+meta(#{sid := ScanId}) ->
+    #{sid => ScanId}.
+
+%% Return the deleted-document TTL for DbName as an integer, or undefined
+%% when none is configured. An integer deleted_document_ttl field in the
+%% database's _local/_auto_purge document wins over the
+%% [couch_auto_purge_plugin] deleted_document_ttl config default. The
+%% scanner docs describe the value as seconds.
+ttl(St, DbName) ->
+    case db_ttl(St, DbName) of
+        undefined -> default_ttl(St);
+        DbTTL -> DbTTL
+    end.
+
+%% Per-database TTL from _local/_auto_purge. Returns undefined when the doc
+%% is missing, cannot be read, or its field is not an integer; the last two
+%% cases are logged as warnings.
+db_ttl(St, DbName) ->
+    Timeout = fabric_util:request_timeout(),
+    GetFun = fun() -> fabric:open_doc(DbName, <<"_local/_auto_purge">>, [?ADMIN_CTX]) end,
+    try fabric_util:isolate(GetFun, Timeout) of
+        {ok, #doc{} = Doc} ->
+            {Props} = couch_doc:to_json_obj(Doc, []),
+            case couch_util:get_value(<<"deleted_document_ttl">>, Props) of
+                TTL when is_integer(TTL) ->
+                    TTL;
+                Else ->
+                    ?WARN(
+                        "TTL in ~s as ttl was '~p', not integer",
+                        [DbName, Else],
+                        meta(St)
+                    ),
+                    undefined
+            end;
+        {not_found, _Reason} ->
+            undefined;
+        {error, Reason} ->
+            ?WARN(
+                "Failed to fetch ttl in ~s for reason ~p",
+                [DbName, Reason],
+                meta(St)
+            ),
+            undefined
+    catch
+        Class:Reason ->
+            ?WARN(
+                "Failed to fetch ttl in ~s for reason ~p:~p", [DbName, Class, Reason], meta(St)
+            ),
+            undefined
+    end.
+
+%% Global default TTL. config:get/2 yields the raw string (or undefined), so
+%% it must be parsed here: returning the string would make db/2 fail with a
+%% case_clause, since it only handles an integer or undefined. A value that
+%% is not an integer is logged and treated as unset.
+default_ttl(St) ->
+    case config:get("couch_auto_purge_plugin", "deleted_document_ttl") of
+        undefined ->
+            undefined;
+        Value ->
+            try
+                list_to_integer(Value)
+            catch
+                error:badarg ->
+                    ?WARN(
+                        "Ignoring non-integer default ttl ~p",
+                        [Value],
+                        meta(St)
+                    ),
+                    undefined
+            end
+    end.
diff --git a/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl b/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl new file mode 100644 index 000000000..4fd50a5cf --- /dev/null +++ b/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl @@ -0,0 +1,98 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_auto_purge_plugin_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PLUGIN, couch_auto_purge_plugin).
+
+%% EUnit fixture for the auto purge plugin. Every test gets a fresh couch
+%% instance and a q=2, n=1 database whose _local/_auto_purge document sets a
+%% negative TTL, so the purge cutoff (now - TTL) lies in the future and any
+%% tombstone qualifies for purging.
+couch_auto_purge_plugin_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_auto_purge_after_ttl, 10)
+        ]
+    }.
+
+setup() ->
+    {module, _} = code:ensure_loaded(?PLUGIN),
+    meck:new(?PLUGIN, [passthrough]),
+    % Mocked so the test can wait for the scanner to observe the plugin exit.
+    meck:new(couch_scanner_server, [passthrough]),
+    Ctx = test_util:start_couch([fabric, couch_scanner]),
+    DbName = ?tempdb(),
+    ok = fabric:create_db(DbName, [{q, "2"}, {n, "1"}]),
+    ok = add_doc(DbName, <<"_local/_auto_purge">>, #{<<"deleted_document_ttl">> => -1_000_000}),
+    config:set(atom_to_list(?PLUGIN), "max_batch_items", "1", false),
+    {Ctx, DbName}.
+
+teardown({Ctx, DbName}) ->
+    config_delete_section("couch_scanner"),
+    config_delete_section("couch_scanner_plugins"),
+    config_delete_section(atom_to_list(?PLUGIN)),
+    couch_scanner:reset_checkpoints(),
+    couch_scanner:resume(),
+    fabric:delete_db(DbName),
+    test_util:stop_couch(Ctx),
+    meck:unload().
+
+%% A deleted document is purged once the plugin is enabled and a scan runs.
+t_auto_purge_after_ttl({_, DbName}) ->
+    ok = add_doc(DbName, <<"doc1">>, #{<<"_deleted">> => true}),
+    ?assertEqual(1, doc_del_count(DbName)),
+    meck:reset(couch_scanner_server),
+    meck:reset(?PLUGIN),
+    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
+    wait_exit(10000),
+    ?assertEqual(0, doc_del_count(DbName)),
+    ok.
+
+%% Delete every key in Section without persisting the change.
+config_delete_section(Section) ->
+    [config:delete(K, V, false) || {K, V} <- config:get(Section)].
+
+%% Create or update DocId with the given map body, as admin.
+add_doc(DbName, DocId, Body) ->
+    {ok, _} = fabric:update_doc(DbName, mkdoc(DocId, Body), [?ADMIN_CTX]),
+    ok.
+
+%% Turn a map body into a document body with the given _id. The map is
+%% round-tripped through jiffy so it comes back in jiffy's decoded form,
+%% which add_doc/3 passes to fabric:update_doc/3.
+mkdoc(Id, #{} = Body) ->
+    Body1 = Body#{<<"_id">> => Id},
+    jiffy:decode(jiffy:encode(Body1)).
+
+%% Block until couch_scanner_server handles an 'EXIT' message, or fail after
+%% MSec milliseconds. Presumably this is the plugin's scan process finishing
+%% -- confirm against couch_scanner_server.
+wait_exit(MSec) ->
+    meck:wait(couch_scanner_server, handle_info, [{'EXIT', '_', '_'}, '_'], MSec).
+
+%% Number of deleted documents reported by fabric:get_db_info/1.
+doc_del_count(DbName) ->
+    {ok, DbInfo} = fabric:get_db_info(DbName),
+    couch_util:get_value(doc_del_count, DbInfo).
diff --git a/src/docs/src/config/scanner.rst b/src/docs/src/config/scanner.rst index f36619f4a..fc4ed5a97 100644 --- a/src/docs/src/config/scanner.rst +++ b/src/docs/src/config/scanner.rst @@ -249,3 +249,12 @@ settings in their ``[{plugin}]`` section. [couch_scanner_plugin_ddoc_features] ddoc_report = false + +.. config:section:: couch_auto_purge_plugin :: Configure the Auto Purge plugin + + .. config:option:: deleted_document_ttl + + Set the default interval, in seconds, before the plugin will purge + a deleted document. The database may override this setting with the + ``/dbname/_deleted_document_ttl`` endpoint. If neither is set, the + plugin will not purge deleted documents.