This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 2fa2de73 feat(search): add mutex to HNSW index updating (#2649)
2fa2de73 is described below
commit 2fa2de7378a1a31740c64c36bb366dd320ff94d6
Author: Twice <[email protected]>
AuthorDate: Thu Nov 7 18:05:52 2024 +0800
feat(search): add mutex to HNSW index updating (#2649)
---
src/commands/cmd_search.cc | 2 +-
src/search/index_manager.h | 12 ++++++------
src/search/indexer.cc | 35 ++++++++++++++++++++---------------
src/search/indexer.h | 17 +++++++++--------
src/search/redis_query_transformer.h | 2 +-
tests/cppunit/indexer_test.cc | 16 ++++++++--------
tests/cppunit/plan_executor_test.cc | 6 +++---
7 files changed, 48 insertions(+), 42 deletions(-)
diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc
index bf4e527f..8d3309ab 100644
--- a/src/commands/cmd_search.cc
+++ b/src/commands/cmd_search.cc
@@ -333,7 +333,7 @@ static StatusOr<std::unique_ptr<kqir::Node>>
ParseRediSearchQuery(const std::vec
kqir::ParamMap param_map;
while (parser.Good()) {
- if (parser.EatEqICase("RETURNS")) {
+ if (parser.EatEqICase("RETURN")) {
auto count = GET_OR_RET(parser.TakeInt<size_t>());
for (size_t i = 0; i < count; ++i) {
diff --git a/src/search/index_manager.h b/src/search/index_manager.h
index eb5d5dcc..a89445ea 100644
--- a/src/search/index_manager.h
+++ b/src/search/index_manager.h
@@ -124,8 +124,8 @@ struct IndexManager {
info->Add(kqir::FieldInfo(field_name.ToString(),
std::move(field_meta)));
}
- IndexUpdater updater(info.get());
- indexer->Add(updater);
+ auto updater = std::make_unique<IndexUpdater>(info.get());
+ indexer->Add(std::move(updater));
index_map.Insert(std::move(info));
}
@@ -180,12 +180,12 @@ struct IndexManager {
return {Status::NotOK, fmt::format("failed to write index metadata: {}",
s.ToString())};
}
- IndexUpdater updater(info.get());
- indexer->Add(updater);
+ auto updater = std::make_unique<IndexUpdater>(info.get());
+ indexer->Add(std::move(updater));
index_map.Insert(std::move(info));
- for (auto updater : indexer->updater_list) {
- GET_OR_RET(updater.Build(ctx));
+ for (const auto &updater : indexer->updater_list) {
+ GET_OR_RET(updater->Build(ctx));
}
return Status::OK();
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 0f771026..630e4c1e 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -280,10 +280,14 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context
&ctx, std::string_view k
Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx,
std::string_view key, const kqir::Value &original,
const kqir::Value &current, const
SearchKey &search_key,
- HnswVectorFieldMetadata *vector)
const {
+ HnswVectorFieldMetadata *vector) {
CHECK(original.IsNull() || original.Is<kqir::NumericArray>());
CHECK(current.IsNull() || current.Is<kqir::NumericArray>());
+ // TODO: we can remove the lock if we solve the race problem
+ // inside the HNSW indexer, refer to #2481 and #2489
+ std::unique_lock lock(update_mutex);
+
auto storage = indexer->storage;
auto hnsw = HnswIndex(search_key, vector, storage);
@@ -305,7 +309,7 @@ Status IndexUpdater::UpdateHnswVectorIndex(engine::Context
&ctx, std::string_vie
}
Status IndexUpdater::UpdateIndex(engine::Context &ctx, const std::string
&field, std::string_view key,
- const kqir::Value &original, const
kqir::Value &current) const {
+ const kqir::Value &original, const
kqir::Value &current) {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
@@ -331,7 +335,7 @@ Status IndexUpdater::UpdateIndex(engine::Context &ctx,
const std::string &field,
return Status::OK();
}
-Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original,
std::string_view key) const {
+Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original,
std::string_view key) {
auto current = GET_OR_RET(Record(ctx, key));
for (const auto &[field, i] : info->fields) {
@@ -354,7 +358,7 @@ Status IndexUpdater::Update(engine::Context &ctx, const
FieldValues &original, s
return Status::OK();
}
-Status IndexUpdater::Build(engine::Context &ctx) const {
+Status IndexUpdater::Build(engine::Context &ctx) {
auto storage = indexer->storage;
util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(),
ColumnFamilyID::Metadata);
@@ -380,26 +384,27 @@ Status IndexUpdater::Build(engine::Context &ctx) const {
return Status::OK();
}
-void GlobalIndexer::Add(IndexUpdater updater) {
- updater.indexer = this;
- for (const auto &prefix : updater.info->prefixes) {
- prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false),
updater);
+void GlobalIndexer::Add(std::unique_ptr<IndexUpdater> updater) {
+ updater->indexer = this;
+ for (const auto &prefix : updater->info->prefixes) {
+ prefix_map.insert(ComposeNamespaceKey(updater->info->ns, prefix, false),
updater.get());
}
- updater_list.push_back(updater);
+ updater_list.push_back(std::move(updater));
}
void GlobalIndexer::Remove(const kqir::IndexInfo *index) {
for (auto iter = prefix_map.begin(); iter != prefix_map.end();) {
- if (iter->info == index) {
+ if ((*iter)->info == index) {
iter = prefix_map.erase(iter);
} else {
++iter;
}
}
- updater_list.erase(std::remove_if(updater_list.begin(), updater_list.end(),
- [index](IndexUpdater updater) { return
updater.info == index; }),
- updater_list.end());
+ updater_list.erase(
+ std::remove_if(updater_list.begin(), updater_list.end(),
+ [index](const std::unique_ptr<IndexUpdater> &updater) {
return updater->info == index; }),
+ updater_list.end());
}
StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(engine::Context
&ctx, std::string_view key,
@@ -411,14 +416,14 @@ StatusOr<GlobalIndexer::RecordResult>
GlobalIndexer::Record(engine::Context &ctx
auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key, false));
if (iter != prefix_map.end()) {
auto updater = iter.value();
- return RecordResult{updater, std::string(key.begin(), key.end()),
GET_OR_RET(updater.Record(ctx, key))};
+ return RecordResult{updater, std::string(key.begin(), key.end()),
GET_OR_RET(updater->Record(ctx, key))};
}
return {Status::NoPrefixMatched};
}
Status GlobalIndexer::Update(engine::Context &ctx, const RecordResult
&original) {
- return original.updater.Update(ctx, original.fields, original.key);
+ return original.updater->Update(ctx, original.fields, original.key);
}
} // namespace redis
diff --git a/src/search/indexer.h b/src/search/indexer.h
index 20819a2f..b6d00431 100644
--- a/src/search/indexer.h
+++ b/src/search/indexer.h
@@ -75,15 +75,16 @@ struct IndexUpdater {
const kqir::IndexInfo *info = nullptr;
GlobalIndexer *indexer = nullptr;
+ std::mutex update_mutex;
explicit IndexUpdater(const kqir::IndexInfo *info) : info(info) {}
StatusOr<FieldValues> Record(engine::Context &ctx, std::string_view key)
const;
Status UpdateIndex(engine::Context &ctx, const std::string &field,
std::string_view key, const kqir::Value &original,
- const kqir::Value &current) const;
- Status Update(engine::Context &ctx, const FieldValues &original,
std::string_view key) const;
+ const kqir::Value &current);
+ Status Update(engine::Context &ctx, const FieldValues &original,
std::string_view key);
- Status Build(engine::Context &ctx) const;
+ Status Build(engine::Context &ctx);
Status UpdateTagIndex(engine::Context &ctx, std::string_view key, const
kqir::Value &original,
const kqir::Value &current, const SearchKey
&search_key, const TagFieldMetadata *tag) const;
@@ -92,25 +93,25 @@ struct IndexUpdater {
const NumericFieldMetadata *num) const;
Status UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key,
const kqir::Value &original,
const kqir::Value &current, const SearchKey
&search_key,
- HnswVectorFieldMetadata *vector) const;
+ HnswVectorFieldMetadata *vector);
};
struct GlobalIndexer {
using FieldValues = IndexUpdater::FieldValues;
struct RecordResult {
- IndexUpdater updater;
+ IndexUpdater *updater;
std::string key;
FieldValues fields;
};
- tsl::htrie_map<char, IndexUpdater> prefix_map;
- std::vector<IndexUpdater> updater_list;
+ tsl::htrie_map<char, IndexUpdater *> prefix_map;
+ std::vector<std::unique_ptr<IndexUpdater>> updater_list;
engine::Storage *storage = nullptr;
explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {}
- void Add(IndexUpdater updater);
+ void Add(std::unique_ptr<IndexUpdater> updater);
void Remove(const kqir::IndexInfo *index);
StatusOr<RecordResult> Record(engine::Context &ctx, std::string_view key,
const std::string &ns);
diff --git a/src/search/redis_query_transformer.h
b/src/search/redis_query_transformer.h
index ed7c8fc6..7cc3a903 100644
--- a/src/search/redis_query_transformer.h
+++ b/src/search/redis_query_transformer.h
@@ -171,7 +171,7 @@ struct Transformer : ir::TreeTransformer {
if (Is<UnsignedInteger>(knn_search->children[1])) {
k = *ParseInt(knn_search->children[1]->string());
} else {
- k = *ParseInt(GET_OR_RET(GetParam(node)));
+ k = *ParseInt(GET_OR_RET(GetParam(knn_search->children[1])));
}
return
std::make_unique<VectorKnnExpr>(std::make_unique<FieldRef>(knn_search->children[2]->string()),
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index 4e5ea3fc..5e7ecf1a 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -48,7 +48,7 @@ struct IndexerTest : TestBase {
map.emplace("hashtest", std::move(hash_info));
- redis::IndexUpdater hash_updater{map.at("hashtest").get()};
+ auto hash_updater =
std::make_unique<redis::IndexUpdater>(map.at("hashtest").get());
redis::IndexMetadata json_field_meta;
json_field_meta.on_data_type = redis::IndexOnDataType::JSON;
@@ -65,7 +65,7 @@ struct IndexerTest : TestBase {
map.emplace("jsontest", std::move(json_info));
- redis::IndexUpdater json_updater{map.at("jsontest").get()};
+ auto json_updater =
std::make_unique<redis::IndexUpdater>(map.at("jsontest").get());
indexer.Add(std::move(hash_updater));
indexer.Add(std::move(json_updater));
@@ -87,7 +87,7 @@ TEST_F(IndexerTest, HashTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_EQ(s.Msg(), Status::ok_msg);
- ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_TRUE(s->fields.empty());
uint64_t cnt = 0;
@@ -120,7 +120,7 @@ TEST_F(IndexerTest, HashTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_TRUE(s);
- ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_EQ(s->fields.size(), 1);
ASSERT_EQ(s->fields["x"], T("food,kitChen,Beauty"));
@@ -178,7 +178,7 @@ TEST_F(IndexerTest, JsonTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_TRUE(s);
- ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_TRUE(s->fields.empty());
auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})");
@@ -210,7 +210,7 @@ TEST_F(IndexerTest, JsonTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_TRUE(s);
- ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_EQ(s->fields.size(), 1);
ASSERT_EQ(s->fields["$.x"], T("food,kitChen,Beauty"));
@@ -262,7 +262,7 @@ TEST_F(IndexerTest, JsonTagBuildIndex) {
auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})");
ASSERT_TRUE(s_set.ok());
- auto s2 = indexer.updater_list[1].Build(*ctx_);
+ auto s2 = indexer.updater_list[1]->Build(*ctx_);
ASSERT_EQ(s2.Msg(), Status::ok_msg);
auto key = redis::SearchKey(ns, idxname,
"$.x").ConstructTagFieldData("food", key1);
@@ -301,7 +301,7 @@ TEST_F(IndexerTest, JsonHnswVector) {
{
auto s = indexer.Record(*ctx_, key3, ns);
ASSERT_TRUE(s);
- ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_TRUE(s->fields.empty());
auto s_set = db.Set(*ctx_, key3, "$", R"({"z": [1,2,3]})");
diff --git a/tests/cppunit/plan_executor_test.cc
b/tests/cppunit/plan_executor_test.cc
index 3524c143..ecff4bb1 100644
--- a/tests/cppunit/plan_executor_test.cc
+++ b/tests/cppunit/plan_executor_test.cc
@@ -386,7 +386,7 @@ std::vector<std::unique_ptr<ScopedUpdate>>
ScopedUpdates(engine::Context& ctx, r
TEST_F(PlanExecutorTestC, NumericFieldScan) {
redis::GlobalIndexer indexer(storage_.get());
- indexer.Add(redis::IndexUpdater(IndexI()));
+ indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));
{
engine::Context ctx(storage_.get());
@@ -428,7 +428,7 @@ TEST_F(PlanExecutorTestC, NumericFieldScan) {
TEST_F(PlanExecutorTestC, TagFieldScan) {
redis::GlobalIndexer indexer(storage_.get());
- indexer.Add(redis::IndexUpdater(IndexI()));
+ indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));
{
engine::Context ctx(storage_.get());
@@ -467,7 +467,7 @@ TEST_F(PlanExecutorTestC, TagFieldScan) {
TEST_F(PlanExecutorTestC, HnswVectorFieldScans) {
redis::GlobalIndexer indexer(storage_.get());
- indexer.Add(redis::IndexUpdater(IndexI()));
+ indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));
{
auto updates = ScopedUpdates(*ctx_, indexer,