This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 363340e6e7c08b8229cd66b1bfe9403c18586ce5
Author: Robert Newson <[email protected]>
AuthorDate: Thu Mar 20 17:08:55 2025 +0000

    Implement _update_drop_seq
---
 .gitignore                                         |   2 +
 Makefile                                           |   8 +-
 configure                                          |   1 +
 configure.ps1                                      |   1 +
 dev/run                                            |   1 +
 erlang_ls.config                                   |   5 +
 mix.exs                                            |   5 +-
 .../nouveau/NouveauApplicationConfiguration.java   |   2 +-
 .../org/apache/couchdb/nouveau/api/IndexInfo.java  |  13 +-
 .../org/apache/couchdb/nouveau/core/Index.java     |  12 +-
 rel/nouveau.yaml                                   |   2 +-
 src/chttpd/src/chttpd_db.erl                       |  16 +-
 src/chttpd/src/chttpd_httpd_handlers.erl           |   1 +
 src/config/src/config.erl                          |   6 +-
 src/couch/src/couch_bt_engine.erl                  |  18 +-
 src/couch/src/couch_bt_engine_compactor.erl        |   3 +-
 src/couch/src/couch_bt_engine_header.erl           |  12 +-
 src/couch/src/couch_db.erl                         |   6 +
 src/couch/src/couch_db_engine.erl                  |  18 +-
 src/couch_mrview/src/couch_mrview_cleanup.erl      |   6 +-
 src/couch_mrview/src/couch_mrview_index.erl        |  26 +-
 .../src/couch_replicator_scheduler_job.erl         |  53 +-
 src/dreyfus/src/dreyfus_fabric_cleanup.erl         |   1 +
 src/dreyfus/src/dreyfus_index.erl                  |   7 +
 src/dreyfus/src/dreyfus_index_updater.erl          |  13 +-
 src/fabric/src/fabric.erl                          |  15 +-
 src/fabric/src/fabric_db_info.erl                  |   2 +
 src/fabric/src/fabric_drop_seq.erl                 | 860 +++++++++++++++++++--
 src/mem3/src/mem3_reshard_index.erl                |  10 +-
 src/nouveau/src/nouveau_fabric_cleanup.erl         |   1 +
 src/nouveau/src/nouveau_index_updater.erl          |  17 +
 test/elixir/lib/utils.ex                           |   2 +-
 test/elixir/test/config/nouveau.elixir             |   3 +
 test/elixir/test/config/search.elixir              |   3 +
 test/elixir/test/config/suite.elixir               |   5 +
 test/elixir/test/drop_seq_statem_test.exs          | 415 ++++++++++
 test/elixir/test/drop_seq_test.exs                 | 248 ++++++
 37 files changed, 1691 insertions(+), 128 deletions(-)

diff --git a/.gitignore b/.gitignore
index 080a7dd6f..4d4918a73 100644
--- a/.gitignore
+++ b/.gitignore
@@ -78,6 +78,8 @@ src/unicode_util_compat/
 src/file_system/
 src/rebar3/
 src/erlfmt/
+src/libgraph/
+src/propcheck/
 src/*.erl
 tmp/
 
diff --git a/Makefile b/Makefile
index 8b1df51e6..1acd42048 100644
--- a/Makefile
+++ b/Makefile
@@ -254,6 +254,12 @@ elixir-cluster-with-quorum: elixir-init devclean
                --degrade-cluster 1 \
                --no-eval 'mix test --trace --only with_quorum_test 
$(EXUNIT_OPTS)'
 
+.PHONY: elixir-cluster
+elixir-cluster: export MIX_ENV=integration
+elixir-cluster: elixir-init devclean
+       @dev/run -n 3 -q -a adm:pass --with-nouveau \
+               --no-eval 'mix test --trace --only with_cluster $(EXUNIT_OPTS)'
+
 .PHONY: elixir
 # target: elixir - Run Elixir-based integration tests
 elixir: export MIX_ENV=integration
@@ -575,5 +581,5 @@ nouveau-test-elixir: couch nouveau
 ifeq ($(with_nouveau), true)
        @dev/run "$(TEST_OPTS)" -n 1 -q -a adm:pass --with-nouveau \
                --locald-config test/config/test-config.ini \
-               --no-eval 'mix test --trace --include 
test/elixir/test/config/nouveau.elixir'
+               --no-eval 'mix test --trace --include 
test/elixir/test/config/nouveau.elixir $(EXUNIT_OPTS)'
 endif
diff --git a/configure b/configure
index 70350a07b..80b8921c3 100755
--- a/configure
+++ b/configure
@@ -373,6 +373,7 @@ cat > rel/couchdb.config << EOF
 {data_dir, "./data"}.
 {view_index_dir, "./data"}.
 {nouveau_index_dir, "./data/nouveau"}.
+{nouveau_commit_interval, 30}.
 {nouveau_url, "http://127.0.0.1:5987"}.
 {nouveau_port, 5987}.
 {nouveau_admin_port, 5988}.
diff --git a/configure.ps1 b/configure.ps1
index faf3669ec..ca0c417dd 100644
--- a/configure.ps1
+++ b/configure.ps1
@@ -194,6 +194,7 @@ $CouchDBConfig = @"
 {data_dir, "./data"}.
 {view_index_dir, "./data"}.
 {nouveau_index_dir, "./data/nouveau"}.
+{nouveau_commit_interval, 30}.
 {nouveau_url, "http://127.0.0.1:5987"}.
 {nouveau_port, 5987}.
 {nouveau_admin_port, 5988}.
diff --git a/dev/run b/dev/run
index 85ea9622d..fc070a035 100755
--- a/dev/run
+++ b/dev/run
@@ -424,6 +424,7 @@ def generate_nouveau_config(ctx):
 
     config = {
         "nouveau_index_dir": os.path.join(ctx["devdir"], "lib", "nouveau"),
+        "nouveau_commit_interval": 5,
         "nouveau_port": 5987,
         "nouveau_admin_port": 5988,
     }
diff --git a/erlang_ls.config b/erlang_ls.config
index 94483cfec..3464e6fdd 100644
--- a/erlang_ls.config
+++ b/erlang_ls.config
@@ -3,3 +3,8 @@ apps_dirs:
 include_dirs:
     - "src"
     - "src/*/include"
+macros:
+  - name: WITH_PROPER
+    value: true
+  - name: TEST
+    value: true
diff --git a/mix.exs b/mix.exs
index 4f0cb4ba3..86fff4227 100644
--- a/mix.exs
+++ b/mix.exs
@@ -71,7 +71,7 @@ defmodule CouchDBTest.Mixfile do
   end
 
   # Run "mix help compile.app" to learn about applications.
-  def application, do: [applications: [:logger, :httpotion]]
+  def application, do: [applications: [:logger, :httpotion, :propcheck]]
 
   # Specifies which paths to compile per environment.
   defp elixirc_paths(:test), do: ["test/elixir/lib", 
"test/elixir/test/support"]
@@ -85,7 +85,8 @@ defmodule CouchDBTest.Mixfile do
       {:httpotion, ">= 3.2.0", only: [:dev, :test, :integration], runtime: 
false},
       {:excoveralls, "~> 0.18.3", only: :test},
       {:ibrowse, path: path("ibrowse"), override: true},
-      {:credo, "~> 1.7.11", only: [:dev, :test, :integration], runtime: false}
+      {:credo, "~> 1.7.11", only: [:dev, :test, :integration], runtime: false},
+      {:propcheck, ">= 1.5.0", only: [:dev, :test, :integration]},
     ]
 
     extra_deps = [:b64url, :jiffy, :jwtf, :meck, :mochiweb]
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java
 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java
