This is an automated email from the ASF dual-hosted git repository.

pgj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d6d4ad97921a50e0e8ef8a4dc872aefa93993469
Author: Gabor Pali <[email protected]>
AuthorDate: Wed Oct 4 13:00:41 2023 +0200

    mango: prevent occasional duplication of paginated `text` results
    
    When an interleaved update happens to a `text` index while it is
    being queried, the affected documents might appear duplicated in
    the collected results.  That is because `search` results are
    paginated where the boundaries might move due to the updates and
    the document might show up once more on subsequent queries.
    
    Mitigate the situation by tracking the documents already seen and
    dropping duplicates while the results are being streamed to the user.
---
 src/mango/src/mango_cursor_text.erl      | 138 ++++++++++++++++++++++++-------
 src/mango/test/24-text-paginated-test.py | 108 ++++++++++++++++++++++++
 src/mango/test/mango.py                  |  46 +++++++++--
 3 files changed, 256 insertions(+), 36 deletions(-)

diff --git a/src/mango/src/mango_cursor_text.erl b/src/mango/src/mango_cursor_text.erl
index f5899914a..09d878afd 100644
--- a/src/mango/src/mango_cursor_text.erl
+++ b/src/mango/src/mango_cursor_text.erl
@@ -37,7 +37,8 @@
     user_fun,
     user_acc,
     fields,
-    execution_stats
+    execution_stats,
+    documents_seen
 }).
 
 create(Db, {Indexes, Trace}, Selector, Opts0) ->
@@ -108,7 +109,8 @@ execute(Cursor, UserFun, UserAcc) ->
         user_fun = UserFun,
         user_acc = UserAcc,
         fields = Cursor#cursor.fields,
-        execution_stats = mango_execution_stats:log_start(Stats)
+        execution_stats = mango_execution_stats:log_start(Stats),
+        documents_seen = sets:new([{version, 2}])
     },
     try
         case Query of
@@ -179,28 +181,42 @@ handle_hit(CAcc0, Sort, Doc) ->
     #cacc{
         limit = Limit,
         skip = Skip,
-        execution_stats = Stats
+        execution_stats = Stats,
+        documents_seen = Seen
     } = CAcc0,
-    CAcc1 = update_bookmark(CAcc0, Sort),
     Stats1 = mango_execution_stats:incr_docs_examined(Stats),
     couch_stats:increment_counter([mango, docs_examined]),
-    CAcc2 = CAcc1#cacc{execution_stats = Stats1},
-    case mango_selector:match(CAcc2#cacc.selector, Doc) of
-        true when Skip > 0 ->
-            CAcc2#cacc{skip = Skip - 1};
-        true when Limit == 0 ->
-            % We hit this case if the user spcified with a
-            % zero limit. Notice that in this case we need
-            % to return the bookmark from before this match
-            throw({stop, CAcc0});
-        true when Limit == 1 ->
-            NewCAcc = apply_user_fun(CAcc2, Doc),
-            throw({stop, NewCAcc});
-        true when Limit > 1 ->
-            NewCAcc = apply_user_fun(CAcc2, Doc),
-            NewCAcc#cacc{limit = Limit - 1};
+    CAcc1 = CAcc0#cacc{execution_stats = Stats1},
+    case mango_selector:match(CAcc1#cacc.selector, Doc) of
+        true ->
+            DocId = mango_doc:get_field(Doc, <<"_id">>),
+            case sets:is_element(DocId, Seen) of
+                true ->
+                    CAcc1;
+                false ->
+                    CAcc2 = update_bookmark(CAcc1, Sort),
+                    CAcc3 = CAcc2#cacc{
+                        documents_seen = sets:add_element(DocId, Seen)
+                    },
+                    if
+                        Skip > 0 ->
+                            CAcc3#cacc{skip = Skip - 1};
+                        Limit == 0 ->
+                            % We hit this case if the user specified
+                            % with a zero limit. Notice that in this
+                            % case we need to return the bookmark from
+                            % before this match.
+                            throw({stop, CAcc0});
+                        Limit == 1 ->
+                            CAcc4 = apply_user_fun(CAcc3, Doc),
+                            throw({stop, CAcc4});
+                        Limit > 1 ->
+                            CAcc4 = apply_user_fun(CAcc3, Doc),
+                            CAcc4#cacc{limit = Limit - 1}
+                    end
+            end;
         false ->
-            CAcc2
+            CAcc1
     end.
 
 apply_user_fun(CAcc, Doc) ->
@@ -477,6 +493,7 @@ execute_test_() ->
                 append_sort_type,
                 fun(RawField, selector) -> <<RawField/binary, "<type>">> end
             ),
