This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 2fa2de73 feat(search): add mutex to HNSW index updating (#2649)
2fa2de73 is described below

commit 2fa2de7378a1a31740c64c36bb366dd320ff94d6
Author: Twice <[email protected]>
AuthorDate: Thu Nov 7 18:05:52 2024 +0800

    feat(search): add mutex to HNSW index updating (#2649)
---
 src/commands/cmd_search.cc           |  2 +-
 src/search/index_manager.h           | 12 ++++++------
 src/search/indexer.cc                | 35 ++++++++++++++++++++---------------
 src/search/indexer.h                 | 17 +++++++++--------
 src/search/redis_query_transformer.h |  2 +-
 tests/cppunit/indexer_test.cc        | 16 ++++++++--------
 tests/cppunit/plan_executor_test.cc  |  6 +++---
 7 files changed, 48 insertions(+), 42 deletions(-)

diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc
index bf4e527f..8d3309ab 100644
--- a/src/commands/cmd_search.cc
+++ b/src/commands/cmd_search.cc
@@ -333,7 +333,7 @@ static StatusOr<std::unique_ptr<kqir::Node>> ParseRediSearchQuery(const std::vec
 
   kqir::ParamMap param_map;
   while (parser.Good()) {
-    if (parser.EatEqICase("RETURNS")) {
+    if (parser.EatEqICase("RETURN")) {
       auto count = GET_OR_RET(parser.TakeInt<size_t>());
 
       for (size_t i = 0; i < count; ++i) {
diff --git a/src/search/index_manager.h b/src/search/index_manager.h
index eb5d5dcc..a89445ea 100644
--- a/src/search/index_manager.h
+++ b/src/search/index_manager.h
@@ -124,8 +124,8 @@ struct IndexManager {
         info->Add(kqir::FieldInfo(field_name.ToString(), 
std::move(field_meta)));
       }
 
-      IndexUpdater updater(info.get());
-      indexer->Add(updater);
+      auto updater = std::make_unique<IndexUpdater>(info.get());
+      indexer->Add(std::move(updater));
       index_map.Insert(std::move(info));
     }
 
@@ -180,12 +180,12 @@ struct IndexManager {
       return {Status::NotOK, fmt::format("failed to write index metadata: {}", 
s.ToString())};
     }
 
-    IndexUpdater updater(info.get());
-    indexer->Add(updater);
+    auto updater = std::make_unique<IndexUpdater>(info.get());
+    indexer->Add(std::move(updater));
     index_map.Insert(std::move(info));
 
-    for (auto updater : indexer->updater_list) {
-      GET_OR_RET(updater.Build(ctx));
+    for (const auto &updater : indexer->updater_list) {
+      GET_OR_RET(updater->Build(ctx));
     }
 
     return Status::OK();
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 0f771026..630e4c1e 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -280,10 +280,14 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k
 
 Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, 
std::string_view key, const kqir::Value &original,
                                            const kqir::Value &current, const 
SearchKey &search_key,
-                                           HnswVectorFieldMetadata *vector) 
const {
+                                           HnswVectorFieldMetadata *vector) {
   CHECK(original.IsNull() || original.Is<kqir::NumericArray>());
   CHECK(current.IsNull() || current.Is<kqir::NumericArray>());
 
+  // TODO: we can remove the lock if we solve the race problem
+  // inside the HNSW indexer, refer to #2481 and #2489
+  std::unique_lock lock(update_mutex);
+
   auto storage = indexer->storage;
   auto hnsw = HnswIndex(search_key, vector, storage);
 
@@ -305,7 +309,7 @@ Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, std::string_vie
 }
 
 Status IndexUpdater::UpdateIndex(engine::Context &ctx, const std::string 
&field, std::string_view key,
-                                 const kqir::Value &original, const 
kqir::Value &current) const {
+                                 const kqir::Value &original, const 
kqir::Value &current) {
   if (original == current) {
     // the value of this field is unchanged, no need to update
     return Status::OK();
@@ -331,7 +335,7 @@ Status IndexUpdater::UpdateIndex(engine::Context &ctx, const std::string &field,
   return Status::OK();
 }
 
-Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, 
std::string_view key) const {
+Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, 
std::string_view key) {
   auto current = GET_OR_RET(Record(ctx, key));
 
   for (const auto &[field, i] : info->fields) {
@@ -354,7 +358,7 @@ Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, s
   return Status::OK();
 }
 
-Status IndexUpdater::Build(engine::Context &ctx) const {
+Status IndexUpdater::Build(engine::Context &ctx) {
   auto storage = indexer->storage;
   util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), 
ColumnFamilyID::Metadata);
 
@@ -380,26 +384,27 @@ Status IndexUpdater::Build(engine::Context &ctx) const {
   return Status::OK();
 }
 
-void GlobalIndexer::Add(IndexUpdater updater) {
-  updater.indexer = this;
-  for (const auto &prefix : updater.info->prefixes) {
-    prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false), 
updater);
+void GlobalIndexer::Add(std::unique_ptr<IndexUpdater> updater) {
+  updater->indexer = this;
+  for (const auto &prefix : updater->info->prefixes) {
+    prefix_map.insert(ComposeNamespaceKey(updater->info->ns, prefix, false), 
updater.get());
   }
-  updater_list.push_back(updater);
+  updater_list.push_back(std::move(updater));
 }
 
 void GlobalIndexer::Remove(const kqir::IndexInfo *index) {
   for (auto iter = prefix_map.begin(); iter != prefix_map.end();) {
-    if (iter->info == index) {
+    if ((*iter)->info == index) {
       iter = prefix_map.erase(iter);
     } else {
       ++iter;
     }
   }
 
-  updater_list.erase(std::remove_if(updater_list.begin(), updater_list.end(),
-                                    [index](IndexUpdater updater) { return 
updater.info == index; }),
-                     updater_list.end());
+  updater_list.erase(
+      std::remove_if(updater_list.begin(), updater_list.end(),
+                     [index](const std::unique_ptr<IndexUpdater> &updater) { 
return updater->info == index; }),
+      updater_list.end());
 }
 
 StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(engine::Context 
&ctx, std::string_view key,
@@ -411,14 +416,14 @@ StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(engine::Context &ctx
   auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key, false));
   if (iter != prefix_map.end()) {
     auto updater = iter.value();
-    return RecordResult{updater, std::string(key.begin(), key.end()), 
GET_OR_RET(updater.Record(ctx, key))};
+    return RecordResult{updater, std::string(key.begin(), key.end()), 
GET_OR_RET(updater->Record(ctx, key))};
   }
 
   return {Status::NoPrefixMatched};
 }
 
 Status GlobalIndexer::Update(engine::Context &ctx, const RecordResult 
&original) {
-  return original.updater.Update(ctx, original.fields, original.key);
+  return original.updater->Update(ctx, original.fields, original.key);
 }
 
 }  // namespace redis
diff --git a/src/search/indexer.h b/src/search/indexer.h
index 20819a2f..b6d00431 100644
--- a/src/search/indexer.h
+++ b/src/search/indexer.h
@@ -75,15 +75,16 @@ struct IndexUpdater {
 
   const kqir::IndexInfo *info = nullptr;
   GlobalIndexer *indexer = nullptr;
+  std::mutex update_mutex;
 
   explicit IndexUpdater(const kqir::IndexInfo *info) : info(info) {}
 
   StatusOr<FieldValues> Record(engine::Context &ctx, std::string_view key) 
const;
   Status UpdateIndex(engine::Context &ctx, const std::string &field, 
std::string_view key, const kqir::Value &original,
-                     const kqir::Value &current) const;
-  Status Update(engine::Context &ctx, const FieldValues &original, 
std::string_view key) const;
+                     const kqir::Value &current);
+  Status Update(engine::Context &ctx, const FieldValues &original, 
std::string_view key);
 
-  Status Build(engine::Context &ctx) const;
+  Status Build(engine::Context &ctx);
 
   Status UpdateTagIndex(engine::Context &ctx, std::string_view key, const 
kqir::Value &original,
                         const kqir::Value &current, const SearchKey 
&search_key, const TagFieldMetadata *tag) const;
@@ -92,25 +93,25 @@ struct IndexUpdater {
                             const NumericFieldMetadata *num) const;
   Status UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, 
const kqir::Value &original,
                                const kqir::Value &current, const SearchKey 
&search_key,
-                               HnswVectorFieldMetadata *vector) const;
+                               HnswVectorFieldMetadata *vector);
 };
 
 struct GlobalIndexer {
   using FieldValues = IndexUpdater::FieldValues;
   struct RecordResult {
-    IndexUpdater updater;
+    IndexUpdater *updater;
     std::string key;
     FieldValues fields;
   };
 
-  tsl::htrie_map<char, IndexUpdater> prefix_map;
-  std::vector<IndexUpdater> updater_list;
+  tsl::htrie_map<char, IndexUpdater *> prefix_map;
+  std::vector<std::unique_ptr<IndexUpdater>> updater_list;
 
   engine::Storage *storage = nullptr;
 
   explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {}
 
-  void Add(IndexUpdater updater);
+  void Add(std::unique_ptr<IndexUpdater> updater);
   void Remove(const kqir::IndexInfo *index);
 
   StatusOr<RecordResult> Record(engine::Context &ctx, std::string_view key, 
const std::string &ns);
diff --git a/src/search/redis_query_transformer.h b/src/search/redis_query_transformer.h
index ed7c8fc6..7cc3a903 100644
--- a/src/search/redis_query_transformer.h
+++ b/src/search/redis_query_transformer.h
@@ -171,7 +171,7 @@ struct Transformer : ir::TreeTransformer {
       if (Is<UnsignedInteger>(knn_search->children[1])) {
         k = *ParseInt(knn_search->children[1]->string());
       } else {
-        k = *ParseInt(GET_OR_RET(GetParam(node)));
+        k = *ParseInt(GET_OR_RET(GetParam(knn_search->children[1])));
       }
 
       return 
std::make_unique<VectorKnnExpr>(std::make_unique<FieldRef>(knn_search->children[2]->string()),
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index 4e5ea3fc..5e7ecf1a 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -48,7 +48,7 @@ struct IndexerTest : TestBase {
 
     map.emplace("hashtest", std::move(hash_info));
 
-    redis::IndexUpdater hash_updater{map.at("hashtest").get()};
+    auto hash_updater = 
std::make_unique<redis::IndexUpdater>(map.at("hashtest").get());
 
     redis::IndexMetadata json_field_meta;
     json_field_meta.on_data_type = redis::IndexOnDataType::JSON;
@@ -65,7 +65,7 @@ struct IndexerTest : TestBase {
 
     map.emplace("jsontest", std::move(json_info));
 
-    redis::IndexUpdater json_updater{map.at("jsontest").get()};
+    auto json_updater = 
std::make_unique<redis::IndexUpdater>(map.at("jsontest").get());
 
     indexer.Add(std::move(hash_updater));
     indexer.Add(std::move(json_updater));
@@ -87,7 +87,7 @@ TEST_F(IndexerTest, HashTag) {
   {
     auto s = indexer.Record(*ctx_, key1, ns);
     ASSERT_EQ(s.Msg(), Status::ok_msg);
-    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_EQ(s->updater->info->name, idxname);
     ASSERT_TRUE(s->fields.empty());
 
     uint64_t cnt = 0;
@@ -120,7 +120,7 @@ TEST_F(IndexerTest, HashTag) {
   {
     auto s = indexer.Record(*ctx_, key1, ns);
     ASSERT_TRUE(s);
-    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_EQ(s->updater->info->name, idxname);
     ASSERT_EQ(s->fields.size(), 1);
     ASSERT_EQ(s->fields["x"], T("food,kitChen,Beauty"));
 
@@ -178,7 +178,7 @@ TEST_F(IndexerTest, JsonTag) {
   {
     auto s = indexer.Record(*ctx_, key1, ns);
     ASSERT_TRUE(s);
-    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_EQ(s->updater->info->name, idxname);
     ASSERT_TRUE(s->fields.empty());
 
     auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})");
@@ -210,7 +210,7 @@ TEST_F(IndexerTest, JsonTag) {
   {
     auto s = indexer.Record(*ctx_, key1, ns);
     ASSERT_TRUE(s);
-    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_EQ(s->updater->info->name, idxname);
     ASSERT_EQ(s->fields.size(), 1);
     ASSERT_EQ(s->fields["$.x"], T("food,kitChen,Beauty"));
 
@@ -262,7 +262,7 @@ TEST_F(IndexerTest, JsonTagBuildIndex) {
     auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})");
     ASSERT_TRUE(s_set.ok());
 
-    auto s2 = indexer.updater_list[1].Build(*ctx_);
+    auto s2 = indexer.updater_list[1]->Build(*ctx_);
     ASSERT_EQ(s2.Msg(), Status::ok_msg);
 
     auto key = redis::SearchKey(ns, idxname, 
"$.x").ConstructTagFieldData("food", key1);
@@ -301,7 +301,7 @@ TEST_F(IndexerTest, JsonHnswVector) {
   {
     auto s = indexer.Record(*ctx_, key3, ns);
     ASSERT_TRUE(s);
-    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_EQ(s->updater->info->name, idxname);
     ASSERT_TRUE(s->fields.empty());
 
     auto s_set = db.Set(*ctx_, key3, "$", R"({"z": [1,2,3]})");
diff --git a/tests/cppunit/plan_executor_test.cc b/tests/cppunit/plan_executor_test.cc
index 3524c143..ecff4bb1 100644
--- a/tests/cppunit/plan_executor_test.cc
+++ b/tests/cppunit/plan_executor_test.cc
@@ -386,7 +386,7 @@ std::vector<std::unique_ptr<ScopedUpdate>> ScopedUpdates(engine::Context& ctx, r
 
 TEST_F(PlanExecutorTestC, NumericFieldScan) {
   redis::GlobalIndexer indexer(storage_.get());
-  indexer.Add(redis::IndexUpdater(IndexI()));
+  indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));
 
   {
     engine::Context ctx(storage_.get());
@@ -428,7 +428,7 @@ TEST_F(PlanExecutorTestC, NumericFieldScan) {
 
 TEST_F(PlanExecutorTestC, TagFieldScan) {
   redis::GlobalIndexer indexer(storage_.get());
-  indexer.Add(redis::IndexUpdater(IndexI()));
+  indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));
 
   {
     engine::Context ctx(storage_.get());
@@ -467,7 +467,7 @@ TEST_F(PlanExecutorTestC, TagFieldScan) {
 
 TEST_F(PlanExecutorTestC, HnswVectorFieldScans) {
   redis::GlobalIndexer indexer(storage_.get());
-  indexer.Add(redis::IndexUpdater(IndexI()));
+  indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));
 
   {
     auto updates = ScopedUpdates(*ctx_, indexer,

Reply via email to