index dce6fe6da..67eed38af 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplicationConfiguration.java
@@ -24,7 +24,7 @@ public class NouveauApplicationConfiguration extends 
Configuration {
     @Min(10)
     private int maxIndexesOpen = 10;
 
-    @Min(10)
+    @Min(5)
     private int commitIntervalSeconds = 10;
 
     @Min(30)
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
index 9958c7780..fbf003c69 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
@@ -24,6 +24,9 @@ public final class IndexInfo {
     @PositiveOrZero
     private final long updateSeq;
 
+    @PositiveOrZero
+    private final long committedUpdateSeq;
+
     @PositiveOrZero
     private final long purgeSeq;
 
@@ -35,10 +38,12 @@ public final class IndexInfo {
 
     public IndexInfo(
             @JsonProperty("update_seq") final long updateSeq,
+            @JsonProperty("committed_update_seq") final long 
committedUpdateSeq,
             @JsonProperty("purge_seq") final long purgeSeq,
             @JsonProperty("num_docs") final int numDocs,
             @JsonProperty("disk_size") final long diskSize) {
         this.updateSeq = updateSeq;
+        this.committedUpdateSeq = committedUpdateSeq;
         this.purgeSeq = purgeSeq;
         this.numDocs = numDocs;
         this.diskSize = diskSize;
@@ -56,13 +61,17 @@ public final class IndexInfo {
         return updateSeq;
     }
 
+    public long getCommittedUpdateSeq() {
+        return committedUpdateSeq;
+    }
+
     public long getPurgeSeq() {
         return purgeSeq;
     }
 
     @Override
     public String toString() {
-        return "IndexInfo [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq 
+ ", numDocs=" + numDocs + ", diskSize="
-                + diskSize + "]";
+        return "IndexInfo [updateSeq=" + updateSeq + ", committedUpdateSeq=" + 
committedUpdateSeq + ", purgeSeq="
+                + purgeSeq + ", numDocs=" + numDocs + ", diskSize=" + diskSize 
+ "]";
     }
 }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index 69c9bd48f..bcb5859e4 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -36,18 +36,20 @@ import org.apache.couchdb.nouveau.api.SearchResults;
 public abstract class Index implements Closeable {
 
     private long updateSeq;
+    private long committedUpdateSeq;
     private long purgeSeq;
     private boolean deleteOnClose = false;
 
     protected Index(final long updateSeq, final long purgeSeq) {
         this.updateSeq = updateSeq;
+        this.committedUpdateSeq = updateSeq;
         this.purgeSeq = purgeSeq;
     }
 
     public final IndexInfo info() throws IOException {
         final int numDocs = doNumDocs();
         final long diskSize = doDiskSize();
-        return new IndexInfo(updateSeq, purgeSeq, numDocs, diskSize);
+        return new IndexInfo(updateSeq, committedUpdateSeq, purgeSeq, numDocs, 
diskSize);
     }
 
     protected abstract int doNumDocs() throws IOException;
@@ -93,7 +95,13 @@ public abstract class Index implements Closeable {
             updateSeq = this.updateSeq;
             purgeSeq = this.purgeSeq;
         }
-        return doCommit(updateSeq, purgeSeq);
+        final boolean result = doCommit(updateSeq, purgeSeq);
+        if (result) {
+            synchronized (this) {
+                this.committedUpdateSeq = Math.max(this.committedUpdateSeq, 
updateSeq);
+            }
+        }
+        return result;
     }
 
     protected abstract boolean doCommit(final long updateSeq, final long 
purgeSeq) throws IOException;
diff --git a/rel/nouveau.yaml b/rel/nouveau.yaml
index 40837f12c..55dd192fa 100644
--- a/rel/nouveau.yaml
+++ b/rel/nouveau.yaml
@@ -1,5 +1,5 @@
 maxIndexesOpen: 3000
-commitIntervalSeconds: 30
+commitIntervalSeconds: {{nouveau_commit_interval}}
 idleSeconds: 60
 rootDir: {{nouveau_index_dir}}
 
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index b4c141f8c..3e22dc423 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -30,7 +30,8 @@
     handle_view_cleanup_req/2,
     update_doc/4,
     http_code_from_status/1,
-    handle_partition_req/2
+    handle_partition_req/2,
+    handle_update_drop_seq_req/2
 ]).
 
 -import(
@@ -390,6 +391,19 @@ update_partition_stats(PathParts) ->
             ok
     end.
 
+handle_update_drop_seq_req(
+    #httpd{method = 'POST', path_parts = [_DbName, <<"_update_drop_seq">>]} = 
Req, Db
+) ->
+    chttpd:validate_ctype(Req, "application/json"),
+    case fabric:update_drop_seq(Db) of
+        {ok, Results} ->
+            send_json(Req, 201, {[{ok, true}, {results, Results}]});
+        {error, Reason} ->
+            chttpd:send_error(Req, Reason)
+    end;
+handle_update_drop_seq_req(Req, _Db) ->
+    send_method_not_allowed(Req, "POST").
+
 handle_design_req(
     #httpd{
         path_parts = [_DbName, _Design, Name, <<"_", _/binary>> = Action | 
_Rest]
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl 
b/src/chttpd/src/chttpd_httpd_handlers.erl
index 932b52e5f..306885e83 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -35,6 +35,7 @@ db_handler(<<"_design">>) -> fun 
chttpd_db:handle_design_req/2;
 db_handler(<<"_partition">>) -> fun chttpd_db:handle_partition_req/2;
 db_handler(<<"_temp_view">>) -> fun chttpd_view:handle_temp_view_req/2;
 db_handler(<<"_changes">>) -> fun chttpd_db:handle_changes_req/2;
+db_handler(<<"_update_drop_seq">>) -> fun 
chttpd_db:handle_update_drop_seq_req/2;
 db_handler(_) -> no_match.
 
 design_handler(<<"_view">>) -> fun chttpd_view:handle_view_req/3;
diff --git a/src/config/src/config.erl b/src/config/src/config.erl
index bf8358d1d..695bc40b9 100644
--- a/src/config/src/config.erl
+++ b/src/config/src/config.erl
@@ -512,8 +512,10 @@ remove_comments(Line) ->
         NoLeadingComment when is_list(NoLeadingComment) ->
             % Check for in-line comments. In-line comments must be preceded by
             % space or a tab character.
-            [NoComments | _] = re:split(NoLeadingComment, " ;|\t;", [{return, 
list}]),
-            NoComments
+            case re:split(NoLeadingComment, " ;|\t;", [{return, list}]) of
+                [] -> [];
+                [NoComments | _] -> NoComments
+            end
     end.
 
 trim(String) ->
diff --git a/src/couch/src/couch_bt_engine.erl 
b/src/couch/src/couch_bt_engine.erl
index 6f9c86d6e..9d3c97fd7 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -45,6 +45,7 @@
     get_partition_info/2,
     get_update_seq/1,
     get_drop_seq/1,
+    get_drop_count/1,
     get_uuid/1,
 
     set_revs_limit/2,
@@ -54,6 +55,7 @@
 
     set_update_seq/2,
     set_drop_seq/3,
+    increment_drop_count/1,
 
     open_docs/2,
     open_local_docs/2,
@@ -331,6 +333,9 @@ get_update_seq(#st{header = Header}) ->
 get_drop_seq(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, drop_seq).
 
+get_drop_count(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, drop_count).
+
 get_uuid(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, uuid).
 
@@ -820,12 +825,11 @@ set_drop_seq(#st{header = Header} = St, 
ExpectedUuidPrefix, NewDropSeq) when
     CurrentDropSeq = get_drop_seq(St),
     Uuid = get_uuid(St),
     ActualUuidPrefix = binary:part(Uuid, 0, byte_size(ExpectedUuidPrefix)),
-
     if
-        NewDropSeq < CurrentDropSeq ->
-            {error, drop_seq_cant_decrease};
         ExpectedUuidPrefix /= ActualUuidPrefix ->
             {error, uuid_mismatch};
+        NewDropSeq < CurrentDropSeq ->
+            {error, {drop_seq_cant_decrease, CurrentDropSeq, NewDropSeq}};
         true ->
             NewSt = St#st{
                 header = couch_bt_engine_header:set(Header, [
@@ -836,6 +840,14 @@ set_drop_seq(#st{header = Header} = St, 
ExpectedUuidPrefix, NewDropSeq) when
             {ok, increment_update_seq(NewSt)}
     end.
 
+increment_drop_count(#st{header = Header} = St) ->
+    CurrentDropCount = get_drop_count(St),
+    NewSt = St#st{
+        header = couch_bt_engine_header:set(Header, [{drop_count, 
CurrentDropCount + 1}]),
+        needs_commit = true
+    },
+    {ok, NewSt}.
+
 copy_security(#st{header = Header} = St, SecProps) ->
     Options = [{compression, St#st.compression}],
     {ok, Ptr, _} = couch_file:append_term(St#st.fd, SecProps, Options),
diff --git a/src/couch/src/couch_bt_engine_compactor.erl 
b/src/couch/src/couch_bt_engine_compactor.erl
index 619200bce..2f230f489 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -330,7 +330,8 @@ copy_compact(#comp_st{} = CompSt) ->
             if
                 Deleted andalso Seq =< DropSeq ->
                     %% drop this document completely
-                    {ok, {AccNewSt, AccUncopied, AccUncopiedSize, 
AccCopiedSize}};
+                    {ok, NewSt2} = 
couch_bt_engine:increment_drop_count(AccNewSt),
+                    {ok, {NewSt2, AccUncopied, AccUncopiedSize, 
AccCopiedSize}};
                 AccUncopiedSize2 >= BufferSize ->
                     NewSt2 = copy_docs(
                         St, AccNewSt, lists:reverse([DocInfo | AccUncopied]), 
Retry
diff --git a/src/couch/src/couch_bt_engine_header.erl 
b/src/couch/src/couch_bt_engine_header.erl
index 9f5285d49..0d3c55a14 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -39,7 +39,8 @@
     uuid/1,
     epochs/1,
     compacted_seq/1,
-    drop_seq/1
+    drop_seq/1,
+    drop_count/1
 ]).
 
 -include_lib("stdlib/include/assert.hrl").
@@ -73,7 +74,8 @@
     compacted_seq,
     purge_infos_limit = 1000,
     props_ptr,
-    drop_seq = 0
+    drop_seq = 0,
+    drop_count = 0
 }).
 
 -define(PARTITION_DISK_VERSION, 8).
@@ -90,7 +92,8 @@ from(Header0) ->
         uuid = Header#db_header.uuid,
         epochs = Header#db_header.epochs,
         compacted_seq = Header#db_header.compacted_seq,
-        drop_seq = Header#db_header.drop_seq
+        drop_seq = Header#db_header.drop_seq,
+        drop_count = Header#db_header.drop_count
     }.
 
 is_header(Header) ->
@@ -182,6 +185,9 @@ compacted_seq(Header) ->
 drop_seq(Header) ->
     get_field(Header, drop_seq).
 
+drop_count(Header) ->
+    get_field(Header, drop_count).
+
 purge_infos_limit(Header) ->
     get_field(Header, purge_infos_limit).
 
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 8718b5b82..e4d16a36b 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -42,6 +42,7 @@
     get_db_info/1,
     get_partition_info/2,
     get_del_doc_count/1,
+    get_drop_count/1,
     get_doc_count/1,
     get_epochs/1,
     get_filepath/1,
@@ -579,6 +580,9 @@ get_pid(#db{main_pid = Pid}) ->
 get_del_doc_count(Db) ->
     {ok, couch_db_engine:get_del_doc_count(Db)}.
 
+get_drop_count(Db) ->
+    {ok, couch_db_engine:get_drop_count(Db)}.
+
 get_doc_count(Db) ->
     {ok, couch_db_engine:get_doc_count(Db)}.
 
@@ -619,6 +623,7 @@ get_db_info(Db) ->
     } = Db,
     {ok, DocCount} = get_doc_count(Db),
     {ok, DelDocCount} = get_del_doc_count(Db),
+    {ok, DropCount} = get_drop_count(Db),
     SizeInfo = couch_db_engine:get_size_info(Db),
     DiskVersion = couch_db_engine:get_disk_version(Db),
     Uuid =
@@ -641,6 +646,7 @@ get_db_info(Db) ->
         {engine, couch_db_engine:get_engine(Db)},
         {doc_count, DocCount},
         {doc_del_count, DelDocCount},
+        {drop_count, DropCount},
         {update_seq, get_update_seq(Db)},
         {purge_seq, couch_db_engine:get_purge_seq(Db)},
         {compact_running, Compactor /= nil},
diff --git a/src/couch/src/couch_db_engine.erl 
b/src/couch/src/couch_db_engine.erl
index 1d89066e8..7a3dd8b32 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -194,6 +194,11 @@
 -callback get_del_doc_count(DbHandle :: db_handle()) ->
     DelDocCount :: non_neg_integer().
 
+% The number of tombstones (deleted documents with no content) in the
+% database which have been completely dropped from the database.
+-callback get_drop_count(DbHandle :: db_handle()) ->
+    DropCount :: non_neg_integer().
+
 % This number is reported in the database info properties and
 % as such can be any JSON value.
 -callback get_disk_version(DbHandle :: db_handle()) -> Version :: json().
@@ -679,6 +684,7 @@
     get_engine/1,
     get_compacted_seq/1,
     get_del_doc_count/1,
+    get_drop_count/1,
     get_disk_version/1,
     get_doc_count/1,
     get_epochs/1,
@@ -806,6 +812,10 @@ get_del_doc_count(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     Engine:get_del_doc_count(EngineState).
 
+get_drop_count(#db{} = Db) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:get_drop_count(EngineState).
+
 get_disk_version(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     Engine:get_disk_version(EngineState).
@@ -889,8 +899,12 @@ set_update_seq(#db{} = Db, UpdateSeq) ->
 
 set_drop_seq(#db{} = Db, UuidPrefix, UpdateSeq) ->
     #db{engine = {Engine, EngineState}} = Db,
-    {ok, NewSt} = Engine:set_drop_seq(EngineState, UuidPrefix, UpdateSeq),
-    {ok, Db#db{engine = {Engine, NewSt}}}.
+    case Engine:set_drop_seq(EngineState, UuidPrefix, UpdateSeq) of
+        {ok, NewSt} ->
+            {ok, Db#db{engine = {Engine, NewSt}}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
 open_docs(#db{} = Db, DocIds) ->
     #db{engine = {Engine, EngineState}} = Db,
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl 
b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 5b5afbdce..d769ded8f 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -15,7 +15,8 @@
 -export([
     run/1,
     cleanup_purges/3,
-    cleanup_indices/2
+    cleanup_indices/2,
+    cleanup_peer_checkpoints/2
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -43,6 +44,9 @@ cleanup_indices(#{} = Sigs, #{} = IndexMap) ->
     maps:map(Fun, maps:without(maps:keys(Sigs), IndexMap)),
     ok.
 
+cleanup_peer_checkpoints(DbName, Sigs) when is_binary(DbName), is_map(Sigs) ->
+    fabric_drop_seq:cleanup_peer_checkpoint_docs(DbName, <<"mrview">>, 
maps:keys(Sigs)).
+
 delete_file(File) ->
     RootDir = couch_index_util:root_dir(),
     couch_log:debug("~p : deleting inactive index : ~s", [?MODULE, File]),
diff --git a/src/couch_mrview/src/couch_mrview_index.erl 
b/src/couch_mrview/src/couch_mrview_index.erl
index 51777480c..5ec895434 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -129,12 +129,14 @@ open(Db, State0) ->
                     NewSt = init_and_upgrade_state(Db, Fd, State, Header),
                     ok = commit(NewSt),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 % end of upgrade code for <= 2.x
                 {ok, {Sig, Header}} ->
                     % Matching view signatures.
                     NewSt = init_and_upgrade_state(Db, Fd, State, Header),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     check_collator_versions(DbName, NewSt),
                     {ok, NewSt};
                 {ok, {WrongSig, _}} ->
@@ -144,6 +146,7 @@ open(Db, State0) ->
                     ),
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 {ok, Else} ->
                     couch_log:error(
@@ -152,10 +155,12 @@ open(Db, State0) ->
                     ),
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt};
                 no_valid_header ->
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
                     ensure_local_purge_doc(Db, NewSt),
+                    ensure_peer_checkpoint_doc(NewSt),
                     {ok, NewSt}
             end;
         {error, Reason} = Error ->
@@ -210,7 +215,17 @@ finish_update(State) ->
 
 commit(State) ->
     Header = {State#mrst.sig, couch_mrview_util:make_header(State)},
-    couch_file:write_header(State#mrst.fd, Header).
+    ok = couch_file:sync(State#mrst.fd),
+    ok = couch_file:write_header(State#mrst.fd, Header),
+    ok = couch_file:sync(State#mrst.fd),
+    fabric_drop_seq:update_peer_checkpoint_doc(
+        State#mrst.db_name,
+        <<"mrview">>,
+        State#mrst.idx_name,
+        fabric_drop_seq:peer_id_from_sig(State#mrst.db_name, 
couch_util:to_hex_bin(State#mrst.sig)),
+        State#mrst.update_seq
+    ),
+    ok.
 
 compact(Db, State, Opts) ->
     couch_mrview_compactor:compact(Db, State, Opts).
@@ -295,6 +310,15 @@ ensure_local_purge_doc(Db, #mrst{} = State) ->
             ok
     end.
 
+ensure_peer_checkpoint_doc(#mrst{} = State) ->
+    fabric_drop_seq:create_peer_checkpoint_doc_if_missing(
+        State#mrst.db_name,
+        <<"mrview">>,
+        State#mrst.idx_name,
+        fabric_drop_seq:peer_id_from_sig(State#mrst.db_name, 
couch_util:to_hex_bin(State#mrst.sig)),
+        State#mrst.update_seq
+    ).
+
 create_local_purge_doc(Db, State) ->
     PurgeSeq = couch_db:get_purge_seq(Db),
     update_local_purge_doc(Db, State, PurgeSeq).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl 
b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 62e604b5e..0b14564c0 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -678,6 +678,7 @@ init_state(Rep) ->
     StartSeq = {0, StartSeq1},
 
     SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+    create_peer_checkpoint_doc_if_missing(Source, BaseId, SourceSeq),
 
     #doc{body = {CheckpointHistory}} = SourceLog,
     State = #rep_state{
@@ -809,7 +810,7 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_details = #rep{options = Options},
+        rep_details = #rep{id = {BaseId, _}, options = Options},
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
@@ -886,7 +887,14 @@ do_checkpoint(State) ->
                 {TgtRevPos, TgtRevId} = update_checkpoint(
                     Target, TargetLog#doc{body = NewRepHistory}, target
                 ),
-                %% TODO update_checkpoint(Source, peer_checkpoint_doc(State), 
source),
+                if
+                    is_binary(NewSeq) ->
+                        update_checkpoint(
+                            Source, peer_checkpoint_doc(Source, BaseId, 
NewSeq), source
+                        );
+                    true ->
+                        ok
+                end,
                 NewState = State#rep_state{
                     checkpoint_history = NewRepHistory,
                     committed_seq = NewTsSeq,
@@ -913,14 +921,39 @@ do_checkpoint(State) ->
             >>}
     end.
 
-peer_checkpoint_doc(#rep_state{} = State) ->
-    #rep_state{
-        session_id = SessionId
-    } = State,
-    #doc{
-        id = <<"peer-checkpoint-", SessionId/binary>>,
-        body = {[{<<"update_seq">>, State#rep_state.committed_seq}]}
-    }.
+create_peer_checkpoint_doc_if_missing(#httpdb{} = Db, BaseId, SourceSeq) when
+    is_list(BaseId), is_binary(SourceSeq)
+->
+    case couch_replicator_api_wrap:open_doc(Db, peer_checkpoint_id(BaseId), 
[]) of
+        {ok, _} ->
+            ok;
+        {error, <<"not_found">>} ->
+            Doc = peer_checkpoint_doc(Db, BaseId, SourceSeq),
+            case couch_replicator_api_wrap:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    ok;
+                {error, Reason} ->
+                    throw({checkpoint_commit_failure, Reason})
+            end;
+        {error, Reason} ->
+            throw({checkpoint_commit_failure, Reason})
+    end;
+create_peer_checkpoint_doc_if_missing(#httpdb{} = _Db, BaseId, SourceSeq) when
+    is_list(BaseId), is_integer(SourceSeq)
+->
+    %% ignored
+    ok.
+
+peer_checkpoint_doc(#httpdb{} = Db, BaseId, UpdateSeq) ->
+    fabric_drop_seq:peer_checkpoint_doc(
+        peer_checkpoint_id(BaseId),
+        <<"replication">>,
+        ?l2b(couch_replicator_api_wrap:db_uri(Db)),
+        UpdateSeq
+    ).
+
+peer_checkpoint_id(BaseId) when is_list(BaseId) ->
+    ?l2b("repl-" ++ BaseId).
 
 update_checkpoint(Db, Doc, DbType) ->
     try
diff --git a/src/dreyfus/src/dreyfus_fabric_cleanup.erl 
b/src/dreyfus/src/dreyfus_fabric_cleanup.erl
index e2710744d..1a3aec9dc 100644
--- a/src/dreyfus/src/dreyfus_fabric_cleanup.erl
+++ b/src/dreyfus/src/dreyfus_fabric_cleanup.erl
@@ -29,6 +29,7 @@ go(DbName) ->
         )
     ),
     cleanup_local_purge_doc(DbName, ActiveSigs),
+    fabric_drop_seq:cleanup_peer_checkpoint_docs(DbName, <<"search">>, 
ActiveSigs),
     clouseau_rpc:cleanup(DbName, ActiveSigs),
     ok.
 
diff --git a/src/dreyfus/src/dreyfus_index.erl 
b/src/dreyfus/src/dreyfus_index.erl
index c97a837d5..c2b6fa6ea 100644
--- a/src/dreyfus/src/dreyfus_index.erl
+++ b/src/dreyfus/src/dreyfus_index.erl
@@ -124,6 +124,13 @@ init({DbName, Index}) ->
                         couch_db:close(Db)
                     end,
                     dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index),
+                    fabric_drop_seq:create_peer_checkpoint_doc_if_missing(
+                        DbName,
+                        <<"search">>,
+                        <<(Index#index.ddoc_id)/binary, "/", 
(Index#index.name)/binary>>,
+                        fabric_drop_seq:peer_id_from_sig(DbName, 
Index#index.sig),
+                        Seq
+                    ),
                     proc_lib:init_ack({ok, self()}),
                     gen_server:enter_loop(?MODULE, [], State);
                 Error ->
diff --git a/src/dreyfus/src/dreyfus_index_updater.erl 
b/src/dreyfus/src/dreyfus_index_updater.erl
index 6edc5a257..f8404cc53 100644
--- a/src/dreyfus/src/dreyfus_index_updater.erl
+++ b/src/dreyfus/src/dreyfus_index_updater.erl
@@ -60,7 +60,18 @@ update(IndexPid, Index) ->
             [Changes] = couch_task_status:get([changes_done]),
             Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, 
erlang:timestamp(), ExcludeIdRevs},
             {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []),
-            ok = clouseau_rpc:commit(IndexPid, NewCurSeq)
+            ok = clouseau_rpc:commit(IndexPid, NewCurSeq),
+            {ok, CommittedSeq} = clouseau_rpc:get_update_seq(IndexPid),
+            fabric_drop_seq:update_peer_checkpoint_doc(
+                DbName,
+                <<"search">>,
+                <<(Index#index.ddoc_id)/binary, "/", 
(Index#index.name)/binary>>,
+                fabric_drop_seq:peer_id_from_sig(
+                    DbName,
+                    Index#index.sig
+                ),
+                CommittedSeq
+            )
         after
             ret_os_process(Proc)
         end,
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 92f30e4e8..c1fb021a8 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -34,7 +34,7 @@
     get_purged_infos/1,
     compact/1, compact/2,
     get_partition_info/2,
-    calculate_drop_seq/1
+    update_drop_seq/1
 ]).
 
 % Documents
@@ -120,9 +120,9 @@ get_db_info(DbName) ->
 get_partition_info(DbName, Partition) ->
     fabric_db_partition_info:go(dbname(DbName), Partition).
 
--spec calculate_drop_seq(dbname()) ->
+-spec update_drop_seq(dbname()) ->
     {ok, [{node(), binary(), non_neg_integer()}]}.
-calculate_drop_seq(DbName) ->
+update_drop_seq(DbName) ->
     fabric_drop_seq:go(dbname(DbName)).
 
 %% @doc the number of docs in a database
@@ -586,18 +586,19 @@ cleanup_index_files() ->
 cleanup_index_files(DbName) ->
     try
         ShardNames = [mem3:name(S) || S <- mem3:local_shards(dbname(DbName))],
-        cleanup_local_indices_and_purge_checkpoints(ShardNames)
+        Sigs = couch_mrview_util:get_signatures(hd(ShardNames)),
+        couch_mrview_cleanup:cleanup_peer_checkpoints(dbname(DbName), Sigs),
+        cleanup_local_indices_and_purge_checkpoints(Sigs, ShardNames)
     catch
         error:database_does_not_exist ->
             ok
     end.
 
-cleanup_local_indices_and_purge_checkpoints([]) ->
+cleanup_local_indices_and_purge_checkpoints(_Sigs, []) ->
     ok;
-cleanup_local_indices_and_purge_checkpoints([_ | _] = Dbs) ->
+cleanup_local_indices_and_purge_checkpoints(Sigs, [_ | _] = Dbs) ->
     AllIndices = lists:map(fun couch_mrview_util:get_index_files/1, Dbs),
     AllPurges = lists:map(fun couch_mrview_util:get_purge_checkpoints/1, Dbs),
-    Sigs = couch_mrview_util:get_signatures(hd(Dbs)),
     ok = cleanup_purges(Sigs, AllPurges, Dbs),
     ok = cleanup_indices(Sigs, AllIndices).
 
diff --git a/src/fabric/src/fabric_db_info.erl 
b/src/fabric/src/fabric_db_info.erl
index 5461404c5..08d3747aa 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -105,6 +105,8 @@ merge_results(Info) ->
                 [{doc_count, lists:sum(X)} | Acc];
             (doc_del_count, X, Acc) ->
                 [{doc_del_count, lists:sum(X)} | Acc];
+            (drop_count, X, Acc) ->
+                [{drop_count, lists:sum(X)} | Acc];
             (compact_running, X, Acc) ->
                 [{compact_running, lists:member(true, X)} | Acc];
             (sizes, X, Acc) ->
diff --git a/src/fabric/src/fabric_drop_seq.erl 
b/src/fabric/src/fabric_drop_seq.erl
index c00ea24ea..f01bc3fee 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -6,12 +6,25 @@
 
 -export([go/1]).
 
+-export([
+    create_peer_checkpoint_doc_if_missing/5,
+    update_peer_checkpoint_doc/5,
+    cleanup_peer_checkpoint_docs/3,
+    peer_checkpoint_doc/4,
+    peer_id_from_sig/2
+]).
+
+%% rpc
+-export([gather_drop_seq_info_rpc/1]).
+
 -type range() :: [non_neg_integer()].
 
 -type uuid() :: binary().
 
 -type seq() :: non_neg_integer().
 
+-type uuid_map() :: #{{Range :: range(), Node :: node()} => uuid()}.
+
 -type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(), 
Seq :: seq()}}.
 
 -type history_item() :: {
@@ -23,69 +36,114 @@
 }.
 
 go(DbName) ->
-    {PeerCheckpoints0, ShardSyncHistory} = parse_local_docs(DbName),
-    ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory),
-    PeerCheckpoints1 = maps:merge_with(fun merge_peers/3, PeerCheckpoints0, 
ShardSyncCheckpoints),
-    PeerCheckpoints2 = crossref(PeerCheckpoints1, ShardSyncHistory),
-    Shards = mem3:live_shards(DbName, [node() | nodes()]),
-    RexiMon = fabric_util:create_monitors(Shards),
+    Shards0 = mem3:shards(DbName),
+    #{
+        uuid_map := UuidMap,
+        peer_checkpoints := PeerCheckpoints,
+        shard_sync_history := ShardSyncHistory
+    } = gather_drop_seq_info(
+        Shards0
+    ),
+    {Shards1, DropSeqs} = go_int(
+        Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory
+    ),
     Workers = lists:filtermap(
         fun(Shard) ->
             #shard{range = Range, node = Node, name = ShardName} = Shard,
-            case maps:find({Range, Node}, PeerCheckpoints2) of
+            case maps:find({Range, Node}, DropSeqs) of
+                {ok, {_UuidPrefix, 0}} ->
+                    false;
                 {ok, {UuidPrefix, DropSeq}} ->
                     Ref = rexi:cast(
                         Node,
                         {fabric_rpc, set_drop_seq, [ShardName, UuidPrefix, 
DropSeq, [?ADMIN_CTX]]}
                     ),
-                    {true, Shard#shard{ref = Ref}};
+                    {true, Shard#shard{ref = Ref, opts = [{drop_seq, 
DropSeq}]}};
                 error ->
                     false
             end
         end,
-        Shards
+        Shards1
     ),
-    Acc0 = {Workers, length(Workers) - 1},
-    try
-        case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) 
of
-            {ok, ok} ->
-                ok;
-            {timeout, {WorkersDict, _}} ->
-                DefunctWorkers = fabric_util:remove_done_workers(
-                    WorkersDict,
-                    nil
-                ),
-                fabric_util:log_timeout(
-                    DefunctWorkers,
-                    "set_drop_seq"
-                ),
-                {error, timeout};
-            {error, Reason} ->
-                {error, Reason}
-        end
-    after
-        rexi_monitor:stop(RexiMon)
+    if
+        Workers == [] ->
+            %% nothing to do
+            {ok, #{}};
+        true ->
+            RexiMon = fabric_util:create_monitors(Shards1),
+            Acc0 = {#{}, length(Workers) - 1},
+            try
+                case fabric_util:recv(Workers, #shard.ref, fun 
handle_set_drop_seq_reply/3, Acc0) of
+                    {ok, Results} ->
+                        {ok, Results};
+                    {timeout, {WorkersDict, _}} ->
+                        DefunctWorkers = fabric_util:remove_done_workers(
+                            WorkersDict,
+                            nil
+                        ),
+                        fabric_util:log_timeout(
+                            DefunctWorkers,
+                            "set_drop_seq"
+                        ),
+                        {error, timeout};
+                    {error, Reason} ->
+                        {error, Reason}
+                end
+            after
+                rexi_monitor:stop(RexiMon)
+            end
     end.
 
-handle_message(ok, _Worker, {_Workers, 0}) ->
-    {stop, ok};
-handle_message(ok, Worker, {Workers, Waiting}) ->
-    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
-handle_message(Error, _, _Acc) ->
+go_int(Shards0, UuidMap, PeerCheckpoints0, ShardSyncHistory) ->
+    Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
+    PeerCheckpoints1 = crossref(PeerCheckpoints0, ShardSyncHistory),
+    PeerCheckpoints2 = substitute_splits(Shards1, UuidMap, PeerCheckpoints1),
+    DropSeqs = calculate_drop_seqs(PeerCheckpoints2, ShardSyncHistory),
+    {Shards1, DropSeqs}.
+
+-spec calculate_drop_seqs(peer_checkpoints(), shard_sync_history()) ->
+    peer_checkpoints().
+calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory) ->
+    ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory),
+    PeerCheckpoints1 = maps:merge_with(fun merge_peers/3, PeerCheckpoints0, 
ShardSyncCheckpoints),
+    crossref(PeerCheckpoints1, ShardSyncHistory).
+
+handle_set_drop_seq_reply(ok, Worker, {Results0, Waiting}) ->
+    DropSeq = proplists:get_value(drop_seq, Worker#shard.opts),
+    [B, E] = Worker#shard.range,
+    BHex = couch_util:to_hex(<<B:32/integer>>),
+    EHex = couch_util:to_hex(<<E:32/integer>>),
+    Range = list_to_binary([BHex, "-", EHex]),
+    Results1 = maps:merge_with(
+        fun(_Key, Val1, Val2) ->
+            maps:merge(Val1, Val2)
+        end,
+        Results0,
+        #{Range => #{Worker#shard.node => DropSeq}}
+    ),
+    if
+        Waiting == 0 ->
+            {stop, Results1};
+        true ->
+            {ok, {Results1, Waiting - 1}}
+    end;
+handle_set_drop_seq_reply(Error, _, _Acc) ->
     {error, Error}.
 
 crossref(PeerCheckpoints0, ShardSyncHistory) ->
     PeerCheckpoints1 = maps:fold(
         fun({Range, Node}, {Uuid, Seq}, Acc1) ->
             Others = maps:filter(
-                fun({R, _S, T}, _V) -> R == Range andalso T /= Node end, 
ShardSyncHistory
+                fun({R, _S, T}, _History) -> R == Range andalso T /= Node end, 
ShardSyncHistory
             ),
             maps:fold(
-                fun({R, _S, T}, H, Acc2) ->
+                fun({R, _S, T}, History, Acc2) ->
                     case
                         lists:search(
-                            fun({SU, SS, _TU, _TS}) -> Uuid == SU andalso SS 
=< Seq end,
-                            H
+                            fun({SU, SS, _TU, _TS}) ->
+                                uuids_match([Uuid, SU]) andalso SS =< Seq
+                            end,
+                            History
                         )
                     of
                         {value, {_SU, _SS, TU, TS}} ->
@@ -107,29 +165,114 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->
     %% crossreferences may be possible.
     if
         PeerCheckpoints0 == PeerCheckpoints1 ->
-            PeerCheckpoints1;
+            %% insert {<<>>, 0} for any missing crossref so that shard sync
+            %% history is subordinate.
+            maps:fold(
+                fun({Range, Node}, {_Uuid, _Seq}, Acc1) ->
+                    Others = maps:filter(
+                        fun({R, _S, T}, _History) -> R == Range andalso T /= 
Node end,
+                        ShardSyncHistory
+                    ),
+                    maps:fold(
+                        fun({R, _S, T}, _History, Acc3) ->
+                            maps:merge(#{{R, T} => {<<>>, 0}}, Acc3)
+                        end,
+                        Acc1,
+                        Others
+                    )
+                end,
+                PeerCheckpoints1,
+                PeerCheckpoints1
+            );
         true ->
             crossref(PeerCheckpoints1, ShardSyncHistory)
     end.
 
--spec parse_local_docs(DbName :: binary()) -> {peer_checkpoints(), 
shard_sync_history()}.
-parse_local_docs(DbName) ->
-    {ok, Result} = fabric:all_docs(
-        DbName, fun parse_local_docs_cb/2, {#{}, #{}}, all_docs_mrargs()
+%% return only the shards that have synced with every other replica
+fully_replicated_shards_only(Shards, ShardSyncHistory) ->
+    lists:filter(
+        fun(#shard{range = Range, node = Node}) ->
+            ExpectedPeers = [
+                S#shard.node
+             || S <- Shards, S#shard.range == Range, S#shard.node /= Node
+            ],
+            ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
+            lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, 
ExpectedKeys)
+        end,
+        Shards
+    ).
+
+-spec gather_drop_seq_info(Shards :: [#shard{}]) -> #{uuid_map := uuid_map(), peer_checkpoints := peer_checkpoints(), shard_sync_history := shard_sync_history()} | {error, term()}.
+gather_drop_seq_info([#shard{} | _] = Shards) ->
+    Workers = fabric_util:submit_jobs(
+        Shards, ?MODULE, gather_drop_seq_info_rpc, []
     ),
-    Result.
+    RexiMon = fabric_util:create_monitors(Workers),
+    Acc0 = #{uuid_map => #{}, peer_checkpoints => #{}, shard_sync_history => 
#{}},
+    try
+        case
+            rexi_utils:recv(
+                Workers,
+                #shard.ref,
+                fun gather_drop_seq_info_cb/3,
+                {Acc0, length(Workers) - 1},
+                fabric_util:request_timeout(),
+                infinity
+            )
+        of
+            {ok, Result} ->
+                Result;
+            {timeout, _State} ->
+                {error, timeout};
+            {error, Reason} ->
+                {error, Reason}
+        end
+    after
+        rexi_monitor:stop(RexiMon),
+        fabric_streams:cleanup(Workers)
+    end.
 
-parse_local_docs_cb({row, Row}, Acc) ->
-    case lists:keyfind(doc, 1, Row) of
-        false ->
+gather_drop_seq_info_rpc(DbName) ->
+    case couch_db:open_int(DbName, []) of
+        {ok, Db} ->
+            try
+                Uuid = couch_db:get_uuid(Db),
+                Acc0 = {#{}, #{}},
+                {ok, {PeerCheckpoints, ShardSyncHistory}} = 
couch_db:fold_local_docs(
+                    Db, fun gather_drop_seq_info_fun/2, Acc0, []
+                ),
+                rexi:reply(
+                    {ok, #{
+                        uuid => Uuid,
+                        peer_checkpoints => PeerCheckpoints,
+                        shard_sync_history => ShardSyncHistory
+                    }}
+                )
+            after
+                couch_db:close(Db)
+            end;
+        Else ->
+            rexi:reply(Else)
+    end.
+
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", _/binary>>} = Doc,
+    {PeerCheckpoints0, ShardSyncHistory} = Acc
+) ->
+    {Props} = Doc#doc.body,
+    case couch_util:get_value(<<"update_seq">>, Props) of
+        undefined ->
             {ok, Acc};
-        {doc, Doc} ->
-            parse_local_doc(couch_doc:from_json_obj(Doc), Acc)
+        UpdateSeq ->
+            PeerCheckpoints1 = maps:merge_with(
+                fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
+            ),
+            {ok, {PeerCheckpoints1, ShardSyncHistory}}
     end;
-parse_local_docs_cb(_Else, Acc) ->
-    {ok, Acc}.
-
-parse_local_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) ->
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>>} = Doc,
+    {PeerCheckpoints, ShardSyncHistory0} = Acc
+) ->
     {Props} = Doc#doc.body,
     case couch_util:get_value(<<"dbname">>, Props) of
         undefined ->
@@ -138,7 +281,6 @@ parse_local_doc(#doc{id = <<"_local/shard-sync-", 
_/binary>>} = Doc, Acc) ->
         DbName ->
             Range = mem3:range(DbName),
             {[{_SrcNode, History}]} = couch_util:get_value(<<"history">>, 
Props),
-            {PeerCheckpoints, ShardSyncHistory0} = Acc,
             KeyFun = fun({Item}) ->
                 {Range, 
binary_to_existing_atom(couch_util:get_value(<<"source_node">>, Item)),
                     
binary_to_existing_atom(couch_util:get_value(<<"target_node">>, Item))}
@@ -151,35 +293,55 @@ parse_local_doc(#doc{id = <<"_local/shard-sync-", 
_/binary>>} = Doc, Acc) ->
                     couch_util:get_value(<<"target_seq">>, Item)
                 }
             end,
-
             ShardSyncHistory1 = maps:merge(
                 maps:groups_from_list(KeyFun, ValueFun, History), 
ShardSyncHistory0
             ),
             {ok, {PeerCheckpoints, ShardSyncHistory1}}
     end;
-parse_local_doc(#doc{id = <<"_local/peer-checkpoint-", _/binary>>} = Doc, Acc) 
->
-    {Props} = Doc#doc.body,
-    case couch_util:get_value(<<"update_seq">>, Props) of
-        undefined ->
-            {ok, Acc};
-        UpdateSeq ->
-            {PeerCheckpoints0, ShardSyncHistory} = Acc,
-            PeerCheckpoints1 = maps:merge_with(
-                fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
-            ),
-            {ok, {PeerCheckpoints1, ShardSyncHistory}}
-    end;
-parse_local_doc(_Doc, Acc) ->
+gather_drop_seq_info_fun(#doc{}, Acc) ->
+    %% ignored
     {ok, Acc}.
 
-merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when is_integer(Val1), 
is_integer(Val2) ->
-    PrefixLen = min(byte_size(Uuid1), byte_size(Uuid2)),
-    case binary:longest_common_prefix([Uuid1, Uuid2]) == PrefixLen of
+gather_drop_seq_info_cb({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) ->
+    MergedInfo = merge_info(Worker, Info, Acc),
+    if
+        Count == 0 ->
+            {stop, MergedInfo};
         true ->
-            {Uuid1, min(Val1, Val2)};
-        false ->
-            {Uuid2, Val2}
-    end.
+            {ok, {MergedInfo, Count - 1}}
+    end;
+gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}}.
+
+merge_info(#shard{} = Shard, Info, Acc) ->
+    #{
+        uuid_map =>
+            maps:put(
+                {Shard#shard.range, Shard#shard.node}, maps:get(uuid, Info), 
maps:get(uuid_map, Acc)
+            ),
+        peer_checkpoints => maps:merge_with(
+            fun merge_peers/3,
+            maps:get(peer_checkpoints, Info),
+            maps:get(peer_checkpoints, Acc)
+        ),
+        shard_sync_history => maps:merge(
+            maps:get(shard_sync_history, Info), maps:get(shard_sync_history, 
Acc)
+        )
+    }.
+
+merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
+    is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2)
+->
+    true = uuids_match([Uuid1, Uuid2]),
+    {Uuid1, min(Val1, Val2)}.
+
+uuids_match(Uuids) when is_list(Uuids) ->
+    PrefixLen = lists:min([byte_size(Uuid) || Uuid <- Uuids]),
+    binary:longest_common_prefix(Uuids) == PrefixLen.
 
 decode_seq(OpaqueSeq) ->
     Decoded = fabric_view_changes:decode_seq(OpaqueSeq),
@@ -203,16 +365,6 @@ decode_seq(OpaqueSeq) ->
         Decoded
     ).
 
-all_docs_mrargs() ->
-    #mrargs{
-        view_type = map,
-        include_docs = true,
-        extra = [
-            {include_system, true},
-            {namespace, <<"_local">>}
-        ]
-    }.
-
 latest_shard_sync_checkpoints(ShardSyncHistory) ->
     maps:fold(
         fun({R, SN, _TN}, History, Acc) ->
@@ -222,3 +374,541 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
         #{},
         ShardSyncHistory
     ).
+
+%% A shard may have been split since a peer saw it.
+-spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) -> 
peer_checkpoints().
+substitute_splits(Shards, UuidMap, PeerCheckpoints) ->
+    maps:fold(
+        fun({[B1, E1], Node}, {Uuid, Seq}, Acc) ->
+            ShardsInRange = [
+                S
+             || #shard{range = [B2, E2]} = S <- Shards,
+                Node == S#shard.node,
+                B2 >= B1 andalso E2 =< E1
+            ],
+            %% lookup uuid from map if substituted
+            AsMap = maps:from_list(
+                lists:filtermap(
+                    fun(#shard{} = Shard) ->
+                        Key = {Shard#shard.range, Shard#shard.node},
+                        if
+                            [B1, E1] == Shard#shard.range ->
+                                {true, {Key, {Uuid, Seq}}};
+                            true ->
+                                case maps:find(Key, UuidMap) of
+                                    {ok, SubstUuid} ->
+                                        {true, {Key, {SubstUuid, Seq}}};
+                                    error ->
+                                        false
+                                end
+                        end
+                    end,
+                    ShardsInRange
+                )
+            ),
+            maps:merge_with(fun merge_peers/3, AsMap, Acc)
+        end,
+        #{},
+        PeerCheckpoints
+    ).
+
+create_peer_checkpoint_doc_if_missing(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName), is_binary(PeerId), is_integer(UpdateSeq)
+->
+    create_peer_checkpoint_doc_if_missing(
+        DbName, Subtype, Source, PeerId, pack_seq(DbName, UpdateSeq)
+    );
+create_peer_checkpoint_doc_if_missing(
+    DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_integer(UpdateSeq)
+->
+    ok;
+create_peer_checkpoint_doc_if_missing(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_binary(UpdateSeq)
+->
+    {_, Ref} = spawn_monitor(fun() ->
+        case
+            fabric:open_doc(mem3:dbname(DbName), peer_checkpoint_id(Subtype, 
PeerId), [?ADMIN_CTX])
+        of
+            {ok, _} ->
+                ok;
+            {not_found, _} ->
+                update_peer_checkpoint_doc(DbName, Subtype, Source, PeerId, 
UpdateSeq);
+            {error, Reason} ->
+                throw({checkpoint_commit_failure, Reason})
+        end
+    end),
+    receive
+        {'DOWN', Ref, _, _, normal} ->
+            ok;
+        {'DOWN', Ref, _, _, Else} ->
+            Else
+    end.
+
+update_peer_checkpoint_doc(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_integer(UpdateSeq)
+->
+    update_peer_checkpoint_doc(DbName, Subtype, Source, PeerId, 
pack_seq(DbName, UpdateSeq));
+update_peer_checkpoint_doc(
+    DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_integer(UpdateSeq)
+->
+    ok;
+update_peer_checkpoint_doc(
+    <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
+) when
+    is_binary(DbName),
+    is_binary(Subtype),
+    is_binary(Source),
+    is_binary(PeerId),
+    is_binary(UpdateSeq)
+->
+    {_, OpenRef} = spawn_monitor(fun() ->
+        case
+            fabric:open_doc(mem3:dbname(DbName), peer_checkpoint_id(Subtype, 
PeerId), [?ADMIN_CTX])
+        of
+            {ok, ExistingDoc} ->
+                exit(ExistingDoc#doc.revs);
+            {not_found, _Reason} ->
+                exit({0, []});
+            {error, Reason} ->
+                throw({checkpoint_fetch_failure, Reason})
+        end
+    end),
+    receive
+        {'DOWN', OpenRef, _, _, {Pos, RevIds} = Revs} when is_integer(Pos), is_list(RevIds) ->
+            NewDoc0 = peer_checkpoint_doc(PeerId, Subtype, Source, UpdateSeq),
+            NewDoc1 = NewDoc0#doc{revs = Revs},
+            {_, UpdateRef} = spawn_monitor(fun() ->
+                case fabric:update_doc(mem3:dbname(DbName), NewDoc1, 
[?ADMIN_CTX]) of
+                    {ok, _} ->
+                        couch_log:notice(
+                            "updated peer checkpoint for db:~s, subtype:~s, 
source:~s, peer_id:~s, update_seq:~s",
+                            [
+                                DbName, Subtype, Source, PeerId, UpdateSeq
+                            ]
+                        ),
+                        ok;
+                    {error, Reason} ->
+                        throw({checkpoint_commit_failure, Reason})
+                end
+            end),
+            receive
+                {'DOWN', UpdateRef, _, _, normal} ->
+                    ok;
+                {'DOWN', UpdateRef, _, _, Else} ->
+                    Else
+            end;
+        {'DOWN', OpenRef, _, _, not_found} ->
+            ok;
+        {'DOWN', OpenRef, _, _, Else} ->
+            Else
+    end.
+
+peer_checkpoint_doc(PeerId, Subtype, Source, UpdateSeq) when
+    is_binary(PeerId), is_binary(Subtype), is_binary(Source), 
is_binary(UpdateSeq)
+->
+    #doc{
+        id = peer_checkpoint_id(Subtype, PeerId),
+        body =
+            {[
+                {<<"type">>, <<"peer-checkpoint">>},
+                {<<"subtype">>, Subtype},
+                {<<"source">>, Source},
+                {<<"update_seq">>, UpdateSeq},
+                {<<"last_updated">>, ?l2b(couch_log_util:iso8601_timestamp())}
+            ]}
+    }.
+
+peer_checkpoint_id(Subtype, PeerId) ->
+    <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", Subtype/binary, "-", 
PeerId/binary>>.
+
+peer_id_from_sig(DbName, Sig) when is_binary(DbName), is_binary(Sig) ->
+    Hash = couch_util:encodeBase64Url(
+        crypto:hash(sha256, [atom_to_binary(node()), $0, DbName])
+    ),
+    <<Sig/binary, "$", Hash/binary>>.
+
+pack_seq(DbName, UpdateSeq) ->
+    PrefixLen = fabric_util:get_uuid_prefix_len(),
+    DbUuid = couch_util:with_db(DbName, fun(Db) -> couch_db:get_uuid(Db) end),
+    fabric_view_changes:pack_seqs(
+        [
+            {
+                #shard{node = node(), range = mem3:range(DbName)},
+                {UpdateSeq, binary:part(DbUuid, {0, PrefixLen}), node()}
+            }
+        ]
+    ).
+
+cleanup_peer_checkpoint_docs(DbName, SubType, KeepSigs) when
+    is_binary(DbName), is_binary(SubType), is_list(KeepSigs)
+->
+    MrArgs = #mrargs{
+        view_type = map,
+        include_docs = true,
+        start_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, 
"-">>,
+        end_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, 
".">>,
+        inclusive_end = false,
+        extra = [
+            {include_system, true},
+            {namespace, <<"_local">>}
+        ]
+    },
+    {ok, {SubType, KeepSigs, DocsToDelete}} = fabric:all_docs(
+        DbName, fun cleanup_peer_checkpoints_cb/2, {SubType, KeepSigs, []}, 
MrArgs
+    ),
+    {ok, _} = fabric:update_docs(DbName, DocsToDelete, [?ADMIN_CTX]).
+
+cleanup_peer_checkpoints_cb({row, Row}, {SubType, KeepSigs, DocsToDelete} = 
Acc) ->
+    {doc, JsonDoc} = lists:keyfind(doc, 1, Row),
+    Doc = couch_doc:from_json_obj(JsonDoc),
+    #doc{
+        id =
+            <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", 
SubType:(byte_size(SubType))/binary, "-",
+                SigHash/binary>>
+    } = Doc,
+    [Sig, _Hash] = binary:split(SigHash, <<"$">>),
+    case lists:member(Sig, KeepSigs) of
+        true ->
+            {ok, Acc};
+        false ->
+            {ok, {SubType, KeepSigs, [Doc#doc{deleted = true, body = {[]}} | 
DocsToDelete]}}
+    end;
+cleanup_peer_checkpoints_cb(_Else, Acc) ->
+    {ok, Acc}.
+
+-ifdef(TEST).
+-include_lib("couch/include/couch_eunit.hrl").
+
+empty_sync_history_means_no_change_test() ->
+    Range = [0, 10],
+    Node1 = '[email protected]',
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+    ShardSyncHistory = #{},
+    ?assertEqual(PeerCheckpoints, calculate_drop_seqs(PeerCheckpoints, 
ShardSyncHistory)).
+
+matching_sync_history_expands_result_test() ->
+    Range = [0, 10],
+    Node1 = '[email protected]',
+    Node2 = '[email protected]',
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+    ShardSyncHistory = #{{Range, Node1, Node2} => [{<<"uuid1">>, 12, 
<<"uuid2">>, 5}]},
+    ?assertEqual(
+        #{
+            {Range, Node1} => {<<"uuid1">>, 12},
+            {Range, Node2} => {<<"uuid2">>, 5}
+        },
+        calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+    ).
+
+transitive_sync_history_expands_result_test() ->
+    Range = [0, 10],
+    Node1 = '[email protected]',
+    Node2 = '[email protected]',
+    Node3 = '[email protected]',
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+    ShardSyncHistory = #{
+        {Range, Node1, Node2} => [{<<"uuid1">>, 12, <<"uuid2">>, 5}],
+        {Range, Node2, Node3} => [{<<"uuid2">>, 11, <<"uuid3">>, 11}]
+    },
+    ?assertEqual(
+        #{
+            {Range, Node1} => {<<"uuid1">>, 12},
+            {Range, Node2} => {<<"uuid2">>, 5},
+            {Range, Node3} => {<<"uuid3">>, 11}
+        },
+        calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+    ).
+
+shard_sync_history_caps_peer_checkpoint_test() ->
+    Range = [0, 10],
+    Node1 = '[email protected]',
+    Node2 = '[email protected]',
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+    ShardSyncHistory = #{{Range, Node1, Node2} => [{<<"uuid1">>, 10, 
<<"uuid2">>, 5}]},
+    ?assertEqual(
+        #{
+            {Range, Node1} => {<<"uuid1">>, 10},
+            {Range, Node2} => {<<"uuid2">>, 5}
+        },
+        calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+    ).
+
+multiple_range_test() ->
+    Range1 = [0, 10],
+    Range2 = [11, 20],
+    Node1 = '[email protected]',
+    Node2 = '[email protected]',
+    PeerCheckpoints = #{{Range1, Node1} => {<<"r1n1">>, 12}, {Range2, Node2} 
=> {<<"r2n2">>, 20}},
+    ShardSyncHistory = #{
+        {Range1, Node1, Node2} => [{<<"r1n1">>, 10, <<"r1n2">>, 5}],
+        {Range2, Node2, Node1} => [{<<"r2n2">>, 19, <<"r2n1">>, 17}]
+    },
+    ?assertEqual(
+        #{
+            {Range1, Node1} => {<<"r1n1">>, 10},
+            {Range1, Node2} => {<<"r1n2">>, 5},
+            {Range2, Node2} => {<<"r2n2">>, 19},
+            {Range2, Node1} => {<<"r2n1">>, 17}
+        },
+        calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+    ).
+
+search_history_for_latest_safe_crossover_test() ->
+    Range = [0, 10],
+    Node1 = '[email protected]',
+    Node2 = '[email protected]',
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 50}},
+    ShardSyncHistory = #{
+        {Range, Node1, Node2} => [
+            {<<"uuid1">>, 100, <<"uuid2">>, 99},
+            {<<"uuid1">>, 75, <<"uuid2">>, 76},
+            {<<"uuid1">>, 50, <<"uuid2">>, 51},
+            {<<"uuid1">>, 40, <<"uuid2">>, 41}
+        ]
+    },
+    ?assertEqual(
+        #{
+            {Range, Node1} => {<<"uuid1">>, 50},
+            {Range, Node2} => {<<"uuid2">>, 51}
+        },
+        calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+    ).
+
+fully_replicated_shards_only_test_() ->
+    Range1 = [0, 1],
+    Range2 = [1, 2],
+    Shards = [
+        #shard{node = node1, range = Range1},
+        #shard{node = node2, range = Range1},
+        #shard{node = node3, range = Range1},
+        #shard{node = node1, range = Range2},
+        #shard{node = node2, range = Range2}
+    ],
+    [
+        %% empty history means no fully replicated shards
+        ?_assertEqual([], fully_replicated_shards_only(Shards, #{})),
+        %% some but not all peers
+        ?_assertEqual(
+            [],
+            fully_replicated_shards_only(Shards, #{
+                {Range1, node2, node1} => {0, <<>>}
+            })
+        ),
+        %% all peers of one replica
+        ?_assertEqual(
+            [#shard{node = node1, range = Range1}],
+            fully_replicated_shards_only(Shards, #{
+                {Range1, node2, node1} => {0, <<>>},
+                {Range1, node3, node1} => {0, <<>>}
+            })
+        ),
+        %% all peers of one range
+        ?_assertEqual(
+            [
+                #shard{node = node1, range = Range1},
+                #shard{node = node2, range = Range1},
+                #shard{node = node3, range = Range1}
+            ],
+            fully_replicated_shards_only(Shards, #{
+                {Range1, node2, node1} => {0, <<>>},
+                {Range1, node3, node1} => {0, <<>>},
+                {Range1, node1, node2} => {0, <<>>},
+                {Range1, node3, node2} => {0, <<>>},
+                {Range1, node1, node3} => {0, <<>>},
+                {Range1, node2, node3} => {0, <<>>}
+            })
+        )
+    ].
+
+substitute_splits_test() ->
+    Range = [0, 10],
+    Subrange1 = [0, 5],
+    Subrange2 = [6, 10],
+    Node1 = '[email protected]',
+    Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = 
Subrange2, node = Node1}],
+    UuidMap = #{
+        {Subrange1, Node1} => <<"uuid2">>,
+        {Subrange2, Node1} => <<"uuid3">>
+    },
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+
+    ?assertEqual(
+        #{{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => 
{<<"uuid3">>, 12}},
+        substitute_splits(Shards, UuidMap, PeerCheckpoints)
+    ).
+
+crossref_test_() ->
+    Range = [0, 10],
+    Node1 = '[email protected]',
+    Node2 = '[email protected]',
+    Node3 = '[email protected]',
+    [
+        ?_assertEqual(#{}, crossref(#{}, #{})),
+        ?_assertEqual(
+            #{
+                {Range, Node1} => {<<"n1">>, 5},
+                {Range, Node2} => {<<"n2">>, 4},
+                {Range, Node3} => {<<"n3">>, 3}
+            },
+            crossref(
+                #{{Range, Node1} => {<<"n1">>, 5}},
+                #{
+                    {Range, Node1, Node2} => [
+                        {<<"n1">>, 10, <<"n2">>, 9},
+                        {<<"n1">>, 5, <<"n2">>, 4},
+                        {<<"n1">>, 2, <<"n2">>, 1}
+                    ],
+                    {Range, Node1, Node3} => [
+                        {<<"n1">>, 9, <<"n3">>, 8},
+                        {<<"n1">>, 4, <<"n3">>, 3},
+                        {<<"n1">>, 3, <<"n3">>, 2}
+                    ]
+                }
+            )
+        ),
+        ?_assertEqual(
+            #{
+                {Range, Node1} => {<<"n1">>, 5},
+                {Range, Node2} => {<<"n2x">>, 4},
+                {Range, Node3} => {<<"n3x">>, 3}
+            },
+            crossref(
+                #{{Range, Node1} => {<<"n1">>, 5}},
+                #{
+                    {Range, Node1, Node2} => [
+                        {<<"n1x">>, 10, <<"n2x">>, 9},
+                        {<<"n1x">>, 5, <<"n2x">>, 4},
+                        {<<"n1x">>, 2, <<"n2x">>, 1}
+                    ],
+                    {Range, Node1, Node3} => [
+                        {<<"n1x">>, 9, <<"n3x">>, 8},
+                        {<<"n1x">>, 4, <<"n3x">>, 3},
+                        {<<"n1x">>, 3, <<"n3x">>, 2}
+                    ]
+                }
+            )
+        )
+    ].
+
+go_int_test_() ->
+    Range = [0, 10],
+    Subrange1 = [0, 5],
+    Subrange2 = [6, 10],
+    Node1 = '[email protected]',
+    Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = 
Subrange2, node = Node1}],
+    UuidMap = #{
+        {Subrange1, Node1} => <<"uuid2">>,
+        {Subrange2, Node1} => <<"uuid3">>
+    },
+    [
+        ?_assertEqual(
+            {Shards, #{
+                {Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => 
{<<"uuid3">>, 12}
+            }},
+            go_int(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}}, 
#{})
+        ),
+        ?_assertEqual(
+            {Shards, #{
+                {Subrange1, Node1} => {<<"uuid2">>, 10}, {Subrange2, Node1} => 
{<<"uuid3">>, 12}
+            }},
+            go_int(
+                Shards,
+                UuidMap,
+                #{{Range, Node1} => {<<"uuid1">>, 12}, {Subrange1, Node1} => 
{<<"uuid2">>, 10}},
+                #{}
+            )
+        )
+    ].
+
+go_int2_test_() ->
+    Range = [0, 10],
+    Subrange1 = [0, 5],
+    Subrange2 = [6, 10],
+    Node1 = '[email protected]',
+    Node2 = '[email protected]',
+    Shards = [
+        #shard{range = Subrange1, node = Node1},
+        #shard{range = Subrange2, node = Node1},
+        #shard{range = Subrange1, node = Node2},
+        #shard{range = Subrange2, node = Node2}
+    ],
+    UuidMap = #{
+        {Subrange1, Node1} => <<"s1n1">>,
+        {Subrange2, Node1} => <<"s2n1">>,
+        {Subrange1, Node2} => <<"s1n2">>,
+        {Subrange2, Node2} => <<"s2n2">>
+    },
+    ShardSyncHistory =
+        #{
+            {Subrange1, Node1, Node2} => [
+                {<<"s1n1">>, 100, <<"s1n2">>, 99},
+                {<<"s1n1">>, 75, <<"s1n2">>, 76},
+                {<<"s1n1">>, 50, <<"s1n2">>, 51},
+                {<<"s1n1">>, 12, <<"s1n2">>, 11}
+            ],
+            {Subrange2, Node1, Node2} => [
+                {<<"s2n1">>, 100, <<"s2n2">>, 99},
+                {<<"s2n1">>, 75, <<"s2n2">>, 76},
+                {<<"s2n1">>, 50, <<"s2n2">>, 51},
+                {<<"s2n1">>, 12, <<"s2n2">>, 11}
+            ],
+            {Subrange1, Node2, Node1} => [
+                {<<"s1n2">>, 100, <<"s1n1">>, 99},
+                {<<"s1n2">>, 75, <<"s1n1">>, 76},
+                {<<"s1n2">>, 50, <<"s1n1">>, 51},
+                {<<"s1n2">>, 12, <<"s1n1">>, 11}
+            ],
+            {Subrange2, Node2, Node1} => [
+                {<<"s2n2">>, 100, <<"s2n1">>, 99},
+                {<<"s2n2">>, 75, <<"s2n1">>, 76},
+                {<<"s2n2">>, 50, <<"s2n1">>, 51},
+                {<<"s2n2">>, 12, <<"s2n1">>, 11}
+            ]
+        },
+    [
+        ?_assertEqual(
+            #{
+                {Subrange1, Node1} => {<<"s1n1">>, 12},
+                {Subrange2, Node1} => {<<"s2n1">>, 12},
+                {Subrange1, Node2} => {<<"s1n2">>, 11},
+                {Subrange2, Node2} => {<<"s2n2">>, 11}
+            },
+            element(
+                2,
+                go_int(
+                    Shards,
+                    UuidMap,
+                    #{{Range, Node1} => {<<"ignored">>, 12}},
+                    ShardSyncHistory
+                )
+            )
+        )
+    ].
+
+-endif.
diff --git a/src/mem3/src/mem3_reshard_index.erl 
b/src/mem3/src/mem3_reshard_index.erl
index 41e225d22..5881c9f12 100644
--- a/src/mem3/src/mem3_reshard_index.erl
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -130,7 +130,13 @@ build_index({?MRVIEW, DbName, MRSt} = Ctx, Try) ->
         Try
     );
 build_index({?NOUVEAU, _DbName, DIndex} = Ctx, Try) ->
