This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch auto-delete-3 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 363340e6e7c08b8229cd66b1bfe9403c18586ce5 Author: Robert Newson <[email protected]> AuthorDate: Thu Mar 20 17:08:55 2025 +0000 Implement _update_drop_seq --- .gitignore | 2 + Makefile | 8 +- configure | 1 + configure.ps1 | 1 + dev/run | 1 + erlang_ls.config | 5 + mix.exs | 5 +- .../nouveau/NouveauApplicationConfiguration.java | 2 +- .../org/apache/couchdb/nouveau/api/IndexInfo.java | 13 +- .../org/apache/couchdb/nouveau/core/Index.java | 12 +- rel/nouveau.yaml | 2 +- src/chttpd/src/chttpd_db.erl | 16 +- src/chttpd/src/chttpd_httpd_handlers.erl | 1 + src/config/src/config.erl | 6 +- src/couch/src/couch_bt_engine.erl | 18 +- src/couch/src/couch_bt_engine_compactor.erl | 3 +- src/couch/src/couch_bt_engine_header.erl | 12 +- src/couch/src/couch_db.erl | 6 + src/couch/src/couch_db_engine.erl | 18 +- src/couch_mrview/src/couch_mrview_cleanup.erl | 6 +- src/couch_mrview/src/couch_mrview_index.erl | 26 +- .../src/couch_replicator_scheduler_job.erl | 53 +- src/dreyfus/src/dreyfus_fabric_cleanup.erl | 1 + src/dreyfus/src/dreyfus_index.erl | 7 + src/dreyfus/src/dreyfus_index_updater.erl | 13 +- src/fabric/src/fabric.erl | 15 +- src/fabric/src/fabric_db_info.erl | 2 + src/fabric/src/fabric_drop_seq.erl | 860 +++++++++++++++++++-- src/mem3/src/mem3_reshard_index.erl | 10 +- src/nouveau/src/nouveau_fabric_cleanup.erl | 1 + src/nouveau/src/nouveau_index_updater.erl | 17 + test/elixir/lib/utils.ex | 2 +- test/elixir/test/config/nouveau.elixir | 3 + test/elixir/test/config/search.elixir | 3 + test/elixir/test/config/suite.elixir | 5 + test/elixir/test/drop_seq_statem_test.exs | 415 ++++++++++ test/elixir/test/drop_seq_test.exs | 248 ++++++ 37 files changed, 1691 insertions(+), 128 deletions(-) diff --git a/.gitignore b/.gitignore index 080a7dd6f..4d4918a73 100644 --- a/.gitignore +++ b/.gitignore @@ -78,6 +78,8 @@ src/unicode_util_compat/ src/file_system/ src/rebar3/ src/erlfmt/ +src/libgraph/ +src/propcheck/ src/*.erl tmp/ diff --git a/Makefile b/Makefile index 
8b1df51e6..1acd42048 100644 --- a/Makefile +++ b/Makefile @@ -254,6 +254,12 @@ elixir-cluster-with-quorum: elixir-init devclean --degrade-cluster 1 \ --no-eval 'mix test --trace --only with_quorum_test $(EXUNIT_OPTS)' +.PHONY: elixir-cluster +elixir-cluster: export MIX_ENV=integration +elixir-cluster: elixir-init devclean + @dev/run -n 3 -q -a adm:pass --with-nouveau \ + --no-eval 'mix test --trace --only with_cluster $(EXUNIT_OPTS)' + .PHONY: elixir # target: elixir - Run Elixir-based integration tests elixir: export MIX_ENV=integration @@ -575,5 +581,5 @@ nouveau-test-elixir: couch nouveau ifeq ($(with_nouveau), true) @dev/run "$(TEST_OPTS)" -n 1 -q -a adm:pass --with-nouveau \ --locald-config test/config/test-config.ini \ - --no-eval 'mix test --trace --include test/elixir/test/config/nouveau.elixir' + --no-eval 'mix test --trace --include test/elixir/test/config/nouveau.elixir $(EXUNIT_OPTS)' endif diff --git a/configure b/configure index 70350a07b..80b8921c3 100755 --- a/configure +++ b/configure @@ -373,6 +373,7 @@ cat > rel/couchdb.config << EOF {data_dir, "./data"}. {view_index_dir, "./data"}. {nouveau_index_dir, "./data/nouveau"}. +{nouveau_commit_interval, 30}. {nouveau_url, "http://127.0.0.1:5987"}. {nouveau_port, 5987}. {nouveau_admin_port, 5988}. diff --git a/configure.ps1 b/configure.ps1 index faf3669ec..ca0c417dd 100644 --- a/configure.ps1 +++ b/configure.ps1 @@ -194,6 +194,7 @@ $CouchDBConfig = @" {data_dir, "./data"}. {view_index_dir, "./data"}. {nouveau_index_dir, "./data/nouveau"}. +{nouveau_commit_interval, 30}. {nouveau_url, "http://127.0.0.1:5987"}. {nouveau_port, 5987}. {nouveau_admin_port, 5988}. 
diff --git a/dev/run b/dev/run index 85ea9622d..fc070a035 100755 --- a/dev/run +++ b/dev/run @@ -424,6 +424,7 @@ def generate_nouveau_config(ctx): config = { "nouveau_index_dir": os.path.join(ctx["devdir"], "lib", "nouveau"), + "nouveau_commit_interval": 5, "nouveau_port": 5987, "nouveau_admin_port": 5988, } diff --git a/erlang_ls.config b/erlang_ls.config index 94483cfec..3464e6fdd 100644 --- a/erlang_ls.config +++ b/erlang_ls.config @@ -3,3 +3,8 @@ apps_dirs: include_dirs: - "src" - "src/*/include" +macros: + - name: WITH_PROPER + value: true + - name: TEST + value: true diff --git a/mix.exs b/mix.exs index 4f0cb4ba3..86fff4227 100644 --- a/mix.exs +++ b/mix.exs @@ -71,7 +71,7 @@ defmodule CouchDBTest.Mixfile do end # Run "mix help compile.app" to learn about applications. - def application, do: [applications: [:logger, :httpotion]] + def application, do: [applications: [:logger, :httpotion, :propcheck]] # Specifies which paths to compile per environment. defp elixirc_paths(:test), do: ["test/elixir/lib", "test/elixir/test/support"] @@ -85,7 +85,8 @@ defmodule CouchDBTest.Mixfile do {:httpotion, ">= 3.2.0", only: [:dev, :test, :integration], runtime: false}, {:excoveralls, "~> 0.18.3", only: :test}, {:ibrowse, path: path("ibrowse"), override: true}, - {:credo, "~> 1.7.11", only: [:dev, :test, :integration], runtime: false} + {:credo, "~> 1.7.11", only: [:dev, :test, :integration], runtime: false}, + {:propcheck, ">= 1.5.0", only: [:dev, :test, :integration]}, ] extra_deps = [:b64url, :jiffy, :jwtf, :meck, :mochiweb] diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java index dce6fe6da..67eed38af 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java @@ -24,7 +24,7 @@ public class 
NouveauApplicationConfiguration extends Configuration { @Min(10) private int maxIndexesOpen = 10; - @Min(10) + @Min(5) private int commitIntervalSeconds = 10; @Min(30) diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java index 9958c7780..fbf003c69 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java @@ -24,6 +24,9 @@ public final class IndexInfo { @PositiveOrZero private final long updateSeq; + @PositiveOrZero + private final long committedUpdateSeq; + @PositiveOrZero private final long purgeSeq; @@ -35,10 +38,12 @@ public final class IndexInfo { public IndexInfo( @JsonProperty("update_seq") final long updateSeq, + @JsonProperty("committed_update_seq") final long committedUpdateSeq, @JsonProperty("purge_seq") final long purgeSeq, @JsonProperty("num_docs") final int numDocs, @JsonProperty("disk_size") final long diskSize) { this.updateSeq = updateSeq; + this.committedUpdateSeq = committedUpdateSeq; this.purgeSeq = purgeSeq; this.numDocs = numDocs; this.diskSize = diskSize; @@ -56,13 +61,17 @@ public final class IndexInfo { return updateSeq; } + public long getCommittedUpdateSeq() { + return committedUpdateSeq; + } + public long getPurgeSeq() { return purgeSeq; } @Override public String toString() { - return "IndexInfo [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + ", numDocs=" + numDocs + ", diskSize=" - + diskSize + "]"; + return "IndexInfo [updateSeq=" + updateSeq + ", committedUpdateSeq=" + committedUpdateSeq + ", purgeSeq=" + + purgeSeq + ", numDocs=" + numDocs + ", diskSize=" + diskSize + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java index 69c9bd48f..bcb5859e4 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java +++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java @@ -36,18 +36,20 @@ import org.apache.couchdb.nouveau.api.SearchResults; public abstract class Index implements Closeable { private long updateSeq; + private long committedUpdateSeq; private long purgeSeq; private boolean deleteOnClose = false; protected Index(final long updateSeq, final long purgeSeq) { this.updateSeq = updateSeq; + this.committedUpdateSeq = updateSeq; this.purgeSeq = purgeSeq; } public final IndexInfo info() throws IOException { final int numDocs = doNumDocs(); final long diskSize = doDiskSize(); - return new IndexInfo(updateSeq, purgeSeq, numDocs, diskSize); + return new IndexInfo(updateSeq, committedUpdateSeq, purgeSeq, numDocs, diskSize); } protected abstract int doNumDocs() throws IOException; @@ -93,7 +95,13 @@ public abstract class Index implements Closeable { updateSeq = this.updateSeq; purgeSeq = this.purgeSeq; } - return doCommit(updateSeq, purgeSeq); + final boolean result = doCommit(updateSeq, purgeSeq); + if (result) { + synchronized (this) { + this.committedUpdateSeq = Math.max(this.committedUpdateSeq, updateSeq); + } + } + return result; } protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException; diff --git a/rel/nouveau.yaml b/rel/nouveau.yaml index 40837f12c..55dd192fa 100644 --- a/rel/nouveau.yaml +++ b/rel/nouveau.yaml @@ -1,5 +1,5 @@ maxIndexesOpen: 3000 -commitIntervalSeconds: 30 +commitIntervalSeconds: {{nouveau_commit_interval}} idleSeconds: 60 rootDir: {{nouveau_index_dir}} diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index b4c141f8c..3e22dc423 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -30,7 +30,8 @@ handle_view_cleanup_req/2, update_doc/4, http_code_from_status/1, - handle_partition_req/2 + handle_partition_req/2, + handle_update_drop_seq_req/2 ]). -import( @@ -390,6 +391,19 @@ update_partition_stats(PathParts) -> ok end. 
+handle_update_drop_seq_req( + #httpd{method = 'POST', path_parts = [_DbName, <<"_update_drop_seq">>]} = Req, Db +) -> + chttpd:validate_ctype(Req, "application/json"), + case fabric:update_drop_seq(Db) of + {ok, Results} -> + send_json(Req, 201, {[{ok, true}, {results, Results}]}); + {error, Reason} -> + chttpd:send_error(Req, Reason) + end; +handle_update_drop_seq_req(Req, _Db) -> + send_method_not_allowed(Req, "POST"). + handle_design_req( #httpd{ path_parts = [_DbName, _Design, Name, <<"_", _/binary>> = Action | _Rest] diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl index 932b52e5f..306885e83 100644 --- a/src/chttpd/src/chttpd_httpd_handlers.erl +++ b/src/chttpd/src/chttpd_httpd_handlers.erl @@ -35,6 +35,7 @@ db_handler(<<"_design">>) -> fun chttpd_db:handle_design_req/2; db_handler(<<"_partition">>) -> fun chttpd_db:handle_partition_req/2; db_handler(<<"_temp_view">>) -> fun chttpd_view:handle_temp_view_req/2; db_handler(<<"_changes">>) -> fun chttpd_db:handle_changes_req/2; +db_handler(<<"_update_drop_seq">>) -> fun chttpd_db:handle_update_drop_seq_req/2; db_handler(_) -> no_match. design_handler(<<"_view">>) -> fun chttpd_view:handle_view_req/3; diff --git a/src/config/src/config.erl b/src/config/src/config.erl index bf8358d1d..695bc40b9 100644 --- a/src/config/src/config.erl +++ b/src/config/src/config.erl @@ -512,8 +512,10 @@ remove_comments(Line) -> NoLeadingComment when is_list(NoLeadingComment) -> % Check for in-line comments. In-line comments must be preceded by % space or a tab character. - [NoComments | _] = re:split(NoLeadingComment, " ;|\t;", [{return, list}]), - NoComments + case re:split(NoLeadingComment, " ;|\t;", [{return, list}]) of + [] -> []; + [NoComments | _] -> NoComments + end end. 
trim(String) -> diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 6f9c86d6e..9d3c97fd7 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -45,6 +45,7 @@ get_partition_info/2, get_update_seq/1, get_drop_seq/1, + get_drop_count/1, get_uuid/1, set_revs_limit/2, @@ -54,6 +55,7 @@ set_update_seq/2, set_drop_seq/3, + increment_drop_count/1, open_docs/2, open_local_docs/2, @@ -331,6 +333,9 @@ get_update_seq(#st{header = Header}) -> get_drop_seq(#st{header = Header}) -> couch_bt_engine_header:get(Header, drop_seq). +get_drop_count(#st{header = Header}) -> + couch_bt_engine_header:get(Header, drop_count). + get_uuid(#st{header = Header}) -> couch_bt_engine_header:get(Header, uuid). @@ -820,12 +825,11 @@ set_drop_seq(#st{header = Header} = St, ExpectedUuidPrefix, NewDropSeq) when CurrentDropSeq = get_drop_seq(St), Uuid = get_uuid(St), ActualUuidPrefix = binary:part(Uuid, 0, byte_size(ExpectedUuidPrefix)), - if - NewDropSeq < CurrentDropSeq -> - {error, drop_seq_cant_decrease}; ExpectedUuidPrefix /= ActualUuidPrefix -> {error, uuid_mismatch}; + NewDropSeq < CurrentDropSeq -> + {error, {drop_seq_cant_decrease, CurrentDropSeq, NewDropSeq}}; true -> NewSt = St#st{ header = couch_bt_engine_header:set(Header, [ @@ -836,6 +840,14 @@ set_drop_seq(#st{header = Header} = St, ExpectedUuidPrefix, NewDropSeq) when {ok, increment_update_seq(NewSt)} end. +increment_drop_count(#st{header = Header} = St) -> + CurrentDropCount = get_drop_count(St), + NewSt = St#st{ + header = couch_bt_engine_header:set(Header, [{drop_count, CurrentDropCount + 1}]), + needs_commit = true + }, + {ok, NewSt}. 
+ copy_security(#st{header = Header} = St, SecProps) -> Options = [{compression, St#st.compression}], {ok, Ptr, _} = couch_file:append_term(St#st.fd, SecProps, Options), diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index 619200bce..2f230f489 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -330,7 +330,8 @@ copy_compact(#comp_st{} = CompSt) -> if Deleted andalso Seq =< DropSeq -> %% drop this document completely - {ok, {AccNewSt, AccUncopied, AccUncopiedSize, AccCopiedSize}}; + {ok, NewSt2} = couch_bt_engine:increment_drop_count(AccNewSt), + {ok, {NewSt2, AccUncopied, AccUncopiedSize, AccCopiedSize}}; AccUncopiedSize2 >= BufferSize -> NewSt2 = copy_docs( St, AccNewSt, lists:reverse([DocInfo | AccUncopied]), Retry diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl index 9f5285d49..0d3c55a14 100644 --- a/src/couch/src/couch_bt_engine_header.erl +++ b/src/couch/src/couch_bt_engine_header.erl @@ -39,7 +39,8 @@ uuid/1, epochs/1, compacted_seq/1, - drop_seq/1 + drop_seq/1, + drop_count/1 ]). -include_lib("stdlib/include/assert.hrl"). @@ -73,7 +74,8 @@ compacted_seq, purge_infos_limit = 1000, props_ptr, - drop_seq = 0 + drop_seq = 0, + drop_count = 0 }). -define(PARTITION_DISK_VERSION, 8). @@ -90,7 +92,8 @@ from(Header0) -> uuid = Header#db_header.uuid, epochs = Header#db_header.epochs, compacted_seq = Header#db_header.compacted_seq, - drop_seq = Header#db_header.drop_seq + drop_seq = Header#db_header.drop_seq, + drop_count = Header#db_header.drop_count }. is_header(Header) -> @@ -182,6 +185,9 @@ compacted_seq(Header) -> drop_seq(Header) -> get_field(Header, drop_seq). +drop_count(Header) -> + get_field(Header, drop_count). + purge_infos_limit(Header) -> get_field(Header, purge_infos_limit). 
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 8718b5b82..e4d16a36b 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -42,6 +42,7 @@ get_db_info/1, get_partition_info/2, get_del_doc_count/1, + get_drop_count/1, get_doc_count/1, get_epochs/1, get_filepath/1, @@ -579,6 +580,9 @@ get_pid(#db{main_pid = Pid}) -> get_del_doc_count(Db) -> {ok, couch_db_engine:get_del_doc_count(Db)}. +get_drop_count(Db) -> + {ok, couch_db_engine:get_drop_count(Db)}. + get_doc_count(Db) -> {ok, couch_db_engine:get_doc_count(Db)}. @@ -619,6 +623,7 @@ get_db_info(Db) -> } = Db, {ok, DocCount} = get_doc_count(Db), {ok, DelDocCount} = get_del_doc_count(Db), + {ok, DropCount} = get_drop_count(Db), SizeInfo = couch_db_engine:get_size_info(Db), DiskVersion = couch_db_engine:get_disk_version(Db), Uuid = @@ -641,6 +646,7 @@ get_db_info(Db) -> {engine, couch_db_engine:get_engine(Db)}, {doc_count, DocCount}, {doc_del_count, DelDocCount}, + {drop_count, DropCount}, {update_seq, get_update_seq(Db)}, {purge_seq, couch_db_engine:get_purge_seq(Db)}, {compact_running, Compactor /= nil}, diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index 1d89066e8..7a3dd8b32 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -194,6 +194,11 @@ -callback get_del_doc_count(DbHandle :: db_handle()) -> DelDocCount :: non_neg_integer(). +% The number of tombstones (deleted documents with no content) in the +% database which have been completely dropped from the database. +-callback get_drop_count(DbHandle :: db_handle()) -> + DropCount :: non_neg_integer(). + % This number is reported in the database info properties and % as such can be any JSON value. -callback get_disk_version(DbHandle :: db_handle()) -> Version :: json(). 
@@ -679,6 +684,7 @@ get_engine/1, get_compacted_seq/1, get_del_doc_count/1, + get_drop_count/1, get_disk_version/1, get_doc_count/1, get_epochs/1, @@ -806,6 +812,10 @@ get_del_doc_count(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, Engine:get_del_doc_count(EngineState). +get_drop_count(#db{} = Db) -> + #db{engine = {Engine, EngineState}} = Db, + Engine:get_drop_count(EngineState). + get_disk_version(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, Engine:get_disk_version(EngineState). @@ -889,8 +899,12 @@ set_update_seq(#db{} = Db, UpdateSeq) -> set_drop_seq(#db{} = Db, UuidPrefix, UpdateSeq) -> #db{engine = {Engine, EngineState}} = Db, - {ok, NewSt} = Engine:set_drop_seq(EngineState, UuidPrefix, UpdateSeq), - {ok, Db#db{engine = {Engine, NewSt}}}. + case Engine:set_drop_seq(EngineState, UuidPrefix, UpdateSeq) of + {ok, NewSt} -> + {ok, Db#db{engine = {Engine, NewSt}}}; + {error, Reason} -> + {error, Reason} + end. open_docs(#db{} = Db, DocIds) -> #db{engine = {Engine, EngineState}} = Db, diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl index 5b5afbdce..d769ded8f 100644 --- a/src/couch_mrview/src/couch_mrview_cleanup.erl +++ b/src/couch_mrview/src/couch_mrview_cleanup.erl @@ -15,7 +15,8 @@ -export([ run/1, cleanup_purges/3, - cleanup_indices/2 + cleanup_indices/2, + cleanup_peer_checkpoints/2 ]). -include_lib("couch/include/couch_db.hrl"). @@ -43,6 +44,9 @@ cleanup_indices(#{} = Sigs, #{} = IndexMap) -> maps:map(Fun, maps:without(maps:keys(Sigs), IndexMap)), ok. +cleanup_peer_checkpoints(DbName, Sigs) when is_binary(DbName), is_map(Sigs) -> + fabric_drop_seq:cleanup_peer_checkpoint_docs(DbName, <<"mrview">>, maps:keys(Sigs)). 
+ delete_file(File) -> RootDir = couch_index_util:root_dir(), couch_log:debug("~p : deleting inactive index : ~s", [?MODULE, File]), diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index 51777480c..5ec895434 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -129,12 +129,14 @@ open(Db, State0) -> NewSt = init_and_upgrade_state(Db, Fd, State, Header), ok = commit(NewSt), ensure_local_purge_doc(Db, NewSt), + ensure_peer_checkpoint_doc(NewSt), {ok, NewSt}; % end of upgrade code for <= 2.x {ok, {Sig, Header}} -> % Matching view signatures. NewSt = init_and_upgrade_state(Db, Fd, State, Header), ensure_local_purge_doc(Db, NewSt), + ensure_peer_checkpoint_doc(NewSt), check_collator_versions(DbName, NewSt), {ok, NewSt}; {ok, {WrongSig, _}} -> @@ -144,6 +146,7 @@ open(Db, State0) -> ), NewSt = couch_mrview_util:reset_index(Db, Fd, State), ensure_local_purge_doc(Db, NewSt), + ensure_peer_checkpoint_doc(NewSt), {ok, NewSt}; {ok, Else} -> couch_log:error( @@ -152,10 +155,12 @@ open(Db, State0) -> ), NewSt = couch_mrview_util:reset_index(Db, Fd, State), ensure_local_purge_doc(Db, NewSt), + ensure_peer_checkpoint_doc(NewSt), {ok, NewSt}; no_valid_header -> NewSt = couch_mrview_util:reset_index(Db, Fd, State), ensure_local_purge_doc(Db, NewSt), + ensure_peer_checkpoint_doc(NewSt), {ok, NewSt} end; {error, Reason} = Error -> @@ -210,7 +215,17 @@ finish_update(State) -> commit(State) -> Header = {State#mrst.sig, couch_mrview_util:make_header(State)}, - couch_file:write_header(State#mrst.fd, Header). + ok = couch_file:sync(State#mrst.fd), + ok = couch_file:write_header(State#mrst.fd, Header), + ok = couch_file:sync(State#mrst.fd), + fabric_drop_seq:update_peer_checkpoint_doc( + State#mrst.db_name, + <<"mrview">>, + State#mrst.idx_name, + fabric_drop_seq:peer_id_from_sig(State#mrst.db_name, couch_util:to_hex_bin(State#mrst.sig)), + State#mrst.update_seq + ), + ok. 
compact(Db, State, Opts) -> couch_mrview_compactor:compact(Db, State, Opts). @@ -295,6 +310,15 @@ ensure_local_purge_doc(Db, #mrst{} = State) -> ok end. +ensure_peer_checkpoint_doc(#mrst{} = State) -> + fabric_drop_seq:create_peer_checkpoint_doc_if_missing( + State#mrst.db_name, + <<"mrview">>, + State#mrst.idx_name, + fabric_drop_seq:peer_id_from_sig(State#mrst.db_name, couch_util:to_hex_bin(State#mrst.sig)), + State#mrst.update_seq + ). + create_local_purge_doc(Db, State) -> PurgeSeq = couch_db:get_purge_seq(Db), update_local_purge_doc(Db, State, PurgeSeq). diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 62e604b5e..0b14564c0 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -678,6 +678,7 @@ init_state(Rep) -> StartSeq = {0, StartSeq1}, SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ), + create_peer_checkpoint_doc_if_missing(Source, BaseId, SourceSeq), #doc{body = {CheckpointHistory}} = SourceLog, State = #rep_state{ @@ -809,7 +810,7 @@ do_checkpoint(State) -> src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, stats = Stats, - rep_details = #rep{options = Options}, + rep_details = #rep{id = {BaseId, _}, options = Options}, session_id = SessionId } = State, case commit_to_both(Source, Target) of @@ -886,7 +887,14 @@ do_checkpoint(State) -> {TgtRevPos, TgtRevId} = update_checkpoint( Target, TargetLog#doc{body = NewRepHistory}, target ), - %% TODO update_checkpoint(Source, peer_checkpoint_doc(State), source), + if + is_binary(NewSeq) -> + update_checkpoint( + Source, peer_checkpoint_doc(Source, BaseId, NewSeq), source + ); + true -> + ok + end, NewState = State#rep_state{ checkpoint_history = NewRepHistory, committed_seq = NewTsSeq, @@ -913,14 +921,39 @@ do_checkpoint(State) -> >>} end. 
-peer_checkpoint_doc(#rep_state{} = State) -> - #rep_state{ - session_id = SessionId - } = State, - #doc{ - id = <<"peer-checkpoint-", SessionId/binary>>, - body = {[{<<"update_seq">>, State#rep_state.committed_seq}]} - }. +create_peer_checkpoint_doc_if_missing(#httpdb{} = Db, BaseId, SourceSeq) when + is_list(BaseId), is_binary(SourceSeq) +-> + case couch_replicator_api_wrap:open_doc(Db, peer_checkpoint_id(BaseId), []) of + {ok, _} -> + ok; + {error, <<"not_found">>} -> + Doc = peer_checkpoint_doc(Db, BaseId, SourceSeq), + case couch_replicator_api_wrap:update_doc(Db, Doc, []) of + {ok, _} -> + ok; + {error, Reason} -> + throw({checkpoint_commit_failure, Reason}) + end; + {error, Reason} -> + throw({checkpoint_commit_failure, Reason}) + end; +create_peer_checkpoint_doc_if_missing(#httpdb{} = _Db, BaseId, SourceSeq) when + is_list(BaseId), is_integer(SourceSeq) +-> + %% ignored + ok. + +peer_checkpoint_doc(#httpdb{} = Db, BaseId, UpdateSeq) -> + fabric_drop_seq:peer_checkpoint_doc( + peer_checkpoint_id(BaseId), + <<"replication">>, + ?l2b(couch_replicator_api_wrap:db_uri(Db)), + UpdateSeq + ). + +peer_checkpoint_id(BaseId) when is_list(BaseId) -> + ?l2b("repl-" ++ BaseId). update_checkpoint(Db, Doc, DbType) -> try diff --git a/src/dreyfus/src/dreyfus_fabric_cleanup.erl b/src/dreyfus/src/dreyfus_fabric_cleanup.erl index e2710744d..1a3aec9dc 100644 --- a/src/dreyfus/src/dreyfus_fabric_cleanup.erl +++ b/src/dreyfus/src/dreyfus_fabric_cleanup.erl @@ -29,6 +29,7 @@ go(DbName) -> ) ), cleanup_local_purge_doc(DbName, ActiveSigs), + fabric_drop_seq:cleanup_peer_checkpoint_docs(DbName, <<"search">>, ActiveSigs), clouseau_rpc:cleanup(DbName, ActiveSigs), ok. 
diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl index c97a837d5..c2b6fa6ea 100644 --- a/src/dreyfus/src/dreyfus_index.erl +++ b/src/dreyfus/src/dreyfus_index.erl @@ -124,6 +124,13 @@ init({DbName, Index}) -> couch_db:close(Db) end, dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index), + fabric_drop_seq:create_peer_checkpoint_doc_if_missing( + DbName, + <<"search">>, + <<(Index#index.ddoc_id)/binary, "/", (Index#index.name)/binary>>, + fabric_drop_seq:peer_id_from_sig(DbName, Index#index.sig), + Seq + ), proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], State); Error -> diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl index 6edc5a257..f8404cc53 100644 --- a/src/dreyfus/src/dreyfus_index_updater.erl +++ b/src/dreyfus/src/dreyfus_index_updater.erl @@ -60,7 +60,18 @@ update(IndexPid, Index) -> [Changes] = couch_task_status:get([changes_done]), Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs}, {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []), - ok = clouseau_rpc:commit(IndexPid, NewCurSeq) + ok = clouseau_rpc:commit(IndexPid, NewCurSeq), + {ok, CommittedSeq} = clouseau_rpc:get_update_seq(IndexPid), + fabric_drop_seq:update_peer_checkpoint_doc( + DbName, + <<"search">>, + <<(Index#index.ddoc_id)/binary, "/", (Index#index.name)/binary>>, + fabric_drop_seq:peer_id_from_sig( + DbName, + Index#index.sig + ), + CommittedSeq + ) after ret_os_process(Proc) end, diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 92f30e4e8..c1fb021a8 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -34,7 +34,7 @@ get_purged_infos/1, compact/1, compact/2, get_partition_info/2, - calculate_drop_seq/1 + update_drop_seq/1 ]). % Documents @@ -120,9 +120,9 @@ get_db_info(DbName) -> get_partition_info(DbName, Partition) -> fabric_db_partition_info:go(dbname(DbName), Partition). 
--spec calculate_drop_seq(dbname()) -> +-spec update_drop_seq(dbname()) -> {ok, [{node(), binary(), non_neg_integer()}]}. -calculate_drop_seq(DbName) -> +update_drop_seq(DbName) -> fabric_drop_seq:go(dbname(DbName)). %% @doc the number of docs in a database @@ -586,18 +586,19 @@ cleanup_index_files() -> cleanup_index_files(DbName) -> try ShardNames = [mem3:name(S) || S <- mem3:local_shards(dbname(DbName))], - cleanup_local_indices_and_purge_checkpoints(ShardNames) + Sigs = couch_mrview_util:get_signatures(hd(ShardNames)), + couch_mrview_cleanup:cleanup_peer_checkpoints(dbname(DbName), Sigs), + cleanup_local_indices_and_purge_checkpoints(Sigs, ShardNames) catch error:database_does_not_exist -> ok end. -cleanup_local_indices_and_purge_checkpoints([]) -> +cleanup_local_indices_and_purge_checkpoints(_Sigs, []) -> ok; -cleanup_local_indices_and_purge_checkpoints([_ | _] = Dbs) -> +cleanup_local_indices_and_purge_checkpoints(Sigs, [_ | _] = Dbs) -> AllIndices = lists:map(fun couch_mrview_util:get_index_files/1, Dbs), AllPurges = lists:map(fun couch_mrview_util:get_purge_checkpoints/1, Dbs), - Sigs = couch_mrview_util:get_signatures(hd(Dbs)), ok = cleanup_purges(Sigs, AllPurges, Dbs), ok = cleanup_indices(Sigs, AllIndices). diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl index 5461404c5..08d3747aa 100644 --- a/src/fabric/src/fabric_db_info.erl +++ b/src/fabric/src/fabric_db_info.erl @@ -105,6 +105,8 @@ merge_results(Info) -> [{doc_count, lists:sum(X)} | Acc]; (doc_del_count, X, Acc) -> [{doc_del_count, lists:sum(X)} | Acc]; + (drop_count, X, Acc) -> + [{drop_count, lists:sum(X)} | Acc]; (compact_running, X, Acc) -> [{compact_running, lists:member(true, X)} | Acc]; (sizes, X, Acc) -> diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl index c00ea24ea..f01bc3fee 100644 --- a/src/fabric/src/fabric_drop_seq.erl +++ b/src/fabric/src/fabric_drop_seq.erl @@ -6,12 +6,25 @@ -export([go/1]). 
+-export([ + create_peer_checkpoint_doc_if_missing/5, + update_peer_checkpoint_doc/5, + cleanup_peer_checkpoint_docs/3, + peer_checkpoint_doc/4, + peer_id_from_sig/2 +]). + +%% rpc +-export([gather_drop_seq_info_rpc/1]). + -type range() :: [non_neg_integer()]. -type uuid() :: binary(). -type seq() :: non_neg_integer(). +-type uuid_map() :: #{{Range :: range(), Node :: node()} => uuid()}. + -type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(), Seq :: seq()}}. -type history_item() :: { @@ -23,69 +36,114 @@ }. go(DbName) -> - {PeerCheckpoints0, ShardSyncHistory} = parse_local_docs(DbName), - ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory), - PeerCheckpoints1 = maps:merge_with(fun merge_peers/3, PeerCheckpoints0, ShardSyncCheckpoints), - PeerCheckpoints2 = crossref(PeerCheckpoints1, ShardSyncHistory), - Shards = mem3:live_shards(DbName, [node() | nodes()]), - RexiMon = fabric_util:create_monitors(Shards), + Shards0 = mem3:shards(DbName), + #{ + uuid_map := UuidMap, + peer_checkpoints := PeerCheckpoints, + shard_sync_history := ShardSyncHistory + } = gather_drop_seq_info( + Shards0 + ), + {Shards1, DropSeqs} = go_int( + Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory + ), Workers = lists:filtermap( fun(Shard) -> #shard{range = Range, node = Node, name = ShardName} = Shard, - case maps:find({Range, Node}, PeerCheckpoints2) of + case maps:find({Range, Node}, DropSeqs) of + {ok, {_UuidPrefix, 0}} -> + false; {ok, {UuidPrefix, DropSeq}} -> Ref = rexi:cast( Node, {fabric_rpc, set_drop_seq, [ShardName, UuidPrefix, DropSeq, [?ADMIN_CTX]]} ), - {true, Shard#shard{ref = Ref}}; + {true, Shard#shard{ref = Ref, opts = [{drop_seq, DropSeq}]}}; error -> false end end, - Shards + Shards1 ), - Acc0 = {Workers, length(Workers) - 1}, - try - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, ok} -> - ok; - {timeout, {WorkersDict, _}} -> - DefunctWorkers = fabric_util:remove_done_workers( - WorkersDict, - nil - 
), - fabric_util:log_timeout( - DefunctWorkers, - "set_drop_seq" - ), - {error, timeout}; - {error, Reason} -> - {error, Reason} - end - after - rexi_monitor:stop(RexiMon) + if + Workers == [] -> + %% nothing to do + {ok, #{}}; + true -> + RexiMon = fabric_util:create_monitors(Shards1), + Acc0 = {#{}, length(Workers) - 1}, + try + case fabric_util:recv(Workers, #shard.ref, fun handle_set_drop_seq_reply/3, Acc0) of + {ok, Results} -> + {ok, Results}; + {timeout, {WorkersDict, _}} -> + DefunctWorkers = fabric_util:remove_done_workers( + WorkersDict, + nil + ), + fabric_util:log_timeout( + DefunctWorkers, + "set_drop_seq" + ), + {error, timeout}; + {error, Reason} -> + {error, Reason} + end + after + rexi_monitor:stop(RexiMon) + end end. -handle_message(ok, _Worker, {_Workers, 0}) -> - {stop, ok}; -handle_message(ok, Worker, {Workers, Waiting}) -> - {ok, {lists:delete(Worker, Workers), Waiting - 1}}; -handle_message(Error, _, _Acc) -> +go_int(Shards0, UuidFetcher, PeerCheckpoints0, ShardSyncHistory) -> + Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory), + PeerCheckpoints1 = crossref(PeerCheckpoints0, ShardSyncHistory), + PeerCheckpoints2 = substitute_splits(Shards1, UuidFetcher, PeerCheckpoints1), + DropSeqs = calculate_drop_seqs(PeerCheckpoints2, ShardSyncHistory), + {Shards1, DropSeqs}. + +-spec calculate_drop_seqs(peer_checkpoints(), shard_sync_history()) -> + peer_checkpoints(). +calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory) -> + ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory), + PeerCheckpoints1 = maps:merge_with(fun merge_peers/3, PeerCheckpoints0, ShardSyncCheckpoints), + crossref(PeerCheckpoints1, ShardSyncHistory). 
+ +handle_set_drop_seq_reply(ok, Worker, {Results0, Waiting}) -> + DropSeq = proplists:get_value(drop_seq, Worker#shard.opts), + [B, E] = Worker#shard.range, + BHex = couch_util:to_hex(<<B:32/integer>>), + EHex = couch_util:to_hex(<<E:32/integer>>), + Range = list_to_binary([BHex, "-", EHex]), + Results1 = maps:merge_with( + fun(_Key, Val1, Val2) -> + maps:merge(Val1, Val2) + end, + Results0, + #{Range => #{Worker#shard.node => DropSeq}} + ), + if + Waiting == 0 -> + {stop, Results1}; + true -> + {ok, {Results1, Waiting - 1}} + end; +handle_set_drop_seq_reply(Error, _, _Acc) -> {error, Error}. crossref(PeerCheckpoints0, ShardSyncHistory) -> PeerCheckpoints1 = maps:fold( fun({Range, Node}, {Uuid, Seq}, Acc1) -> Others = maps:filter( - fun({R, _S, T}, _V) -> R == Range andalso T /= Node end, ShardSyncHistory + fun({R, _S, T}, _History) -> R == Range andalso T /= Node end, ShardSyncHistory ), maps:fold( - fun({R, _S, T}, H, Acc2) -> + fun({R, _S, T}, History, Acc2) -> case lists:search( - fun({SU, SS, _TU, _TS}) -> Uuid == SU andalso SS =< Seq end, - H + fun({SU, SS, _TU, _TS}) -> + uuids_match([Uuid, SU]) andalso SS =< Seq + end, + History ) of {value, {_SU, _SS, TU, TS}} -> @@ -107,29 +165,114 @@ crossref(PeerCheckpoints0, ShardSyncHistory) -> %% crossreferences may be possible. if PeerCheckpoints0 == PeerCheckpoints1 -> - PeerCheckpoints1; + %% insert {<<>>, 0} for any missing crossref so that shard sync + %% history is subordinate. + maps:fold( + fun({Range, Node}, {_Uuid, _Seq}, Acc1) -> + Others = maps:filter( + fun({R, _S, T}, _History) -> R == Range andalso T /= Node end, + ShardSyncHistory + ), + maps:fold( + fun({R, _S, T}, _History, Acc3) -> + maps:merge(#{{R, T} => {<<>>, 0}}, Acc3) + end, + Acc1, + Others + ) + end, + PeerCheckpoints1, + PeerCheckpoints1 + ); true -> crossref(PeerCheckpoints1, ShardSyncHistory) end. --spec parse_local_docs(DbName :: binary()) -> {peer_checkpoints(), shard_sync_history()}. 
-parse_local_docs(DbName) -> - {ok, Result} = fabric:all_docs( - DbName, fun parse_local_docs_cb/2, {#{}, #{}}, all_docs_mrargs() +%% return only the shards that have synced with every other replica +fully_replicated_shards_only(Shards, ShardSyncHistory) -> + lists:filter( + fun(#shard{range = Range, node = Node}) -> + ExpectedPeers = [ + S#shard.node + || S <- Shards, S#shard.range == Range, S#shard.node /= Node + ], + ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers], + lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys) + end, + Shards + ). + +-spec gather_drop_seq_info(Shards :: [#shard{}]) -> {peer_checkpoints(), shard_sync_history()}. +gather_drop_seq_info([#shard{} | _] = Shards) -> + Workers = fabric_util:submit_jobs( + Shards, ?MODULE, gather_drop_seq_info_rpc, [] ), - Result. + RexiMon = fabric_util:create_monitors(Workers), + Acc0 = #{uuid_map => #{}, peer_checkpoints => #{}, shard_sync_history => #{}}, + try + case + rexi_utils:recv( + Workers, + #shard.ref, + fun gather_drop_seq_info_cb/3, + {Acc0, length(Workers) - 1}, + fabric_util:request_timeout(), + infinity + ) + of + {ok, Result} -> + Result; + {timeout, _State} -> + {error, timeout}; + {error, Reason} -> + {error, Reason} + end + after + rexi_monitor:stop(RexiMon), + fabric_streams:cleanup(Workers) + end. -parse_local_docs_cb({row, Row}, Acc) -> - case lists:keyfind(doc, 1, Row) of - false -> +gather_drop_seq_info_rpc(DbName) -> + case couch_db:open_int(DbName, []) of + {ok, Db} -> + try + Uuid = couch_db:get_uuid(Db), + Acc0 = {#{}, #{}}, + {ok, {PeerCheckpoints, ShardSyncHistory}} = couch_db:fold_local_docs( + Db, fun gather_drop_seq_info_fun/2, Acc0, [] + ), + rexi:reply( + {ok, #{ + uuid => Uuid, + peer_checkpoints => PeerCheckpoints, + shard_sync_history => ShardSyncHistory + }} + ) + after + couch_db:close(Db) + end; + Else -> + rexi:reply(Else) + end. 
+ +gather_drop_seq_info_fun( + #doc{id = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", _/binary>>} = Doc, + {PeerCheckpoints0, ShardSyncHistory} = Acc +) -> + {Props} = Doc#doc.body, + case couch_util:get_value(<<"update_seq">>, Props) of + undefined -> {ok, Acc}; - {doc, Doc} -> - parse_local_doc(couch_doc:from_json_obj(Doc), Acc) + UpdateSeq -> + PeerCheckpoints1 = maps:merge_with( + fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0 + ), + {ok, {PeerCheckpoints1, ShardSyncHistory}} end; -parse_local_docs_cb(_Else, Acc) -> - {ok, Acc}. - -parse_local_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) -> +gather_drop_seq_info_fun( + #doc{id = <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>>} = Doc, + {PeerCheckpoints, ShardSyncHistory0} = Acc +) -> {Props} = Doc#doc.body, case couch_util:get_value(<<"dbname">>, Props) of undefined -> @@ -138,7 +281,6 @@ parse_local_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) -> DbName -> Range = mem3:range(DbName), {[{_SrcNode, History}]} = couch_util:get_value(<<"history">>, Props), - {PeerCheckpoints, ShardSyncHistory0} = Acc, KeyFun = fun({Item}) -> {Range, binary_to_existing_atom(couch_util:get_value(<<"source_node">>, Item)), binary_to_existing_atom(couch_util:get_value(<<"target_node">>, Item))} @@ -151,35 +293,55 @@ parse_local_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) -> couch_util:get_value(<<"target_seq">>, Item) } end, - ShardSyncHistory1 = maps:merge( maps:groups_from_list(KeyFun, ValueFun, History), ShardSyncHistory0 ), {ok, {PeerCheckpoints, ShardSyncHistory1}} end; -parse_local_doc(#doc{id = <<"_local/peer-checkpoint-", _/binary>>} = Doc, Acc) -> - {Props} = Doc#doc.body, - case couch_util:get_value(<<"update_seq">>, Props) of - undefined -> - {ok, Acc}; - UpdateSeq -> - {PeerCheckpoints0, ShardSyncHistory} = Acc, - PeerCheckpoints1 = maps:merge_with( - fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0 - ), - {ok, {PeerCheckpoints1, ShardSyncHistory}} - end; 
-parse_local_doc(_Doc, Acc) -> +gather_drop_seq_info_fun(#doc{}, Acc) -> + %% ignored {ok, Acc}. -merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when is_integer(Val1), is_integer(Val2) -> - PrefixLen = min(byte_size(Uuid1), byte_size(Uuid2)), - case binary:longest_common_prefix([Uuid1, Uuid2]) == PrefixLen of +gather_drop_seq_info_cb({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) -> + {ok, {Acc, Count - 1}}; +gather_drop_seq_info_cb({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) -> + {ok, {Acc, Count - 1}}; +gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) -> + MergedInfo = merge_info(Worker, Info, Acc), + if + Count == 0 -> + {stop, MergedInfo}; true -> - {Uuid1, min(Val1, Val2)}; - false -> - {Uuid2, Val2} - end. + {ok, {MergedInfo, Count - 1}} + end; +gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) -> + {ok, {Acc, Count - 1}}. + +merge_info(#shard{} = Shard, Info, Acc) -> + #{ + uuid_map => + maps:put( + {Shard#shard.range, Shard#shard.node}, maps:get(uuid, Info), maps:get(uuid_map, Acc) + ), + peer_checkpoints => maps:merge_with( + fun merge_peers/3, + maps:get(peer_checkpoints, Info), + maps:get(peer_checkpoints, Acc) + ), + shard_sync_history => maps:merge( + maps:get(shard_sync_history, Info), maps:get(shard_sync_history, Acc) + ) + }. + +merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when + is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2) +-> + true = uuids_match([Uuid1, Uuid2]), + {Uuid1, min(Val1, Val2)}. + +uuids_match(Uuids) when is_list(Uuids) -> + PrefixLen = lists:min([byte_size(Uuid) || Uuid <- Uuids]), + binary:longest_common_prefix(Uuids) == PrefixLen. decode_seq(OpaqueSeq) -> Decoded = fabric_view_changes:decode_seq(OpaqueSeq), @@ -203,16 +365,6 @@ decode_seq(OpaqueSeq) -> Decoded ). -all_docs_mrargs() -> - #mrargs{ - view_type = map, - include_docs = true, - extra = [ - {include_system, true}, - {namespace, <<"_local">>} - ] - }. 
- latest_shard_sync_checkpoints(ShardSyncHistory) -> maps:fold( fun({R, SN, _TN}, History, Acc) -> @@ -222,3 +374,541 @@ latest_shard_sync_checkpoints(ShardSyncHistory) -> #{}, ShardSyncHistory ). + +%% A shard may have been split since a peer saw it. +-spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) -> peer_checkpoints(). +substitute_splits(Shards, UuidMap, PeerCheckpoints) -> + maps:fold( + fun({[B1, E1], Node}, {Uuid, Seq}, Acc) -> + ShardsInRange = [ + S + || #shard{range = [B2, E2]} = S <- Shards, + Node == S#shard.node, + B2 >= B1 andalso E2 =< E1 + ], + %% lookup uuid from map if substituted + AsMap = maps:from_list( + lists:filtermap( + fun(#shard{} = Shard) -> + Key = {Shard#shard.range, Shard#shard.node}, + if + [B1, E1] == Shard#shard.range -> + {true, {Key, {Uuid, Seq}}}; + true -> + case maps:find(Key, UuidMap) of + {ok, SubstUuid} -> + {true, {Key, {SubstUuid, Seq}}}; + error -> + false + end + end + end, + ShardsInRange + ) + ), + maps:merge_with(fun merge_peers/3, AsMap, Acc) + end, + #{}, + PeerCheckpoints + ). 
+ +create_peer_checkpoint_doc_if_missing( + <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq +) when + is_binary(DbName), is_binary(PeerId), is_integer(UpdateSeq) +-> + create_peer_checkpoint_doc_if_missing( + DbName, Subtype, Source, PeerId, pack_seq(DbName, UpdateSeq) + ); +create_peer_checkpoint_doc_if_missing( + DbName, Subtype, Source, PeerId, UpdateSeq +) when + is_binary(DbName), + is_binary(Subtype), + is_binary(Source), + is_binary(PeerId), + is_integer(UpdateSeq) +-> + ok; +create_peer_checkpoint_doc_if_missing( + <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq +) when + is_binary(DbName), + is_binary(Subtype), + is_binary(Source), + is_binary(PeerId), + is_binary(UpdateSeq) +-> + {_, Ref} = spawn_monitor(fun() -> + case + fabric:open_doc(mem3:dbname(DbName), peer_checkpoint_id(Subtype, PeerId), [?ADMIN_CTX]) + of + {ok, _} -> + ok; + {not_found, _} -> + update_peer_checkpoint_doc(DbName, Subtype, Source, PeerId, UpdateSeq); + {error, Reason} -> + throw({checkpoint_commit_failure, Reason}) + end + end), + receive + {'DOWN', Ref, _, _, ok} -> + ok; + {'DOWN', Ref, _, _, Else} -> + Else + end. 
+ +update_peer_checkpoint_doc( + <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq +) when + is_binary(DbName), + is_binary(Subtype), + is_binary(Source), + is_binary(PeerId), + is_integer(UpdateSeq) +-> + update_peer_checkpoint_doc(DbName, Subtype, Source, PeerId, pack_seq(DbName, UpdateSeq)); +update_peer_checkpoint_doc( + DbName, Subtype, Source, PeerId, UpdateSeq +) when + is_binary(DbName), + is_binary(Subtype), + is_binary(Source), + is_binary(PeerId), + is_integer(UpdateSeq) +-> + ok; +update_peer_checkpoint_doc( + <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq +) when + is_binary(DbName), + is_binary(Subtype), + is_binary(Source), + is_binary(PeerId), + is_binary(UpdateSeq) +-> + {_, OpenRef} = spawn_monitor(fun() -> + case + fabric:open_doc(mem3:dbname(DbName), peer_checkpoint_id(Subtype, PeerId), [?ADMIN_CTX]) + of + {ok, ExistingDoc} -> + exit(ExistingDoc#doc.revs); + {not_found, _Reason} -> + exit({0, []}); + {error, Reason} -> + throw({checkpoint_fetch_failure, Reason}) + end + end), + receive + {'DOWN', OpenRef, _, _, Revs} -> + NewDoc0 = peer_checkpoint_doc(PeerId, Subtype, Source, UpdateSeq), + NewDoc1 = NewDoc0#doc{revs = Revs}, + {_, UpdateRef} = spawn_monitor(fun() -> + case fabric:update_doc(mem3:dbname(DbName), NewDoc1, [?ADMIN_CTX]) of + {ok, _} -> + couch_log:notice( + "updated peer checkpoint for db:~s, subtype:~s, source:~s, peer_id:~s, update_seq:~s", + [ + DbName, Subtype, Source, PeerId, UpdateSeq + ] + ), + ok; + {error, Reason} -> + throw({checkpoint_commit_failure, Reason}) + end + end), + receive + {'DOWN', UpdateRef, _, _, ok} -> + ok; + {'DOWN', UpdateRef, _, _, Else} -> + Else + end; + {'DOWN', OpenRef, _, _, not_found} -> + ok; + {'DOWN', OpenRef, _, _, Else} -> + Else + end. 
+ +peer_checkpoint_doc(PeerId, Subtype, Source, UpdateSeq) when + is_binary(PeerId), is_binary(Subtype), is_binary(Source), is_binary(UpdateSeq) +-> + #doc{ + id = peer_checkpoint_id(Subtype, PeerId), + body = + {[ + {<<"type">>, <<"peer-checkpoint">>}, + {<<"subtype">>, Subtype}, + {<<"source">>, Source}, + {<<"update_seq">>, UpdateSeq}, + {<<"last_updated">>, ?l2b(couch_log_util:iso8601_timestamp())} + ]} + }. + +peer_checkpoint_id(Subtype, PeerId) -> + <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", Subtype/binary, "-", PeerId/binary>>. + +peer_id_from_sig(DbName, Sig) when is_binary(DbName), is_binary(Sig) -> + Hash = couch_util:encodeBase64Url( + crypto:hash(sha256, [atom_to_binary(node()), $0, DbName]) + ), + <<Sig/binary, "$", Hash/binary>>. + +pack_seq(DbName, UpdateSeq) -> + PrefixLen = fabric_util:get_uuid_prefix_len(), + DbUuid = couch_util:with_db(DbName, fun(Db) -> couch_db:get_uuid(Db) end), + fabric_view_changes:pack_seqs( + [ + { + #shard{node = node(), range = mem3:range(DbName)}, + {UpdateSeq, binary:part(DbUuid, {0, PrefixLen}), node()} + } + ] + ). + +cleanup_peer_checkpoint_docs(DbName, SubType, KeepSigs) when + is_binary(DbName), is_binary(SubType), is_list(KeepSigs) +-> + MrArgs = #mrargs{ + view_type = map, + include_docs = true, + start_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, "-">>, + end_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, ".">>, + inclusive_end = false, + extra = [ + {include_system, true}, + {namespace, <<"_local">>} + ] + }, + {ok, {SubType, KeepSigs, DocsToDelete}} = fabric:all_docs( + DbName, fun cleanup_peer_checkpoints_cb/2, {SubType, KeepSigs, []}, MrArgs + ), + {ok, _} = fabric:update_docs(DbName, DocsToDelete, [?ADMIN_CTX]). 
+ +cleanup_peer_checkpoints_cb({row, Row}, {SubType, KeepSigs, DocsToDelete} = Acc) -> + {doc, JsonDoc} = lists:keyfind(doc, 1, Row), + Doc = couch_doc:from_json_obj(JsonDoc), + #doc{ + id = + <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType:(byte_size(SubType))/binary, "-", + SigHash/binary>> + } = Doc, + [Sig, _Hash] = binary:split(SigHash, <<"$">>), + case lists:member(Sig, KeepSigs) of + true -> + {ok, Acc}; + false -> + {ok, {SubType, KeepSigs, [Doc#doc{deleted = true, body = {[]}} | DocsToDelete]}} + end; +cleanup_peer_checkpoints_cb(_Else, Acc) -> + {ok, Acc}. + +-ifdef(TEST). +-include_lib("couch/include/couch_eunit.hrl"). + +empty_sync_history_means_no_change_test() -> + Range = [0, 10], + Node1 = '[email protected]', + PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}}, + ShardSyncHistory = #{}, + ?assertEqual(PeerCheckpoints, calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)). + +matching_sync_history_expands_result_test() -> + Range = [0, 10], + Node1 = '[email protected]', + Node2 = '[email protected]', + PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}}, + ShardSyncHistory = #{{Range, Node1, Node2} => [{<<"uuid1">>, 12, <<"uuid2">>, 5}]}, + ?assertEqual( + #{ + {Range, Node1} => {<<"uuid1">>, 12}, + {Range, Node2} => {<<"uuid2">>, 5} + }, + calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory) + ). + +transitive_sync_history_expands_result_test() -> + Range = [0, 10], + Node1 = '[email protected]', + Node2 = '[email protected]', + Node3 = '[email protected]', + PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}}, + ShardSyncHistory = #{ + {Range, Node1, Node2} => [{<<"uuid1">>, 12, <<"uuid2">>, 5}], + {Range, Node2, Node3} => [{<<"uuid2">>, 11, <<"uuid3">>, 11}] + }, + ?assertEqual( + #{ + {Range, Node1} => {<<"uuid1">>, 12}, + {Range, Node2} => {<<"uuid2">>, 5}, + {Range, Node3} => {<<"uuid3">>, 11} + }, + calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory) + ). 
+ +shard_sync_history_caps_peer_checkpoint_test() -> + Range = [0, 10], + Node1 = '[email protected]', + Node2 = '[email protected]', + PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}}, + ShardSyncHistory = #{{Range, Node1, Node2} => [{<<"uuid1">>, 10, <<"uuid2">>, 5}]}, + ?assertEqual( + #{ + {Range, Node1} => {<<"uuid1">>, 10}, + {Range, Node2} => {<<"uuid2">>, 5} + }, + calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory) + ). + +multiple_range_test() -> + Range1 = [0, 10], + Range2 = [11, 20], + Node1 = '[email protected]', + Node2 = '[email protected]', + PeerCheckpoints = #{{Range1, Node1} => {<<"r1n1">>, 12}, {Range2, Node2} => {<<"r2n2">>, 20}}, + ShardSyncHistory = #{ + {Range1, Node1, Node2} => [{<<"r1n1">>, 10, <<"r1n2">>, 5}], + {Range2, Node2, Node1} => [{<<"r2n2">>, 19, <<"r2n1">>, 17}] + }, + ?assertEqual( + #{ + {Range1, Node1} => {<<"r1n1">>, 10}, + {Range1, Node2} => {<<"r1n2">>, 5}, + {Range2, Node2} => {<<"r2n2">>, 19}, + {Range2, Node1} => {<<"r2n1">>, 17} + }, + calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory) + ). + +search_history_for_latest_safe_crossover_test() -> + Range = [0, 10], + Node1 = '[email protected]', + Node2 = '[email protected]', + PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 50}}, + ShardSyncHistory = #{ + {Range, Node1, Node2} => [ + {<<"uuid1">>, 100, <<"uuid2">>, 99}, + {<<"uuid1">>, 75, <<"uuid2">>, 76}, + {<<"uuid1">>, 50, <<"uuid2">>, 51}, + {<<"uuid1">>, 40, <<"uuid2">>, 41} + ] + }, + ?assertEqual( + #{ + {Range, Node1} => {<<"uuid1">>, 50}, + {Range, Node2} => {<<"uuid2">>, 51} + }, + calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory) + ). 
+ +fully_replicated_shards_only_test_() -> + Range1 = [0, 1], + Range2 = [1, 2], + Shards = [ + #shard{node = node1, range = Range1}, + #shard{node = node2, range = Range1}, + #shard{node = node3, range = Range1}, + #shard{node = node1, range = Range2}, + #shard{node = node2, range = Range2} + ], + [ + %% empty history means no fully replicated shards + ?_assertEqual([], fully_replicated_shards_only(Shards, #{})), + %% some but not all peers + ?_assertEqual( + [], + fully_replicated_shards_only(Shards, #{ + {Range1, node2, node1} => {0, <<>>} + }) + ), + %% all peers of one replica + ?_assertEqual( + [#shard{node = node1, range = Range1}], + fully_replicated_shards_only(Shards, #{ + {Range1, node2, node1} => {0, <<>>}, + {Range1, node3, node1} => {0, <<>>} + }) + ), + %% all peers of one range + ?_assertEqual( + [ + #shard{node = node1, range = Range1}, + #shard{node = node2, range = Range1}, + #shard{node = node3, range = Range1} + ], + fully_replicated_shards_only(Shards, #{ + {Range1, node2, node1} => {0, <<>>}, + {Range1, node3, node1} => {0, <<>>}, + {Range1, node1, node2} => {0, <<>>}, + {Range1, node3, node2} => {0, <<>>}, + {Range1, node1, node3} => {0, <<>>}, + {Range1, node2, node3} => {0, <<>>} + }) + ) + ]. + +substitute_splits_test() -> + Range = [0, 10], + Subrange1 = [0, 5], + Subrange2 = [6, 10], + Node1 = '[email protected]', + Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}], + UuidMap = #{ + {Subrange1, Node1} => <<"uuid2">>, + {Subrange2, Node1} => <<"uuid3">> + }, + PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}}, + + ?assertEqual( + #{{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => {<<"uuid3">>, 12}}, + substitute_splits(Shards, UuidMap, PeerCheckpoints) + ). 
+ +crossref_test_() -> + Range = [0, 10], + Node1 = '[email protected]', + Node2 = '[email protected]', + Node3 = '[email protected]', + [ + ?_assertEqual(#{}, crossref(#{}, #{})), + ?_assertEqual( + #{ + {Range, Node1} => {<<"n1">>, 5}, + {Range, Node2} => {<<"n2">>, 4}, + {Range, Node3} => {<<"n3">>, 3} + }, + crossref( + #{{Range, Node1} => {<<"n1">>, 5}}, + #{ + {Range, Node1, Node2} => [ + {<<"n1">>, 10, <<"n2">>, 9}, + {<<"n1">>, 5, <<"n2">>, 4}, + {<<"n1">>, 2, <<"n2">>, 1} + ], + {Range, Node1, Node3} => [ + {<<"n1">>, 9, <<"n3">>, 8}, + {<<"n1">>, 4, <<"n3">>, 3}, + {<<"n1">>, 3, <<"n3">>, 2} + ] + } + ) + ), + ?_assertEqual( + #{ + {Range, Node1} => {<<"n1">>, 5}, + {Range, Node2} => {<<"n2x">>, 4}, + {Range, Node3} => {<<"n3x">>, 3} + }, + crossref( + #{{Range, Node1} => {<<"n1">>, 5}}, + #{ + {Range, Node1, Node2} => [ + {<<"n1x">>, 10, <<"n2x">>, 9}, + {<<"n1x">>, 5, <<"n2x">>, 4}, + {<<"n1x">>, 2, <<"n2x">>, 1} + ], + {Range, Node1, Node3} => [ + {<<"n1x">>, 9, <<"n3x">>, 8}, + {<<"n1x">>, 4, <<"n3x">>, 3}, + {<<"n1x">>, 3, <<"n3x">>, 2} + ] + } + ) + ) + ]. + +go_int_test_() -> + Range = [0, 10], + Subrange1 = [0, 5], + Subrange2 = [6, 10], + Node1 = '[email protected]', + Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}], + UuidMap = #{ + {Subrange1, Node1} => <<"uuid2">>, + {Subrange2, Node1} => <<"uuid3">> + }, + [ + ?_assertEqual( + {Shards, #{ + {Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => {<<"uuid3">>, 12} + }}, + go_int(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}}, #{}) + ), + ?_assertEqual( + {Shards, #{ + {Subrange1, Node1} => {<<"uuid2">>, 10}, {Subrange2, Node1} => {<<"uuid3">>, 12} + }}, + go_int( + Shards, + UuidMap, + #{{Range, Node1} => {<<"uuid1">>, 12}, {Subrange1, Node1} => {<<"uuid2">>, 10}}, + #{} + ) + ) + ]. 
+ +go_int2_test_() -> + Range = [0, 10], + Subrange1 = [0, 5], + Subrange2 = [6, 10], + Node1 = '[email protected]', + Node2 = '[email protected]', + Shards = [ + #shard{range = Subrange1, node = Node1}, + #shard{range = Subrange2, node = Node1}, + #shard{range = Subrange1, node = Node2}, + #shard{range = Subrange2, node = Node2} + ], + UuidMap = #{ + {Subrange1, Node1} => <<"s1n1">>, + {Subrange2, Node1} => <<"s2n1">>, + {Subrange1, Node2} => <<"s1n2">>, + {Subrange2, Node2} => <<"s2n2">> + }, + ShardSyncHistory = + #{ + {Subrange1, Node1, Node2} => [ + {<<"s1n1">>, 100, <<"s1n2">>, 99}, + {<<"s1n1">>, 75, <<"s1n2">>, 76}, + {<<"s1n1">>, 50, <<"s1n2">>, 51}, + {<<"s1n1">>, 12, <<"s1n2">>, 11} + ], + {Subrange2, Node1, Node2} => [ + {<<"s2n1">>, 100, <<"s2n2">>, 99}, + {<<"s2n1">>, 75, <<"s2n2">>, 76}, + {<<"s2n1">>, 50, <<"s2n2">>, 51}, + {<<"s2n1">>, 12, <<"s2n2">>, 11} + ], + {Subrange1, Node2, Node1} => [ + {<<"s1n2">>, 100, <<"s1n1">>, 99}, + {<<"s1n2">>, 75, <<"s1n1">>, 76}, + {<<"s1n2">>, 50, <<"s1n1">>, 51}, + {<<"s1n2">>, 12, <<"s1n1">>, 11} + ], + {Subrange2, Node2, Node1} => [ + {<<"s2n2">>, 100, <<"s2n1">>, 99}, + {<<"s2n2">>, 75, <<"s2n1">>, 76}, + {<<"s2n2">>, 50, <<"s2n1">>, 51}, + {<<"s2n2">>, 12, <<"s2n1">>, 11} + ] + }, + [ + ?_assertEqual( + #{ + {Subrange1, Node1} => {<<"s1n1">>, 12}, + {Subrange2, Node1} => {<<"s2n1">>, 12}, + {Subrange1, Node2} => {<<"s1n2">>, 11}, + {Subrange2, Node2} => {<<"s2n2">>, 11} + }, + element( + 2, + go_int( + Shards, + UuidMap, + #{{Range, Node1} => {<<"ignored">>, 12}}, + ShardSyncHistory + ) + ) + ) + ]. + +-endif. 
diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl index 41e225d22..5881c9f12 100644 --- a/src/mem3/src/mem3_reshard_index.erl +++ b/src/mem3/src/mem3_reshard_index.erl @@ -130,7 +130,13 @@ build_index({?MRVIEW, DbName, MRSt} = Ctx, Try) -> Try ); build_index({?NOUVEAU, _DbName, DIndex} = Ctx, Try) -> - UpdateFun = fun() -> nouveau_index_updater:update(DIndex) end, + UpdateFun = fun() -> + {IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, update, [DIndex]), + receive + {'DOWN', IndexerRef, process, IndexerPid, Reason} -> + Reason + end + end, retry_loop(Ctx, UpdateFun, Try); build_index({?DREYFUS, DbName, DIndex} = Ctx, Try) -> await_retry( @@ -197,6 +203,8 @@ index_info({?MRVIEW, DbName, MRSt}) -> {DbName, GroupName}; index_info({?DREYFUS, DbName, Index}) -> {DbName, Index}; +index_info({?NOUVEAU, DbName, Index}) -> + {DbName, Index}; index_info({?HASTINGS, DbName, Index}) -> {DbName, Index}. diff --git a/src/nouveau/src/nouveau_fabric_cleanup.erl b/src/nouveau/src/nouveau_fabric_cleanup.erl index cd4128fb1..f010eef20 100644 --- a/src/nouveau/src/nouveau_fabric_cleanup.erl +++ b/src/nouveau/src/nouveau_fabric_cleanup.erl @@ -31,6 +31,7 @@ go(DbName) -> ) ), Shards = mem3:shards(DbName), + fabric_drop_seq:cleanup_peer_checkpoint_docs(DbName, <<"nouveau">>, ActiveSigs), lists:foreach( fun(Shard) -> rexi:cast(Shard#shard.node, {nouveau_rpc, cleanup, [Shard#shard.name, ActiveSigs]}) diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index efed245db..6ba3c3738 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -87,6 +87,13 @@ update(#index{} = Index) -> {ok, PurgeAcc1} = purge_index(ConnPid, Db, Index, PurgeAcc0), NewCurSeq = couch_db:get_update_seq(Db), + fabric_drop_seq:create_peer_checkpoint_doc_if_missing( + Index#index.dbname, + <<"nouveau">>, + <<(Index#index.ddoc_id)/binary, "/", (Index#index.name)/binary>>, + 
fabric_drop_seq:peer_id_from_sig(Index#index.dbname, Index#index.sig), + NewCurSeq + ), Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -107,6 +114,16 @@ update(#index{} = Index) -> Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), nouveau_api:drain_async_responses(Acc1#acc.reqids, 0), + {ok, #{<<"committed_update_seq">> := CommittedUpdateSeq}} = nouveau_api:index_info( + Index + ), + fabric_drop_seq:update_peer_checkpoint_doc( + Index#index.dbname, + <<"nouveau">>, + <<(Index#index.ddoc_id)/binary, "/", (Index#index.name)/binary>>, + fabric_drop_seq:peer_id_from_sig(Index#index.dbname, Index#index.sig), + CommittedUpdateSeq + ), exit(nouveau_api:set_update_seq(ConnPid, Index, Acc1#acc.update_seq, NewCurSeq)) after ibrowse:stop_worker_process(ConnPid), diff --git a/test/elixir/lib/utils.ex b/test/elixir/lib/utils.ex index 3ecf878e7..15e321240 100644 --- a/test/elixir/lib/utils.ex +++ b/test/elixir/lib/utils.ex @@ -58,4 +58,4 @@ defmodule Couch.Test.Utils do end) end -end \ No newline at end of file +end diff --git a/test/elixir/test/config/nouveau.elixir b/test/elixir/test/config/nouveau.elixir index 02157e2ca..14c22b24f 100644 --- a/test/elixir/test/config/nouveau.elixir +++ b/test/elixir/test/config/nouveau.elixir @@ -1,4 +1,7 @@ %{ + "DropSeqTest": [ + "peer checkpoint - nouveau" + ], "NouveauTest": [ "user-agent header is forbidden", "search analyze", diff --git a/test/elixir/test/config/search.elixir b/test/elixir/test/config/search.elixir index b49b8d762..438a8bd2c 100644 --- a/test/elixir/test/config/search.elixir +++ b/test/elixir/test/config/search.elixir @@ -1,4 +1,7 @@ %{ + "DropSeqTest": [ + "peer checkpoint - search" + ], "PartitionSearchTest": [ "Cannot do global query with partition view", "Cannot do partition query with global search ddoc", diff --git a/test/elixir/test/config/suite.elixir b/test/elixir/test/config/suite.elixir index 1d1e6059a..f283096c2 100644 --- 
a/test/elixir/test/config/suite.elixir +++ b/test/elixir/test/config/suite.elixir @@ -211,6 +211,11 @@ "design doc path", "design doc path with slash in db name" ], + "DropSeqTest": [ + "peer checkpoint - mrview", + "peer checkpoint - custom", + "peer checkpoint - shard split" + ], "ErlangViewsTest": [ "Erlang map function", "Erlang reduce function", diff --git a/test/elixir/test/drop_seq_statem_test.exs b/test/elixir/test/drop_seq_statem_test.exs new file mode 100644 index 000000000..1e509df0b --- /dev/null +++ b/test/elixir/test/drop_seq_statem_test.exs @@ -0,0 +1,415 @@ +defmodule DropSeqStateM do + use PropCheck, default_opts: &PropCheck.TestHelpers.config/0 + use PropCheck.StateM + use CouchTestCase + + # alias Couch.Test.Utils + # import Utils + + @moduletag capture_log: true + + # expected to pass in all three cluster scenarios + @moduletag :with_cluster + @moduletag :without_quorum_test + @moduletag :with_quorum_test + + property "_update_drop_seq works correctly", start_size: 5, max_size: 100, numtests: 10000 do + forall cmds <- commands(__MODULE__) do + trap_exit do + db_name = random_db_name() + n = Enum.random(1..3) + q = Enum.random(1..10) + {:ok, _} = create_db(db_name, query: %{n: n, q: q}) + r = run_commands(__MODULE__, cmds, [{:dbname, db_name}]) + {history, state, result} = r + delete_db(db_name) + + (result == :ok) + |> when_fail( + IO.puts(""" + n: #{n}, q: #{q} + Commands: #{inspect(cmds, pretty: true)} + History: #{inspect(history, pretty: true)} + State: #{inspect(state, pretty: true)} + Result: #{inspect(result, pretty: true)} + """) + ) + end + end + end + + require Record + + Record.defrecord(:state, + docs: [], + deleted_docs: [], + current_seq: 0, + peer_checkpoint_seq: nil, + drop_seq: nil, + drop_count: 0, + changed: false, + stale: false + ) + + def initial_state() do + state() + end + + @max_docids 20 + @docids 1..@max_docids |> Enum.map(&"doc-#{&1}") + + def doc_id, do: oneof(@docids) + + def index_type do + oneof([:mrview, 
:nouveau]) + end + + def command(s) do + cond do + state(s, :stale) -> + {:call, __MODULE__, :update_indexes, [{:var, :dbname}]} + + state(s, :changed) -> + {:call, __MODULE__, :changes, [{:var, :dbname}]} + + true -> + oneof([ + {:call, __MODULE__, :update_document, [{:var, :dbname}, doc_id()]}, + {:call, __MODULE__, :delete_document, [{:var, :dbname}, doc_id()]}, + {:call, __MODULE__, :update_peer_checkpoint, [{:var, :dbname}]}, + {:call, __MODULE__, :update_drop_seq, [{:var, :dbname}]}, + {:call, __MODULE__, :compact_db, [{:var, :dbname}]}, + {:call, __MODULE__, :split_shard, [{:var, :dbname}]}, + {:call, __MODULE__, :create_index, [{:var, :dbname}, index_type()]} + ]) + end + end + + def get_document(db_name, doc_id) do + resp = Couch.get("/#{db_name}/#{doc_id}") + + case resp.status_code do + 200 -> + {:ok, resp.body} + + 404 -> + {:not_found, resp.body["reason"]} + end + end + + def update_document(db_name, doc_id) do + case get_document(db_name, doc_id) do + {:ok, doc} -> + resp = Couch.put("/#{db_name}/#{doc_id}", body: doc) + + assert resp.status_code == 201, + "Couch.put failed #{resp.status_code} #{inspect(resp.body)}" + + {:not_found, _} -> + resp = Couch.put("/#{db_name}/#{doc_id}", body: %{}) + + assert resp.status_code == 201, + "Couch.put failed #{resp.status_code} #{inspect(resp.body)}" + end + + sync_shards(db_name) + end + + def delete_document(db_name, doc_id) do + case get_document(db_name, doc_id) do + {:ok, doc} -> + rev = doc["_rev"] + resp = Couch.delete("/#{db_name}/#{doc_id}?rev=#{rev}") + + assert resp.status_code == 200, + "Couch.delete failed #{resp.status_code} #{inspect(resp.body)}" + + {:not_found, _} -> + :ok + end + + sync_shards(db_name) + end + + def update_peer_checkpoint(db_name) do + resp = Couch.get("/#{db_name}") + + assert resp.status_code == 200, + "Couch.get failed #{resp.status_code} #{inspect(resp.body)}" + + update_seq = resp.body["update_seq"] + + resp = + Couch.put("/#{db_name}/_local/peer-checkpoint-foo", + body: 
%{ + update_seq: update_seq + } + ) + + assert resp.status_code == 201, + "update_peer_checkpoint failed #{resp.status_code} #{inspect(resp.body)}" + + seq_to_shards(update_seq) + end + + def update_drop_seq(db_name) do + resp = Couch.post("/#{db_name}/_update_drop_seq") + + assert resp.status_code == 201, + "update_drop_seq failed #{resp.status_code} #{inspect(resp.body)}" + + resp.body + end + + def compact_db(db_name) do + compact(db_name) + # try to avoid seeing pre-compact state of shards immediately after + # compactor pids exit + :timer.sleep(1000) + end + + def changes(db_name) do + resp = Couch.get("/#{db_name}/_changes") + assert resp.status_code == 200 + + List.foldl(resp.body["results"], {[], []}, fn change, {doc_ids, del_doc_ids} -> + if change["deleted"] do + {doc_ids, Enum.sort([change["id"] | del_doc_ids])} + else + {Enum.sort([change["id"] | doc_ids]), del_doc_ids} + end + end) + end + + def split_shard(db_name) do + resp = Couch.get("/#{db_name}/_shards") + assert resp.status_code == 200 + range = Enum.random(Map.keys(resp.body["shards"])) + + resp = + Couch.post("/_reshard/jobs", + body: %{ + type: "split", + db: db_name, + range: range + } + ) + + assert resp.status_code == 201, + "split_shard failed #{resp.status_code} #{inspect(resp.body)}" + + retry_until( + fn -> + resp = Couch.get("/_reshard/jobs") + assert resp.status_code == 200 + + Enum.all?(resp.body["jobs"], fn job -> + if job["job_state"] == "completed" do + resp = Couch.delete("/_reshard/jobs/#{job["id"]}") + assert resp.status_code == 200 + end + + job["job_state"] == "completed" + end) + end, + 200, + 10_000 + ) + end + + def create_index(db_name, index_type) do + num = Enum.random(1..1_000_000) + ddoc_id = "_design/#{index_type}-#{num}" + + case get_document(db_name, ddoc_id) do + {:ok, _doc} -> + create_index(db_name, index_type) + + {:not_found, _} -> + :ok + end + + doc = + case index_type do + :mrview -> + %{ + views: %{ + bar: %{ + map: """ + function(doc) { + emit(#{num}); + 
} + """ + } + } + } + + :nouveau -> + %{ + nouveau: %{ + bar: %{ + index: """ + function(doc) { + index("double", "foo", #{num}); + } + """ + } + } + } + end + + resp = Couch.put("/#{db_name}/#{ddoc_id}", body: doc) + + assert resp.status_code == 201, + "create_index failed #{resp.status_code} #{inspect(resp.body)}" + + ddoc_id + end + + def update_indexes(db_name) do + resp = Couch.get("/#{db_name}/_design_docs") + assert resp.status_code == 200 + + Enum.each(resp.body["rows"], fn row -> + case row["id"] do + "_design/mrview-" <> _ -> + resp = Couch.get("/#{db_name}/#{row["id"]}/_view/bar") + + assert resp.status_code == 200, + "query mrview failed #{resp.status_code} #{inspect(resp.body)}" + + "_design/nouveau-" <> _ -> + resp = Couch.get("/#{db_name}/#{row["id"]}/_nouveau/bar?q=*:*") + + assert resp.status_code == 200, + "query nouveau failed #{resp.status_code} #{inspect(resp.body)}" + end + end) + end + + def sync_shards(db_name) do + resp = Couch.post("/#{db_name}/_sync_shards") + + assert resp.status_code == 202, + "sync_shards failed #{resp.status_code} #{inspect(resp.body)}" + + :timer.sleep(1000) + end + + def precondition(s, {:call, _, :update_document, [_db_name, doc_id]}) do + not doc_exists(s, doc_id) + end + + def precondition(s, {:call, _, :delete_document, [_db_name, doc_id]}) do + doc_exists(s, doc_id) + end + + def precondition(_, _) do + true + end + + def next_state(s, _v, {:call, _, :update_document, [_db_name, doc_id]}) do + state(s, + current_seq: state(s, :current_seq) + 1, + docs: Enum.sort([doc_id | state(s, :docs)]), + deleted_docs: List.keydelete(state(s, :deleted_docs), doc_id, 0), + changed: true, + stale: true + ) + end + + def next_state(s, _v, {:call, _, :delete_document, [_db_name, doc_id]}) do + state(s, + current_seq: state(s, :current_seq) + 1, + docs: List.delete(state(s, :docs), doc_id), + deleted_docs: + Enum.sort([{doc_id, state(s, :current_seq) + 1} | state(s, :deleted_docs)]), + changed: true, + stale: true + ) + end + + 
def next_state(s, _v, {:call, _, :update_peer_checkpoint, [_db_name]}) do
  # A peer checkpoint pins the current sequence.
  state(s, peer_checkpoint_seq: state(s, :current_seq), changed: true)
end

def next_state(s, _v, {:call, _, :update_drop_seq, [_db_name]}) do
  # With no peer checkpoint docs the only peers are the shard syncs,
  # which update automatically, so _update_drop_seq may drop every
  # tombstone (drop_seq == current_seq). Otherwise the drop seq is
  # bounded by the peer checkpoint.
  # N.B.: indexes and their peer checkpoints are always fresh here
  # because update_indexes is forced after every doc update.
  drop_seq =
    case state(s, :peer_checkpoint_seq) do
      nil -> state(s, :current_seq)
      seq -> seq
    end

  state(s, drop_seq: drop_seq, changed: true)
end

def next_state(s, _v, {:call, _, :compact_db, [_db_name]}) do
  # Compaction removes tombstones at or below drop_seq; the rest are
  # retained. With no drop_seq nothing is dropped.
  {retained, dropped} =
    Enum.split_with(state(s, :deleted_docs), fn {_, seq} ->
      state(s, :drop_seq) == nil or seq > state(s, :drop_seq)
    end)

  state(s,
    deleted_docs: retained,
    drop_count: state(s, :drop_count) + length(dropped),
    changed: true
  )
end

def next_state(s, _v, {:call, _, :changes, [_db_name]}) do
  # Reading _changes clears the changed flag.
  state(s, changed: false)
end

def next_state(s, _v, {:call, _, :split_shard, [_db_name]}) do
  state(s, changed: true)
end

def next_state(s, v, {:call, _, :create_index, [_db_name, _index_type]}) do
  # `v` is the (symbolic) ddoc id returned by create_index; the ddoc is
  # a regular document, so the sequence advances and it becomes live.
  state(s,
    current_seq: state(s, :current_seq) + 1,
    docs: Enum.sort([v | state(s, :docs)]),
    changed: true,
    stale: true
  )
end

def next_state(s, _v, {:call, _, :update_indexes, [_db_name]}) do
  state(s, stale: false)
end

# _changes must report exactly the live and deleted doc ids the model
# predicts; every other command has no checked result.
def postcondition(s, {:call, _, :changes, [_db_name]}, {doc_ids, del_doc_ids}) do
  doc_ids == doc_ids(s) and del_doc_ids == deleted_doc_ids(s)
end

def postcondition(_, _, _), do: true

# True when doc_id is currently live in the model.
def doc_exists(s, doc_id), do: doc_id in state(s, :docs)

# True when a tombstone for doc_id is recorded (keyed on element 0 of
# the {doc_id, seq} tuples).
def deleted_doc_exists(s, doc_id) do
  List.keymember?(state(s, :deleted_docs), doc_id, 0)
end

def doc_ids(s), do: state(s, :docs)

def deleted_doc_ids(s) do
  Enum.map(state(s,
:deleted_docs), fn {doc_id, _} -> doc_id end)
end

# Translate a decoded update sequence into {node, "begin-end" range
# string, per-shard seq} triples, hex-encoding the 32-bit range bounds
# the same way couch_util does.
def seq_to_shards(seq) do
  for {node, [range_begin, range_end], {seq_num, _uuid, _epoch}} <- decode_seq(seq) do
    begin_hex = :couch_util.to_hex(<<range_begin::32-integer>>)
    end_hex = :couch_util.to_hex(<<range_end::32-integer>>)
    {node, "#{begin_hex}-#{end_hex}", seq_num}
  end
end

# Strip the leading "N-" prefix from the packed sequence, then decode
# the base64url payload back into an Erlang term.
# NOTE(review): binary_to_term/1 without [:safe] — acceptable here since
# the input comes from our own CouchDB under test, not untrusted data.
def decode_seq(seq) do
  packed = String.replace(seq, ~r/\d+-/, "", global: false)
  :erlang.binary_to_term(Base.url_decode64!(packed, padding: false))
end
end
diff --git a/test/elixir/test/drop_seq_test.exs b/test/elixir/test/drop_seq_test.exs
new file mode 100644
index 000000000..28b29ef0c
--- /dev/null
+++ b/test/elixir/test/drop_seq_test.exs
@@ -0,0 +1,248 @@
defmodule DropSeqTest do
  use CouchTestCase

  @moduletag :drop_seq

  @tag :with_db
  test "peer checkpoint - mrview", context do
    db_name = context[:db_name]
    ddoc_id = "_design/foo"

    # The mrview checkpoint comes into existence when the ddoc is created…
    create_checkpoint_fn = fn ->
      resp =
        Couch.put("/#{db_name}/#{ddoc_id}",
          body: %{views: %{bar: %{map: "function(doc) {}"}}}
        )

      assert resp.status_code == 201
    end

    # …and advances whenever the view is queried (stale index refresh).
    update_checkpoint_fn = fn ->
      assert Couch.get("/#{db_name}/#{ddoc_id}/_view/bar").status_code == 200
    end

    checkpoint_test_helper(context[:db_name], create_checkpoint_fn, update_checkpoint_fn)
  end

  @tag :with_db
  test "peer checkpoint - search", context do
    db_name = context[:db_name]
    ddoc_id = "_design/foo"

    # Same shape as the mrview test, but driving a dreyfus search index.
    create_checkpoint_fn = fn ->
      resp =
        Couch.put("/#{db_name}/#{ddoc_id}",
          body: %{indexes: %{bar: %{index: "function(doc) {}"}}}
        )

      assert resp.status_code == 201
    end

    update_checkpoint_fn = fn ->
      assert Couch.get("/#{db_name}/#{ddoc_id}/_search/bar?q=*:*").status_code == 200
    end

    checkpoint_test_helper(context[:db_name], create_checkpoint_fn, update_checkpoint_fn)
  end

  @tag :with_db
  test "peer checkpoint - nouveau", context do
    db_name = context[:db_name]
    ddoc_id = "_design/foo"

    create_checkpoint_fn = fn ->
      resp =
        Couch.put("/#{db_name}/#{ddoc_id}",
body: %{
          nouveau: %{bar: %{index: "function(doc) {}"}}
        })

      assert resp.status_code == 201
    end

    update_checkpoint_fn = fn ->
      # Need to add a new document to force the nouveau update to run
      # each time, as we only advance the peer checkpoint when the
      # JVM-side nouveau has committed the index.
      assert Couch.post("/#{db_name}", body: %{}).status_code == 201
      assert Couch.get("/#{db_name}/#{ddoc_id}/_nouveau/bar?q=*:*").status_code == 200
    end

    checkpoint_test_helper(context[:db_name], create_checkpoint_fn, update_checkpoint_fn)
  end

  @tag :with_db
  test "peer checkpoint - custom", context do
    db_name = context[:db_name]
    peer_checkpoint_id = "_local/peer-checkpoint-foo"

    # A user-managed checkpoint: write the db's current update_seq into
    # a _local/peer-checkpoint-* doc. Creation and update are the same
    # operation here, so the same closure serves both roles.
    update_checkpoint_fn = fn ->
      resp = Couch.get("/#{db_name}")
      assert resp.status_code == 200
      update_seq = resp.body["update_seq"]

      resp =
        Couch.put("/#{db_name}/#{peer_checkpoint_id}", body: %{update_seq: update_seq})

      assert resp.status_code == 201
    end

    checkpoint_test_helper(context[:db_name], update_checkpoint_fn, update_checkpoint_fn)
  end

  @tag :with_db
  test "peer checkpoint - shard split", context do
    db_name = context[:db_name]
    peer_checkpoint_id = "_local/peer-checkpoint-foo"

    create_checkpoint_fn = fn ->
      resp = Couch.get("/#{db_name}")
      assert resp.status_code == 200
      update_seq = resp.body["update_seq"]

      resp =
        Couch.put("/#{db_name}/#{peer_checkpoint_id}", body: %{update_seq: update_seq})

      assert resp.status_code == 201

      # Reuse the module helper instead of duplicating the
      # GET /_shards + POST /_reshard/jobs loop inline (same requests,
      # same assertions).
      split_all_shard_ranges(db_name)
      wait_for_reshard_jobs_to_complete()
    end

    # Split again after the doc deletion so the drop seq must survive a
    # reshard that happens mid-test.
    after_doc_deletion_fn = fn ->
      split_all_shard_ranges(db_name)
      wait_for_reshard_jobs_to_complete()
    end

    update_checkpoint_fn = fn ->
      resp =
Couch.get("/#{db_name}")
      assert resp.status_code == 200
      update_seq = resp.body["update_seq"]

      resp =
        Couch.put("/#{db_name}/#{peer_checkpoint_id}", body: %{update_seq: update_seq})

      assert resp.status_code == 201
    end

    checkpoint_test_helper(
      context[:db_name],
      create_checkpoint_fn,
      update_checkpoint_fn,
      after_doc_deletion_fn
    )
  end

  # Three-arg variant: no extra work after the doc deletion.
  defp checkpoint_test_helper(db_name, create_checkpoint_fn, update_checkpoint_fn) do
    checkpoint_test_helper(db_name, create_checkpoint_fn, update_checkpoint_fn, fn -> true end)
  end

  # Shared scenario: create a checkpoint, create+delete a doc, wait for
  # the drop seq to move past the deletion, then verify the tombstone is
  # visible in _changes before compaction and gone (with drop_count
  # incremented by one) after it.
  defp checkpoint_test_helper(
         db_name,
         create_checkpoint_fn,
         update_checkpoint_fn,
         after_doc_deletion_fn
       ) do
    doc_id = "testdoc"

    drop_count = get_drop_count(db_name)
    drop_seq = update_drop_seq(db_name)

    # Create something that will create a peer checkpoint; the drop
    # count must not move yet.
    create_checkpoint_fn.()
    assert get_drop_count(db_name) == drop_count

    # Create a document…
    resp = Couch.put("/#{db_name}/#{doc_id}", body: %{})
    assert resp.status_code == 201
    rev = resp.body["rev"]

    # …then delete it, leaving a tombstone.
    resp = Couch.delete("/#{db_name}/#{doc_id}?rev=#{rev}")
    assert resp.status_code == 200

    after_doc_deletion_fn.()

    # Wait for the drop seq to change; deletion alone must not bump the
    # drop count.
    wait_for_drop_seq_change(db_name, drop_seq, update_checkpoint_fn)
    assert get_drop_count(db_name) == drop_count

    # Before compaction the deleted doc still appears in _changes.
    resp = Couch.get("/#{db_name}/_changes")
    assert resp.status_code == 200
    assert Enum.member?(get_ids(resp), doc_id)

    compact(db_name)

    # After compaction the tombstone is dropped and counted.
    resp = Couch.get("/#{db_name}/_changes")
    assert resp.status_code == 200
    assert !Enum.member?(get_ids(resp), doc_id)
    assert get_drop_count(db_name) == drop_count + 1
  end

  # Poll (every 200ms, up to 10s) until _update_drop_seq reports a value
  # different from previous_drop_seq, refreshing the checkpoint before
  # each poll.
  defp wait_for_drop_seq_change(db_name, previous_drop_seq, update_checkpoint_fn) do
    retry_until(
      fn ->
        update_checkpoint_fn.()
        update_drop_seq(db_name) != previous_drop_seq
      end,
      200,
      10_000
    )
  end
+ defp split_all_shard_ranges(db_name) do + resp = Couch.get("/#{db_name}/_shards") + assert resp.status_code == 200 + ranges = Map.keys(resp.body["shards"]) + Enum.each(ranges, fn r -> + resp = Couch.post("/_reshard/jobs", body: %{ + type: "split", + db: db_name, + range: r + }) + assert resp.status_code == 201 + end) + end + + defp wait_for_reshard_jobs_to_complete() do + retry_until( + fn -> + resp = Couch.get("/_reshard/jobs") + assert resp.status_code == 200 + Enum.all?(resp.body["jobs"], fn job -> + job["job_state"] == "completed" + end) + end, + 200, + 10_000 + ) + end + + defp update_drop_seq(db_name) do + resp = Couch.post("/#{db_name}/_update_drop_seq") + assert resp.status_code == 201 + resp.body["results"] + end + + defp get_drop_count(db_name) do + resp = Couch.get("/#{db_name}") + assert resp.status_code == 200 + resp.body["drop_count"] + end + + defp get_ids(resp) do + %{:body => %{"results" => results}} = resp + Enum.map(results, fn result -> result["id"] end) + end + +end