+            meck:expect(mango_doc, get_field, fun({doc, N}, <<"_id">>) -> N end),
             meck:expect(mango_fields, extract, fun({doc, N}, fields) -> {final_doc, N} end),
             meck:expect(
                 foo,
@@ -502,6 +519,7 @@ execute_test_() ->
             ?TDEF_FE(t_execute_more_results, 10),
             ?TDEF_FE(t_execute_unique_results, 10),
             ?TDEF_FE(t_execute_limit_cutoff, 10),
+            ?TDEF_FE(t_execute_limit_cutoff_unique, 10),
             ?TDEF_FE(t_execute_limit_zero, 10),
             ?TDEF_FE(t_execute_limit_unique, 10),
             ?TDEF_FE(t_execute_skip, 10),
@@ -619,6 +637,7 @@ t_execute_more_results(_) ->
     ?assertEqual(3, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
 
 t_execute_unique_results(_) ->
+    UniqueHits = 3,
     Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}],
     Cursor = #cursor{
         db = db,
@@ -661,10 +680,10 @@ t_execute_unique_results(_) ->
     ),
     meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
     meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end),
-    ?assertEqual({ok, {acc, 9}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
+    ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
     ?assertEqual(6, meck:num_calls(couch_stats, increment_counter, 1)),
     ?assertEqual(6, meck:num_calls(mango_execution_stats, incr_docs_examined, 
1)),
-    ?assertEqual(6, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
+    ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
 
 t_execute_limit_cutoff(_) ->
     Limit = 2,
@@ -708,6 +727,57 @@ t_execute_limit_cutoff(_) ->
     ?assertEqual(Limit, meck:num_calls(mango_execution_stats, 
incr_docs_examined, 1)),
     ?assertEqual(Limit, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
 
+t_execute_limit_cutoff_unique(_) ->
+    Limit = 4,
+    ActualHits = 3,
+    AllHits = 6,
+    Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}],
+    Cursor = #cursor{
+        db = db,
+        index = #idx{ddoc = <<"ddoc">>, name = <<"index">>},
+        limit = Limit,
+        skip = 0,
+        fields = fields,
+        selector = selector,
+        opts = Options,
+        execution_stats = stats
+    },
+    meck:expect(
+        dreyfus_fabric_search,
+        go,
+        fun(db_name, <<"ddoc">>, <<"index">>, QueryArgs) ->
+            #index_query_args{
+                q = query,
+                partition = partition,
+                bookmark = B,
+                sort = relevance,
+                raw_bookmark = true
+            } = QueryArgs,
+            {Bookmark, Hits} =
+                case B of
+                    nil ->
+                        Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 1}}]}},
+                        Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 2}}]}},
+                        Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 3}}]}},
+                        {[bookmark, 0], [Hit1, Hit2, Hit3]};
+                    [bookmark, 3] ->
+                        Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 1}}]}},
+                        Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 2}}]}},
+                        Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 3}}]}},
+                        {[bookmark, 4], [Hit3, Hit2, Hit1]};
+                    [bookmark, 4] ->
+                        {[bookmark, 5], []}
+                end,
+            {ok, Bookmark, undefined, Hits, undefined, undefined}
+        end
+    ),
+    meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
+    meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end),
+    ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
+    ?assertEqual(AllHits, meck:num_calls(couch_stats, increment_counter, 1)),
+    ?assertEqual(AllHits, meck:num_calls(mango_execution_stats, 
incr_docs_examined, 1)),
+    ?assertEqual(ActualHits, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
+
 t_execute_limit_zero(_) ->
     Limit = 0,
     Skip = 0,
@@ -751,6 +821,8 @@ t_execute_limit_zero(_) ->
 
 t_execute_limit_unique(_) ->
     Limit = 5,
+    AllHits = 6,
+    UniqueHits = 3,
     Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}],
     Cursor = #cursor{
         db = db,
@@ -784,17 +856,19 @@ t_execute_limit_unique(_) ->
                         Hit1 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 1}}]}},
                         Hit2 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 2}}]}},
                         Hit3 = #sortable{item = #hit{fields = [{<<"_id">>, 
{id, 3}}]}},
-                        {[bookmark, 4], [Hit3, Hit2, Hit1]}
+                        {[bookmark, 4], [Hit3, Hit2, Hit1]};
+                    [bookmark, 4] ->
+                        {[bookmark, 5], []}
                 end,
             {ok, Bookmark, undefined, Hits, undefined, undefined}
         end
     ),
     meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
     meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end),