-    UpdateFun = fun() -> nouveau_index_updater:update(DIndex) end,
+    UpdateFun = fun() ->
+        {IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, 
update, [DIndex]),
+        receive
+            {'DOWN', IndexerRef, process, IndexerPid, Reason} ->
+                Reason
+        end
+    end,
     retry_loop(Ctx, UpdateFun, Try);
 build_index({?DREYFUS, DbName, DIndex} = Ctx, Try) ->
     await_retry(
@@ -197,6 +203,8 @@ index_info({?MRVIEW, DbName, MRSt}) ->
     {DbName, GroupName};
 index_info({?DREYFUS, DbName, Index}) ->
     {DbName, Index};
+index_info({?NOUVEAU, DbName, Index}) ->
+    {DbName, Index};
 index_info({?HASTINGS, DbName, Index}) ->
     {DbName, Index}.
 
diff --git a/src/nouveau/src/nouveau_fabric_cleanup.erl 
b/src/nouveau/src/nouveau_fabric_cleanup.erl
index cd4128fb1..f010eef20 100644
--- a/src/nouveau/src/nouveau_fabric_cleanup.erl
+++ b/src/nouveau/src/nouveau_fabric_cleanup.erl
@@ -31,6 +31,7 @@ go(DbName) ->
             )
         ),
     Shards = mem3:shards(DbName),
+    fabric_drop_seq:cleanup_peer_checkpoint_docs(DbName, <<"nouveau">>, 
ActiveSigs),
     lists:foreach(
         fun(Shard) ->
             rexi:cast(Shard#shard.node, {nouveau_rpc, cleanup, 
[Shard#shard.name, ActiveSigs]})
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index efed245db..6ba3c3738 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -87,6 +87,13 @@ update(#index{} = Index) ->
                 {ok, PurgeAcc1} = purge_index(ConnPid, Db, Index, PurgeAcc0),
 
                 NewCurSeq = couch_db:get_update_seq(Db),
+                fabric_drop_seq:create_peer_checkpoint_doc_if_missing(
+                    Index#index.dbname,
+                    <<"nouveau">>,
+                    <<(Index#index.ddoc_id)/binary, "/", 
(Index#index.name)/binary>>,
+                    fabric_drop_seq:peer_id_from_sig(Index#index.dbname, 
Index#index.sig),
+                    NewCurSeq
+                ),
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -107,6 +114,16 @@ update(#index{} = Index) ->
                         Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
                     ),
                     nouveau_api:drain_async_responses(Acc1#acc.reqids, 0),
+                    {ok, #{<<"committed_update_seq">> := CommittedUpdateSeq}} 
= nouveau_api:index_info(
+                        Index
+                    ),
+                    fabric_drop_seq:update_peer_checkpoint_doc(
+                        Index#index.dbname,
+                        <<"nouveau">>,
+                        <<(Index#index.ddoc_id)/binary, "/", 
(Index#index.name)/binary>>,
+                        fabric_drop_seq:peer_id_from_sig(Index#index.dbname, 
Index#index.sig),
+                        CommittedUpdateSeq
+                    ),
                     exit(nouveau_api:set_update_seq(ConnPid, Index, 
Acc1#acc.update_seq, NewCurSeq))
                 after
                     ibrowse:stop_worker_process(ConnPid),
diff --git a/test/elixir/lib/utils.ex b/test/elixir/lib/utils.ex
index 3ecf878e7..15e321240 100644
--- a/test/elixir/lib/utils.ex
+++ b/test/elixir/lib/utils.ex
@@ -58,4 +58,4 @@ defmodule Couch.Test.Utils do
       end)
   end
 
-end
\ No newline at end of file
+end
diff --git a/test/elixir/test/config/nouveau.elixir 
b/test/elixir/test/config/nouveau.elixir
index 02157e2ca..14c22b24f 100644
--- a/test/elixir/test/config/nouveau.elixir
+++ b/test/elixir/test/config/nouveau.elixir
@@ -1,4 +1,7 @@
 %{
+  "DropSeqTest": [
+    "peer checkpoint - nouveau"
+  ],
   "NouveauTest": [
     "user-agent header is forbidden",
     "search analyze",
diff --git a/test/elixir/test/config/search.elixir 
b/test/elixir/test/config/search.elixir
index b49b8d762..438a8bd2c 100644
--- a/test/elixir/test/config/search.elixir
+++ b/test/elixir/test/config/search.elixir
@@ -1,4 +1,7 @@
 %{
+  "DropSeqTest": [
+    "peer checkpoint - search"
+  ],
   "PartitionSearchTest": [
     "Cannot do global query with partition view",
     "Cannot do partition query with global search ddoc",
diff --git a/test/elixir/test/config/suite.elixir 
b/test/elixir/test/config/suite.elixir
index 1d1e6059a..f283096c2 100644
--- a/test/elixir/test/config/suite.elixir
+++ b/test/elixir/test/config/suite.elixir
@@ -211,6 +211,11 @@
     "design doc path",
     "design doc path with slash in db name"
   ],
+  "DropSeqTest": [
+    "peer checkpoint - mrview",
+    "peer checkpoint - custom",
+    "peer checkpoint - shard split"
+  ],
   "ErlangViewsTest": [
     "Erlang map function",
     "Erlang reduce function",
diff --git a/test/elixir/test/drop_seq_statem_test.exs 
b/test/elixir/test/drop_seq_statem_test.exs
new file mode 100644
index 000000000..1e509df0b
--- /dev/null
+++ b/test/elixir/test/drop_seq_statem_test.exs
@@ -0,0 +1,415 @@
+defmodule DropSeqStateM do
+  use PropCheck, default_opts: &PropCheck.TestHelpers.config/0
+  use PropCheck.StateM
+  use CouchTestCase
+
+  # alias Couch.Test.Utils
+  # import Utils
+
+  @moduletag capture_log: true
+
+  # expected to pass in all three cluster scenarios
+  @moduletag :with_cluster
+  @moduletag :without_quorum_test
+  @moduletag :with_quorum_test
+
+  property "_update_drop_seq works correctly", start_size: 5, max_size: 100, 
numtests: 10000 do
+    forall cmds <- commands(__MODULE__) do
+      trap_exit do
+        db_name = random_db_name()
+        n = Enum.random(1..3)
+        q = Enum.random(1..10)
+        {:ok, _} = create_db(db_name, query: %{n: n, q: q})
+        r = run_commands(__MODULE__, cmds, [{:dbname, db_name}])
+        {history, state, result} = r
+        delete_db(db_name)
+
+        (result == :ok)
+        |> when_fail(
+          IO.puts("""
+          n: #{n}, q: #{q}
+          Commands: #{inspect(cmds, pretty: true)}
+          History: #{inspect(history, pretty: true)}
+          State: #{inspect(state, pretty: true)}
+          Result: #{inspect(result, pretty: true)}
+          """)
+        )
+      end
+    end
+  end
+
+  require Record
+
+  Record.defrecord(:state,
+    docs: [],
+    deleted_docs: [],
+    current_seq: 0,
+    peer_checkpoint_seq: nil,
+    drop_seq: nil,
+    drop_count: 0,
+    changed: false,
+    stale: false
+  )
+
+  def initial_state() do
+    state()
+  end
+
+  @max_docids 20
+  @docids 1..@max_docids |> Enum.map(&"doc-#{&1}")
+
+  def doc_id, do: oneof(@docids)
+
+  def index_type do
+    oneof([:mrview, :nouveau])
+  end
+
+  def command(s) do
+    cond do
+      state(s, :stale) ->
+        {:call, __MODULE__, :update_indexes, [{:var, :dbname}]}
+
+      state(s, :changed) ->
+        {:call, __MODULE__, :changes, [{:var, :dbname}]}
+
+      true ->
+        oneof([
+          {:call, __MODULE__, :update_document, [{:var, :dbname}, doc_id()]},
+          {:call, __MODULE__, :delete_document, [{:var, :dbname}, doc_id()]},
+          {:call, __MODULE__, :update_peer_checkpoint, [{:var, :dbname}]},
+          {:call, __MODULE__, :update_drop_seq, [{:var, :dbname}]},
+          {:call, __MODULE__, :compact_db, [{:var, :dbname}]},
+          {:call, __MODULE__, :split_shard, [{:var, :dbname}]},
+          {:call, __MODULE__, :create_index, [{:var, :dbname}, index_type()]}
+        ])
+    end
+  end
+
+  def get_document(db_name, doc_id) do
+    resp = Couch.get("/#{db_name}/#{doc_id}")
+
+    case resp.status_code do
+      200 ->
+        {:ok, resp.body}
+
+      404 ->
+        {:not_found, resp.body["reason"]}
+    end
+  end
+
+  def update_document(db_name, doc_id) do
+    case get_document(db_name, doc_id) do
+      {:ok, doc} ->
+        resp = Couch.put("/#{db_name}/#{doc_id}", body: doc)
+
+        assert resp.status_code == 201,
+               "Couch.put failed #{resp.status_code} #{inspect(resp.body)}"
+
+      {:not_found, _} ->
+        resp = Couch.put("/#{db_name}/#{doc_id}", body: %{})
+
+        assert resp.status_code == 201,
+               "Couch.put failed #{resp.status_code} #{inspect(resp.body)}"
+    end
+
+    sync_shards(db_name)
+  end
+
+  def delete_document(db_name, doc_id) do
+    case get_document(db_name, doc_id) do
+      {:ok, doc} ->
+        rev = doc["_rev"]
+        resp = Couch.delete("/#{db_name}/#{doc_id}?rev=#{rev}")
+
+        assert resp.status_code == 200,
+               "Couch.delete failed #{resp.status_code} #{inspect(resp.body)}"
+
+      {:not_found, _} ->
+        :ok
+    end
+
+    sync_shards(db_name)
+  end
+
+  def update_peer_checkpoint(db_name) do
+    resp = Couch.get("/#{db_name}")
+
+    assert resp.status_code == 200,
+           "Couch.get failed #{resp.status_code} #{inspect(resp.body)}"
+
+    update_seq = resp.body["update_seq"]
+
+    resp =
+      Couch.put("/#{db_name}/_local/peer-checkpoint-foo",
+        body: %{
+          update_seq: update_seq
+        }
+      )
+
+    assert resp.status_code == 201,
+           "update_peer_checkpoint failed #{resp.status_code} 
#{inspect(resp.body)}"
+
+    seq_to_shards(update_seq)
+  end
+
+  def update_drop_seq(db_name) do
+    resp = Couch.post("/#{db_name}/_update_drop_seq")
+
+    assert resp.status_code == 201,
+           "update_drop_seq failed #{resp.status_code} #{inspect(resp.body)}"
+
+    resp.body
+  end
+
+  def compact_db(db_name) do
+    compact(db_name)
+    # try to avoid seeing pre-compact state of shards immediately after
+    # compactor pids exit
+    :timer.sleep(1000)
+  end
+
+  def changes(db_name) do
+    resp = Couch.get("/#{db_name}/_changes")
+    assert resp.status_code == 200
+
+    List.foldl(resp.body["results"], {[], []}, fn change, {doc_ids, 
del_doc_ids} ->
+      if change["deleted"] do
+        {doc_ids, Enum.sort([change["id"] | del_doc_ids])}
+      else
+        {Enum.sort([change["id"] | doc_ids]), del_doc_ids}
+      end
+    end)
+  end
+
+  def split_shard(db_name) do
+    resp = Couch.get("/#{db_name}/_shards")
+    assert resp.status_code == 200
+    range = Enum.random(Map.keys(resp.body["shards"]))
+
+    resp =
+      Couch.post("/_reshard/jobs",
+        body: %{
+          type: "split",
+          db: db_name,
+          range: range
+        }
+      )
+
+    assert resp.status_code == 201,
+           "split_shard failed #{resp.status_code} #{inspect(resp.body)}"
+
+    retry_until(
+      fn ->
+        resp = Couch.get("/_reshard/jobs")
+        assert resp.status_code == 200
+
+        Enum.all?(resp.body["jobs"], fn job ->
+          if job["job_state"] == "completed" do
+            resp = Couch.delete("/_reshard/jobs/#{job["id"]}")
+            assert resp.status_code == 200
+          end
+
+          job["job_state"] == "completed"
+        end)
+      end,
+      200,
+      10_000
+    )
+  end
+
+  def create_index(db_name, index_type) do
+    num = Enum.random(1..1_000_000)
+    ddoc_id = "_design/#{index_type}-#{num}"
+
+    case get_document(db_name, ddoc_id) do
+      {:ok, _doc} ->
+        create_index(db_name, index_type)
+
+      {:not_found, _} ->
+        :ok
+    end
+
+    doc =
+      case index_type do
+        :mrview ->
+          %{
+            views: %{
+              bar: %{
+                map: """
+                function(doc) {
+                  emit(#{num});
+                }
+                """
+              }
+            }
+          }
+
+        :nouveau ->
+          %{
+            nouveau: %{
+              bar: %{
+                index: """
+                function(doc) {
+                  index("double", "foo", #{num});
+                }
+                """
+              }
+            }
+          }
+      end
+
+    resp = Couch.put("/#{db_name}/#{ddoc_id}", body: doc)
+
+    assert resp.status_code == 201,
+           "create_index failed #{resp.status_code} #{inspect(resp.body)}"
+
+    ddoc_id
+  end
+
+  def update_indexes(db_name) do
+    resp = Couch.get("/#{db_name}/_design_docs")
+    assert resp.status_code == 200
+
+    Enum.each(resp.body["rows"], fn row ->
+      case row["id"] do
+        "_design/mrview-" <> _ ->
+          resp = Couch.get("/#{db_name}/#{row["id"]}/_view/bar")
+
+          assert resp.status_code == 200,
+                 "query mrview failed #{resp.status_code} 
#{inspect(resp.body)}"
+
+        "_design/nouveau-" <> _ ->
+          resp = Couch.get("/#{db_name}/#{row["id"]}/_nouveau/bar?q=*:*")
+
+          assert resp.status_code == 200,
+                 "query nouveau failed #{resp.status_code} 
#{inspect(resp.body)}"
+      end
+    end)
+  end
+
+  def sync_shards(db_name) do
+    resp = Couch.post("/#{db_name}/_sync_shards")
+
+    assert resp.status_code == 202,
+           "sync_shards failed #{resp.status_code} #{inspect(resp.body)}"
+
+    :timer.sleep(1000)
+  end
+
+  def precondition(s, {:call, _, :update_document, [_db_name, doc_id]}) do
+    not doc_exists(s, doc_id)
+  end
+
+  def precondition(s, {:call, _, :delete_document, [_db_name, doc_id]}) do
+    doc_exists(s, doc_id)
+  end
+
+  def precondition(_, _) do
+    true
+  end
+
+  def next_state(s, _v, {:call, _, :update_document, [_db_name, doc_id]}) do
+    state(s,
+      current_seq: state(s, :current_seq) + 1,
+      docs: Enum.sort([doc_id | state(s, :docs)]),
+      deleted_docs: List.keydelete(state(s, :deleted_docs), doc_id, 0),
+      changed: true,
+      stale: true
+    )
+  end
+
+  def next_state(s, _v, {:call, _, :delete_document, [_db_name, doc_id]}) do
+    state(s,
+      current_seq: state(s, :current_seq) + 1,
+      docs: List.delete(state(s, :docs), doc_id),
+      deleted_docs:
+        Enum.sort([{doc_id, state(s, :current_seq) + 1} | state(s, 
:deleted_docs)]),
+      changed: true,
+      stale: true
+    )
+  end
+
+  def next_state(s, _v, {:call, _, :update_peer_checkpoint, [_db_name]}) do
+    state(s, peer_checkpoint_seq: state(s, :current_seq), changed: true)
+  end
+
+  def next_state(s, _v, {:call, _, :update_drop_seq, [_db_name]}) do
+    # we'll drop all tombstones if _update_drop_seq is called when there
+    # are no peer checkpoint docs as the only peers are the shard syncs
+    # which update automatically
+    # n.b: indexes and their peer checkpoints will always be fresh as we
+    # force update_indexes after every doc update.
+    drop_seq =
+      if state(s, :peer_checkpoint_seq) == nil,
+        do: state(s, :current_seq),
+        else: state(s, :peer_checkpoint_seq)
+
+    state(s, drop_seq: drop_seq, changed: true)
+  end
+
+  def next_state(s, _v, {:call, _, :compact_db, [_db_name]}) do
+    {keep_docs, drop_docs} =
+      Enum.split_with(state(s, :deleted_docs), fn {_, seq} ->
+        state(s, :drop_seq) == nil or seq > state(s, :drop_seq)
+      end)
+
+    state(s,
+      deleted_docs: keep_docs,
+      drop_count: state(s, :drop_count) + length(drop_docs),
+      changed: true
+    )
+  end
+
+  def next_state(s, _v, {:call, _, :changes, [_db_name]}) do
+    state(s, changed: false)
+  end
+
+  def next_state(s, _v, {:call, _, :split_shard, [_db_name]}) do
+    state(s, changed: true)
+  end
+
+  def next_state(s, v, {:call, _, :create_index, [_db_name, _index_type]}) do
+    state(s,
+      current_seq: state(s, :current_seq) + 1,
+      docs: Enum.sort([v | state(s, :docs)]),
+      changed: true,
+      stale: true
+    )
+  end
+
+  def next_state(s, _v, {:call, _, :update_indexes, [_db_name]}) do
+    state(s, stale: false)
+  end
+
+  def postcondition(s, {:call, _, :changes, [_db_name]}, {doc_ids, 
del_doc_ids}) do
+    doc_ids == doc_ids(s) and del_doc_ids == deleted_doc_ids(s)
+  end
+
+  def postcondition(_, _, _), do: true
+
+  def doc_exists(s, doc_id), do: doc_id in state(s, :docs)
+
+  def deleted_doc_exists(s, doc_id) do
+    List.keymember?(state(s, :deleted_docs), doc_id, 0)
+  end
+
+  def doc_ids(s), do: state(s, :docs)
+
+  def deleted_doc_ids(s) do
+    Enum.map(state(s, :deleted_docs), fn {doc_id, _} -> doc_id end)
+  end
+
+  def seq_to_shards(seq) do
+    for {node, [b, e], {seq_num, _uuid, _epoch}} <- decode_seq(seq) do
+      b_hex = :couch_util.to_hex(<<b::32-integer>>)
+      e_hex = :couch_util.to_hex(<<e::32-integer>>)
+      range = "#{b_hex}-#{e_hex}"
+      {node, range, seq_num}
+    end
+  end
+
+  def decode_seq(seq) do
+    seq = String.replace(seq, ~r/\d+-/, "", global: false)
+    :erlang.binary_to_term(Base.url_decode64!(seq, padding: false))
+  end
+end
diff --git a/test/elixir/test/drop_seq_test.exs 
b/test/elixir/test/drop_seq_test.exs
new file mode 100644
index 000000000..28b29ef0c
--- /dev/null
+++ b/test/elixir/test/drop_seq_test.exs
@@ -0,0 +1,248 @@
+defmodule DropSeqTest do
+  use CouchTestCase
+
+  @moduletag :drop_seq
+
+  @tag :with_db
+  test "peer checkpoint - mrview", context do
+    db_name = context[:db_name]
+    ddoc_id = "_design/foo"
+
+    create_checkpoint_fn = fn ->
+      resp = Couch.put("/#{db_name}/#{ddoc_id}", body: %{
+        views: %{
+          bar: %{
+            map: "function(doc) {}"
+          }
+        }
+      })
+      assert resp.status_code == 201
+    end
+
+    update_checkpoint_fn = fn ->
+      assert Couch.get("/#{db_name}/#{ddoc_id}/_view/bar").status_code == 200
+    end
+
+    checkpoint_test_helper(context[:db_name], create_checkpoint_fn, 
update_checkpoint_fn)
+  end
+
+  @tag :with_db
+  test "peer checkpoint - search", context do
+    db_name = context[:db_name]
+    ddoc_id = "_design/foo"
+
+    create_checkpoint_fn = fn ->
+      resp = Couch.put("/#{db_name}/#{ddoc_id}", body: %{
+        indexes: %{
+          bar: %{
+            index: "function(doc) {}"
+          }
+        }
+      })
+      assert resp.status_code == 201
+    end
+
+    update_checkpoint_fn = fn ->
+      assert Couch.get("/#{db_name}/#{ddoc_id}/_search/bar?q=*:*").status_code 
== 200
+    end
+
+    checkpoint_test_helper(context[:db_name], create_checkpoint_fn, 
update_checkpoint_fn)
+  end
+
+  @tag :with_db
+  test "peer checkpoint - nouveau", context do
+    db_name = context[:db_name]
+    ddoc_id = "_design/foo"
+
+    create_checkpoint_fn = fn ->
+      resp = Couch.put("/#{db_name}/#{ddoc_id}", body: %{
+        nouveau: %{
+          bar: %{
+            index: "function(doc) {}"
+          }
+        }
+      })
+      assert resp.status_code == 201
+    end
+
+    update_checkpoint_fn = fn ->
+      # need to add a new document to force nouveau update to run each time
+      # as we only advance the peer checkpoint when JVM-side nouveau has 
committed
+      # the index
+      assert Couch.post("/#{db_name}", body: %{}).status_code == 201
+      assert 
Couch.get("/#{db_name}/#{ddoc_id}/_nouveau/bar?q=*:*").status_code == 200
+    end
+
+    checkpoint_test_helper(context[:db_name], create_checkpoint_fn, 
update_checkpoint_fn)
+  end
+
+  @tag :with_db
+  test "peer checkpoint - custom", context do
+    db_name = context[:db_name]
+    peer_checkpoint_id = "_local/peer-checkpoint-foo"
+
+    update_checkpoint_fn = fn ->
+      resp = Couch.get("/#{db_name}")
+      assert resp.status_code == 200
+      update_seq = resp.body["update_seq"]
+
+      resp = Couch.put("/#{db_name}/#{peer_checkpoint_id}", body: %{
+        update_seq: update_seq
+      })
+      assert resp.status_code == 201
+    end
+
+    checkpoint_test_helper(context[:db_name], update_checkpoint_fn, 
update_checkpoint_fn)
+  end
+
+  @tag :with_db
+  test "peer checkpoint - shard split", context do
+    db_name = context[:db_name]
+    peer_checkpoint_id = "_local/peer-checkpoint-foo"
+
+    create_checkpoint_fn = fn ->
+      resp = Couch.get("/#{db_name}")
+      assert resp.status_code == 200
+      update_seq = resp.body["update_seq"]
+
+      resp = Couch.put("/#{db_name}/#{peer_checkpoint_id}", body: %{
+        update_seq: update_seq
+      })
+      assert resp.status_code == 201
+
+      resp = Couch.get("/#{db_name}/_shards")
+      assert resp.status_code == 200
+      ranges = Map.keys(resp.body["shards"])
+      Enum.each(ranges, fn r ->
+        resp = Couch.post("/_reshard/jobs", body: %{
+          type: "split",
+          db: db_name,
+          range: r
+        })
+        assert resp.status_code == 201
+      end)
+      wait_for_reshard_jobs_to_complete()
+    end
+
+    after_doc_deletion_fn = fn ->
+      split_all_shard_ranges(db_name)
+      wait_for_reshard_jobs_to_complete()
+    end
+
+    update_checkpoint_fn = fn ->
+      resp = Couch.get("/#{db_name}")
+      assert resp.status_code == 200
+      update_seq = resp.body["update_seq"]
+
+      resp = Couch.put("/#{db_name}/#{peer_checkpoint_id}", body: %{
+        update_seq: update_seq
+      })
+      assert resp.status_code == 201
+    end
+
+    checkpoint_test_helper(context[:db_name],
+      create_checkpoint_fn, update_checkpoint_fn, after_doc_deletion_fn)
+  end
+
+  defp checkpoint_test_helper(db_name, create_checkpoint_fn, 
update_checkpoint_fn) do
+    checkpoint_test_helper(db_name, create_checkpoint_fn, 
update_checkpoint_fn, fn() -> true end)
+  end
+
+  defp checkpoint_test_helper(db_name, create_checkpoint_fn, 
update_checkpoint_fn, after_doc_deletion_fn) do
+    doc_id = "testdoc"
+
+    drop_count = get_drop_count(db_name)
+    drop_seq = update_drop_seq(db_name)
+
+    # create something that will create a peer checkpoint
+    create_checkpoint_fn.()
+    assert get_drop_count(db_name) == drop_count
+
+    # create a document
+    resp = Couch.put("/#{db_name}/#{doc_id}", body: %{})
+    assert resp.status_code == 201
+    rev = resp.body["rev"]
+
+    # delete it
+    resp = Couch.delete("/#{db_name}/#{doc_id}?rev=#{rev}")
+    assert resp.status_code == 200
+
+    after_doc_deletion_fn.()
+
+    # wait for drop seq to change
+    wait_for_drop_seq_change(db_name, drop_seq, update_checkpoint_fn)
+    assert get_drop_count(db_name) == drop_count
+
+    # confirm deleted doc is still in _changes response
+    resp = Couch.get("/#{db_name}/_changes")
+    assert resp.status_code == 200
+    assert Enum.member?(get_ids(resp), doc_id)
+
+    # compact
+    compact(db_name)
+
+    # confirm deleted doc is not in _changes response
+    resp = Couch.get("/#{db_name}/_changes")
+    assert resp.status_code == 200
+    assert !Enum.member?(get_ids(resp), doc_id)
+    assert get_drop_count(db_name) == drop_count + 1
+  end
+
+  defp wait_for_drop_seq_change(db_name, previous_drop_seq, 
update_checkpoint_fn) do
+    retry_until(
+      fn ->
+        update_checkpoint_fn.()
+        new_drop_seq = update_drop_seq(db_name)
+        new_drop_seq != previous_drop_seq
+      end,
+      200,
+      10_000
+    )
+  end
+
+  defp split_all_shard_ranges(db_name) do
+    resp = Couch.get("/#{db_name}/_shards")
+    assert resp.status_code == 200
+    ranges = Map.keys(resp.body["shards"])
+    Enum.each(ranges, fn r ->
+      resp = Couch.post("/_reshard/jobs", body: %{
+        type: "split",
+        db: db_name,
+        range: r
+      })
+      assert resp.status_code == 201
+    end)
+  end
+
+  defp wait_for_reshard_jobs_to_complete() do
+    retry_until(
+      fn ->
+        resp = Couch.get("/_reshard/jobs")
+        assert resp.status_code == 200
+        Enum.all?(resp.body["jobs"], fn job ->
+          job["job_state"] == "completed"
+        end)
+      end,
+      200,
+      10_000
+    )
+  end
+
+  defp update_drop_seq(db_name) do
+    resp = Couch.post("/#{db_name}/_update_drop_seq")
+    assert resp.status_code == 201
+    resp.body["results"]
+  end
+
+  defp get_drop_count(db_name) do
+    resp = Couch.get("/#{db_name}")
+    assert resp.status_code == 200
+    resp.body["drop_count"]
+  end
+
+  defp get_ids(resp) do
+    %{:body => %{"results" => results}} = resp
+    Enum.map(results, fn result -> result["id"] end)
+  end
+
+end


Reply via email to