-    ?assertEqual({ok, {acc, 8}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
-    ?assertEqual(Limit, meck:num_calls(couch_stats, increment_counter, 1)),
-    ?assertEqual(Limit, meck:num_calls(mango_execution_stats, 
incr_docs_examined, 1)),
-    ?assertEqual(Limit, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
+    ?assertEqual({ok, {acc, 6}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
+    ?assertEqual(AllHits, meck:num_calls(couch_stats, increment_counter, 1)),
+    ?assertEqual(AllHits, meck:num_calls(mango_execution_stats, 
incr_docs_examined, 1)),
+    ?assertEqual(UniqueHits, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
 
 t_execute_skip(_) ->
     UniqueHits = 3,
@@ -845,12 +919,14 @@ t_execute_skip(_) ->
 
 t_execute_skip_unique(_) ->
     AllHits = 6,
+    UniqueHits = 3,
+    Skip = 2,
     Options = [{partition, partition}, {sort, {[]}}, {bookmark, []}],
     Cursor = #cursor{
         db = db,
         index = #idx{ddoc = <<"ddoc">>, name = <<"index">>},
         limit = 10,
-        skip = 2,
+        skip = Skip,
         fields = fields,
         selector = selector,
         opts = Options,
@@ -887,10 +963,12 @@ t_execute_skip_unique(_) ->
     ),
     meck:expect(mango_selector_text, convert, [selector], meck:val(query)),
     meck:expect(mango_selector, match, fun(selector, {doc, _}) -> true end),
-    ?assertEqual({ok, {acc, 7}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
+    ?assertEqual({ok, {acc, 4}}, execute(Cursor, fun foo:normal/2, {acc, 0})),
     ?assertEqual(AllHits, meck:num_calls(couch_stats, increment_counter, 1)),
     ?assertEqual(AllHits, meck:num_calls(mango_execution_stats, 
incr_docs_examined, 1)),
-    ?assertEqual(4, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)).
+    ?assertEqual(
+        UniqueHits - Skip, meck:num_calls(mango_execution_stats, 
incr_results_returned, 1)
+    ).
 
 t_execute_no_matches(_) ->
     UniqueHits = 3,
diff --git a/src/mango/test/24-text-paginated-test.py b/src/mango/test/24-text-paginated-test.py
new file mode 100644
index 000000000..84b3a2f4b
--- /dev/null
+++ b/src/mango/test/24-text-paginated-test.py
@@ -0,0 +1,108 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+
+import mango
+import unittest
+import time
+
+
+@unittest.skipUnless(mango.has_text_service(), "requires text service")
+class PaginatedResultsTest(mango.DbPerClass):
+    # Large enough to keep even fast systems busy while the query
+    # runs.
+    NUM_DOCS = 10000
+    UPDATES = 25
+
+    def setUp(self):
+        self.db.recreate()
+        self.db.create_text_index(
+            analyzer="keyword",
+            default_field={},
+            selector={},
+            fields=[
+                {"name": "_id", "type": "string"},
+                {"name": "name", "type": "string"},
+            ],
+            index_array_lengths=True,
+        )
+        docs = [
+            {"_id": f"{doc_id:08X}", "name": mango.random_string(32)}
+            for doc_id in range(self.NUM_DOCS)
+        ]
+        self.db.save_docs(docs)
+
+    def test_query_with_lot_of_results(self):
+        # 200 is the maximum for `text` searches.
+        docs = self.db.find(selector={"_id": {"$lte": f"{1000:08X}"}}, limit=200)
+        assert len(docs) == 200
+
+    def do_query(self, delay, find_args):
+        time.sleep(delay)
+        return self.db.find(*find_args)
+
+    def do_updates(self, pause, doc_id):
+        for i in range(self.UPDATES):
+            doc = self.db.open_doc(doc_id)
+            updated_doc = {
+                "_id": doc_id,
+                "_rev": doc["_rev"],
+                "update": i,
+                "name": "foobar",
+            }
+            self.db.save_doc(updated_doc)
+            time.sleep(pause)
+
+    def test_no_duplicates_on_interleaved_updates(self):
+        # Give the updates a ~500 ms head start before running the
+        # query.
+        query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"},)))
+        # Keep updating the target document every 200 ms.
+        updates = mango.Concurrently(self.do_updates, (0.2, f"{2:08X}"))
+        docs = query.get_result()
+        updates.join()
+        assert len(docs) == 1
+
+    def test_no_duplicates_on_interleaved_updates_heavy(self):
+        query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"},)))
+        updates = [
+            mango.Concurrently(self.do_updates, (0.05, f"{2:08X}")),
+            mango.Concurrently(self.do_updates, (0.2, f"{3:08X}")),
+            mango.Concurrently(self.do_updates, (0.3, f"{4:08X}")),
+            mango.Concurrently(self.do_updates, (0.15, f"{5:08X}")),
+            mango.Concurrently(self.do_updates, (0.1, f"{6:08X}")),
+        ]
+        docs = query.get_result()
+        for ref in updates:
+            ref.join()
+        ids = list(map(lambda d: d["_id"], docs))
+        assert sorted(ids) == [
+            f"{2:08X}",
+            f"{3:08X}",
+            f"{4:08X}",
+            f"{5:08X}",
+            f"{6:08X}",
+        ]
+
+    def test_no_duplicates_on_interleaved_updates_with_limit_skip(self):
+        query = mango.Concurrently(self.do_query, (0.5, ({"name": "foobar"}, 1, 3)))
+        updates = [
+            mango.Concurrently(self.do_updates, (0.05, f"{2:08X}")),
+            mango.Concurrently(self.do_updates, (0.2, f"{3:08X}")),
+            mango.Concurrently(self.do_updates, (0.3, f"{4:08X}")),
+            mango.Concurrently(self.do_updates, (0.15, f"{5:08X}")),
+            mango.Concurrently(self.do_updates, (0.1, f"{6:08X}")),
+        ]
+        docs = query.get_result()
+        for ref in updates:
+            ref.join()
+        assert len(docs) == 1
diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py
index 2dff18f40..b8a6cd9a5 100644
--- a/src/mango/test/mango.py
+++ b/src/mango/test/mango.py
@@ -11,10 +11,13 @@
 # the License.
 
 import json
+import random
+import string
 import time
 import unittest
 import uuid
 import os
+import threading
 
 import requests
 
@@ -28,10 +31,18 @@ COUCH_USER = os.environ.get("COUCH_USER")
 COUCH_PASS = os.environ.get("COUCH_PASS")
 
 
+BULK_BATCH_SIZE = 500
+
+
 def random_db_name():
     return "mango_test_" + uuid.uuid4().hex
 
 
+def random_string(n_max):
+    n = random.choice(range(n_max))
+    return "".join(random.choice(string.ascii_letters) for _ in range(n))
+
+
 def has_text_service():
     features = requests.get(COUCH_HOST).json()["features"]
     return "search" in features
@@ -47,6 +58,27 @@ def delay(n=5, t=0.5):
         time.sleep(t)
 
 
+class Concurrently(object):
+    def __init__(self, thread, thread_args, start=True):
+        self.thread = threading.Thread(target=self.wrapper, args=(thread, thread_args))
+        self.return_value = None
+        if start:
+            self.start()
+
+    def wrapper(self, body, args):
+        self.return_value = body(*args)
+
+    def start(self):
+        self.thread.start()
+
+    def get_result(self):
+        self.thread.join()
+        return self.return_value
+
+    def join(self):
+        self.thread.join()
+
+
 class Database(object):
     def __init__(
         self,
@@ -103,12 +135,14 @@ class Database(object):
         r.raise_for_status()
 
     def save_docs(self, docs, **kwargs):
-        body = json.dumps({"docs": docs})
-        r = self.sess.post(self.path("_bulk_docs"), data=body, params=kwargs)
-        r.raise_for_status()
-        for doc, result in zip(docs, r.json()):
-            doc["_id"] = result["id"]
-            doc["_rev"] = result["rev"]
+        for offset in range(0, len(docs), BULK_BATCH_SIZE):
+            chunk = docs[offset : (offset + BULK_BATCH_SIZE)]
+            body = {"docs": chunk}
+            r = self.sess.post(self.path("_bulk_docs"), json=body, params=kwargs)
+            r.raise_for_status()
+            for doc, result in zip(chunk, r.json()):
+                doc["_id"] = result["id"]
+                doc["_rev"] = result["rev"]
 
     def open_doc(self, docid):
         r = self.sess.get(self.path(docid))

Reply via email to