This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 2d896298 feat(search): implement IndexManager and command 
FT.CREATE|INFO|_LIST|SEARCH|SEARCHSQL (#2349)
2d896298 is described below

commit 2d89629854c1100236e1aa84d01c6d838777846e
Author: Twice <[email protected]>
AuthorDate: Sun Jun 2 17:04:42 2024 +0900

    feat(search): implement IndexManager and command 
FT.CREATE|INFO|_LIST|SEARCH|SEARCHSQL (#2349)
---
 src/commands/cmd_search.cc            | 289 ++++++++++++++++++++++++++++++++++
 src/commands/commander.h              |  28 ++++
 src/search/index_info.h               |  11 +-
 src/search/index_manager.h            | 209 ++++++++++++++++++++++++
 src/search/indexer.cc                 |  10 +-
 src/search/indexer.h                  |   9 +-
 src/search/ir_sema_checker.h          |   3 +-
 src/search/search_encoding.h          |  19 ++-
 src/server/namespace.h                |   1 +
 src/server/redis_connection.cc        |  32 ++++
 src/server/server.cc                  |  15 +-
 src/server/server.h                   |   6 +
 src/storage/compact_filter.h          |  19 +++
 src/storage/storage.cc                |   9 +-
 src/types/json.h                      |   1 +
 tests/cppunit/indexer_test.cc         |  28 ++--
 tests/cppunit/ir_dot_dumper_test.cc   |   4 +-
 tests/cppunit/ir_pass_test.cc         |   3 +-
 tests/cppunit/ir_sema_checker_test.cc |   3 +-
 tests/cppunit/plan_executor_test.cc   |   9 +-
 20 files changed, 670 insertions(+), 38 deletions(-)

diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc
new file mode 100644
index 00000000..0ca8f5dd
--- /dev/null
+++ b/src/commands/cmd_search.cc
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <variant>
+
+#include "commander.h"
+#include "commands/command_parser.h"
+#include "search/index_info.h"
+#include "search/ir.h"
+#include "search/redis_query_transformer.h"
+#include "search/search_encoding.h"
+#include "search/sql_transformer.h"
+#include "server/redis_reply.h"
+#include "server/server.h"
+#include "tao/pegtl/string_input.hpp"
+
+namespace redis {
+
+class CommandFTCreate : public Commander {
+  // FT.CREATE <index> ON HASH|JSON [PREFIX <count> <prefix>...]
+  //   SCHEMA <field> TAG|NUMERIC [NOINDEX] [CASESENSITIVE] [SEPARATOR <char>] ...
+  // ON and PREFIX may appear in any order before SCHEMA; CASESENSITIVE and SEPARATOR are only
+  // accepted after a TAG field.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+
+    auto index_name = GET_OR_RET(parser.TakeStr());
+    if (index_name.empty()) {
+      return {Status::RedisParseErr, "index name cannot be empty"};
+    }
+
+    index_info_ = std::make_unique<kqir::IndexInfo>(index_name, redis::IndexMetadata{}, "");
+    // IndexOnDataType(0) marks "no ON clause seen yet"; it is rejected after the loop below.
+    auto data_type = IndexOnDataType(0);
+
+    while (parser.Good()) {
+      if (parser.EatEqICase("ON")) {
+        if (parser.EatEqICase("HASH")) {
+          data_type = IndexOnDataType::HASH;
+        } else if (parser.EatEqICase("JSON")) {
+          data_type = IndexOnDataType::JSON;
+        } else {
+          return {Status::RedisParseErr, "expect HASH or JSON after ON"};
+        }
+      } else if (parser.EatEqICase("PREFIX")) {
+        size_t count = GET_OR_RET(parser.TakeInt<size_t>());
+
+        for (size_t i = 0; i < count; ++i) {
+          index_info_->prefixes.prefixes.push_back(GET_OR_RET(parser.TakeStr()));
+        }
+      } else {
+        break;
+      }
+    }
+
+    if (static_cast<int>(data_type) == 0) {
+      return {Status::RedisParseErr, "expect ON HASH | JSON"};
+    }
+    index_info_->metadata.on_data_type = data_type;
+
+    if (!parser.EatEqICase("SCHEMA")) {
+      return {Status::RedisParseErr, "expect SCHEMA section for this index"};
+    }
+
+    // Every remaining token belongs to the schema: <field name> <type> [type options...].
+    while (parser.Good()) {
+      auto field_name = GET_OR_RET(parser.TakeStr());
+      if (field_name.empty()) {
+        return {Status::RedisParseErr, "field name cannot be empty"};
+      }
+
+      std::unique_ptr<redis::IndexFieldMetadata> field_meta;
+      if (parser.EatEqICase("TAG")) {
+        field_meta = std::make_unique<redis::TagFieldMetadata>();
+      } else if (parser.EatEqICase("NUMERIC")) {
+        field_meta = std::make_unique<redis::NumericFieldMetadata>();
+      } else {
+        return {Status::RedisParseErr, "expect field type TAG or NUMERIC"};
+      }
+
+      // Consume the options of this field; the first unknown token starts the next field.
+      while (parser.Good()) {
+        if (parser.EatEqICase("NOINDEX")) {
+          field_meta->noindex = true;
+        } else if (auto tag = dynamic_cast<redis::TagFieldMetadata *>(field_meta.get())) {
+          if (parser.EatEqICase("CASESENSITIVE")) {
+            tag->case_sensitive = true;
+          } else if (parser.EatEqICase("SEPARATOR")) {
+            auto sep = GET_OR_RET(parser.TakeStr());
+
+            if (sep.size() != 1) {
+              // a malformed argument is a parse error, like every other error in this method
+              return {Status::RedisParseErr, "only one character separator is supported"};
+            }
+
+            tag->separator = sep[0];
+          } else {
+            break;
+          }
+        } else {
+          break;
+        }
+      }
+
+      kqir::FieldInfo field_info(field_name, std::move(field_meta));
+
+      index_info_->Add(std::move(field_info));
+    }
+
+    // An index without any field cannot answer a query; RediSearch also requires at least one field.
+    if (index_info_->fields.empty()) {
+      return {Status::RedisParseErr, "expect at least one field in SCHEMA"};
+    }
+
+    if (parser.Good()) {
+      return {Status::RedisParseErr, "more token than expected in command arguments"};
+    }
+
+    return Status::OK();
+  }
+
+  // Bind the parsed index to the connection's namespace and register it via IndexManager::Create.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    index_info_->ns = conn->GetNamespace();
+
+    GET_OR_RET(srv->index_mgr.Create(std::move(index_info_)));
+
+    output->append(redis::SimpleString("OK"));
+    return Status::OK();
+  }
+
+ private:
+  // Built by Parse(), handed over to IndexManager::Create() in Execute().
+  std::unique_ptr<kqir::IndexInfo> index_info_;
+};
+
+class CommandFTSearchSQL : public Commander {
+  // FT.SEARCHSQL <sql>: compile the SQL text to IR, run it against the indexes of the caller's
+  // namespace and reply with one [key, [field value, ...]] pair per matching row.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    auto ir = GET_OR_RET(kqir::sql::ParseToIR(kqir::peg::string_input(args_[1], "ft.searchsql")));
+    auto rows = GET_OR_RET(srv->index_mgr.Search(std::move(ir), conn->GetNamespace()));
+
+    output->append(MultiLen(rows.size()));
+    for (const auto &[row_key, row_fields, _] : rows) {
+      output->append(MultiLen(2));
+      output->append(redis::BulkString(row_key));
+      output->append(MultiLen(row_fields.size()));
+      for (const auto &[_, value] : row_fields) output->append(redis::BulkString(value));
+    }
+
+    return Status::OK();
+  }
+};
+
+class CommandFTSearch : public Commander {
+  // FT.SEARCH <index> <query> [RETURN <count> <field>...] [SORTBY <field> [ASC|DESC]]
+  //   [LIMIT <offset> <count>]
+  // RETURN is the RediSearch spelling; RETURNS is still accepted for compatibility.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+
+    auto index_name = GET_OR_RET(parser.TakeStr());
+    auto query_str = GET_OR_RET(parser.TakeStr());
+
+    auto index_ref = std::make_unique<kqir::IndexRef>(index_name);
+    auto query = kqir::Node::MustAs<kqir::QueryExpr>(
+        GET_OR_RET(kqir::redis_query::ParseToIR(kqir::peg::string_input(query_str, "ft.search"))));
+
+    auto select = std::make_unique<kqir::SelectClause>(std::vector<std::unique_ptr<kqir::FieldRef>>{});
+    std::unique_ptr<kqir::SortByClause> sort_by;
+    std::unique_ptr<kqir::LimitClause> limit;
+    while (parser.Good()) {
+      if (parser.EatEqICase("RETURN") || parser.EatEqICase("RETURNS")) {
+        auto count = GET_OR_RET(parser.TakeInt<size_t>());
+
+        for (size_t i = 0; i < count; ++i) {
+          auto field = GET_OR_RET(parser.TakeStr());
+          select->fields.push_back(std::make_unique<kqir::FieldRef>(field));
+        }
+      } else if (parser.EatEqICase("SORTBY")) {
+        auto field = GET_OR_RET(parser.TakeStr());
+        // the order keyword is optional and defaults to ascending
+        auto order = kqir::SortByClause::ASC;
+        if (parser.EatEqICase("ASC")) {
+          // NOOP
+        } else if (parser.EatEqICase("DESC")) {
+          order = kqir::SortByClause::DESC;
+        }
+
+        sort_by = std::make_unique<kqir::SortByClause>(order, std::make_unique<kqir::FieldRef>(field));
+      } else if (parser.EatEqICase("LIMIT")) {
+        auto offset = GET_OR_RET(parser.TakeInt<size_t>());
+        auto count = GET_OR_RET(parser.TakeInt<size_t>());
+
+        limit = std::make_unique<kqir::LimitClause>(offset, count);
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+
+    ir_ = std::make_unique<kqir::SearchExpr>(std::move(index_ref), std::move(query), std::move(limit),
+                                             std::move(sort_by), std::move(select));
+    return Status::OK();
+  }
+
+  // Run the IR built by Parse() and reply with one [key, [field value, ...]] pair per row.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    CHECK(ir_);
+    auto results = GET_OR_RET(srv->index_mgr.Search(std::move(ir_), conn->GetNamespace()));
+
+    output->append(MultiLen(results.size()));
+    for (const auto &[key, fields, _] : results) {
+      output->append(MultiLen(2));
+      output->append(redis::BulkString(key));
+      output->append(MultiLen(fields.size()));
+      for (const auto &[_, field] : fields) {
+        output->append(redis::BulkString(field));
+      }
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  // SearchExpr built by Parse(), consumed by Execute().
+  std::unique_ptr<kqir::Node> ir_;
+};
+
+class CommandFTInfo : public Commander {
+  // FT.INFO <index>: describe an index of the caller's namespace as a flat list of four
+  // name/value pairs: index_name, on_data_type, prefixes and fields.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    const auto &indexes = srv->index_mgr.index_map;
+
+    const auto found = indexes.Find(args_[1], conn->GetNamespace());
+    if (found == indexes.end()) {
+      return {Status::RedisExecErr, "index not found"};
+    }
+
+    const auto &index = *found->second;
+    output->append(MultiLen(8));  // 4 name/value pairs
+
+    output->append(redis::SimpleString("index_name"));
+    output->append(redis::BulkString(index.name));
+
+    output->append(redis::SimpleString("on_data_type"));
+    const auto type_slot = static_cast<size_t>(index.metadata.on_data_type);
+    output->append(redis::BulkString(RedisTypeNames[type_slot]));
+
+    output->append(redis::SimpleString("prefixes"));
+    output->append(redis::ArrayOfBulkStrings(index.prefixes.prefixes));
+
+    output->append(redis::SimpleString("fields"));
+    output->append(MultiLen(index.fields.size()));
+    for (const auto &[_, field] : index.fields) {
+      // each field: [name, type name]
+      output->append(MultiLen(2));
+      output->append(redis::BulkString(field.name));
+      output->append(redis::BulkString(std::string(field.metadata->Type())));
+    }
+
+    return Status::OK();
+  }
+};
+
+class CommandFTList : public Commander {
+  // FT._LIST: reply with the names of every index that belongs to the caller's namespace.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    const auto &current_ns = conn->GetNamespace();
+
+    std::vector<std::string> names;
+    for (const auto &entry : srv->index_mgr.index_map) {
+      const auto &index = entry.second;
+      if (index->ns != current_ns) continue;
+      names.push_back(index->name);
+    }
+
+    output->append(ArrayOfBulkStrings(names));
+    return Status::OK();
+  }
+};
+
+// Search command table. Each entry: name, arity (negative = at least |arity| tokens), flags, and
+// presumably the first key / last key / key step triple (0, 0, 0: no key arguments).
+REDIS_REGISTER_COMMANDS(
+    MakeCmdAttr<CommandFTCreate>("ft.create", -2, "write exclusive no-multi no-script", 0, 0, 0),
+    MakeCmdAttr<CommandFTSearchSQL>("ft.searchsql", 2, "read-only", 0, 0, 0),
+    MakeCmdAttr<CommandFTSearch>("ft.search", -3, "read-only", 0, 0, 0),
+    MakeCmdAttr<CommandFTInfo>("ft.info", 2, "read-only", 0, 0, 0),
+    MakeCmdAttr<CommandFTList>("ft._list", 1, "read-only", 0, 0, 0));
+
+}  // namespace redis
diff --git a/src/commands/commander.h b/src/commands/commander.h
index 1982d897..1441581d 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -106,6 +106,13 @@ struct CommandKeyRange {
   // step length of key position
   // e.g. key step 2 means "key other key other ..." sequence
   int key_step;
+
+  template <typename F>
+  void ForEachKey(F &&f, const std::vector<std::string> &args) const {
+    // Calls f(args[i]) for i = first_key, first_key + key_step, ... up to the last key position.
+    // A positive last_key is an absolute index; otherwise it counts from the end of args
+    // (-1 is the last argument). The end is clamped to the last valid index and a non-positive
+    // step is rejected, so a malformed range can neither read past `args` nor loop forever.
+    if (first_key < 0 || key_step <= 0 || args.empty()) return;
+
+    const auto arg_count = static_cast<long long>(args.size());
+    long long last = last_key > 0 ? last_key : arg_count + last_key;
+    if (last > arg_count - 1) last = arg_count - 1;
+
+    // f is invoked repeatedly, so it is called as an lvalue instead of being forwarded each time.
+    for (long long i = first_key; i <= last; i += key_step) {
+      f(args[static_cast<size_t>(i)]);
+    }
+  }
 };
 
 using CommandKeyRangeGen = std::function<CommandKeyRange(const 
std::vector<std::string> &)>;
@@ -153,6 +160,27 @@ struct CommandAttributes {
   bool CheckArity(int cmd_size) const {
     return !((arity > 0 && cmd_size != arity) || (arity < 0 && cmd_size < 
-arity));
   }
+
+  template <typename F>
+  void ForEachKeyRange(F &&f, const std::vector<std::string> &args) const {
+    // key_range.first_key selects the source of key ranges:
+    //   > 0: key_range itself; -1: one range from key_range_gen; -2: ranges from key_range_vec_gen.
+    // Generated ranges whose first_key is not positive are skipped.
+    if (key_range.first_key > 0) {
+      std::forward<F>(f)(args, key_range);
+    } else if (key_range.first_key == -1) {
+      redis::CommandKeyRange range = key_range_gen(args);
+
+      if (range.first_key > 0) {
+        std::forward<F>(f)(args, range);
+      }
+    } else if (key_range.first_key == -2) {
+      std::vector<redis::CommandKeyRange> vec_range = key_range_vec_gen(args);
+
+      // f may run several times here, so it is called as an lvalue rather than forwarded
+      // (forwarding an rvalue callable more than once is a use-after-move hazard).
+      for (const auto &range : vec_range) {
+        if (range.first_key > 0) {
+          f(args, range);
+        }
+      }
+    }
+  }
 };
 
 using CommandMap = std::map<std::string, const CommandAttributes *>;
diff --git a/src/search/index_info.h b/src/search/index_info.h
index ba5e6af3..3badc372 100644
--- a/src/search/index_info.h
+++ b/src/search/index_info.h
@@ -20,12 +20,14 @@
 
 #pragma once
 
+#include <algorithm>
 #include <map>
 #include <memory>
 #include <string>
 #include <utility>
 
 #include "search_encoding.h"
+#include "storage/redis_metadata.h"
 
 namespace kqir {
 
@@ -67,6 +69,13 @@ struct IndexInfo {
   }
 };
 
-using IndexMap = std::map<std::string, std::unique_ptr<IndexInfo>>;
+struct IndexMap : std::map<std::string, std::unique_ptr<IndexInfo>> {
+  // Entries are keyed by ComposeNamespaceKey(ns, index name, false), so every lookup is scoped
+  // to one namespace.
+  auto Insert(std::unique_ptr<IndexInfo> index_info) {
+    auto map_key = ComposeNamespaceKey(index_info->ns, index_info->name, false);
+    return emplace(std::move(map_key), std::move(index_info));
+  }
+
+  // Find index `index` of namespace `ns`; returns end() when there is none.
+  auto Find(std::string_view index, std::string_view ns) const {
+    const auto map_key = ComposeNamespaceKey(ns, index, false);
+    return find(map_key);
+  }
+};
 
 }  // namespace kqir
diff --git a/src/search/index_manager.h b/src/search/index_manager.h
new file mode 100644
index 00000000..222c1440
--- /dev/null
+++ b/src/search/index_manager.h
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "db_util.h"
+#include "encoding.h"
+#include "search/index_info.h"
+#include "search/indexer.h"
+#include "search/ir.h"
+#include "search/ir_sema_checker.h"
+#include "search/passes/manager.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "status.h"
+#include "storage/storage.h"
+#include "string_util.h"
+
+namespace redis {
+
+struct IndexManager {
+  // Every loaded or created index, keyed per namespace (see kqir::IndexMap).
+  kqir::IndexMap index_map;
+  // Non-owning; Server passes its own GlobalIndexer and storage here.
+  GlobalIndexer *indexer;
+  engine::Storage *storage;
+
+  IndexManager(GlobalIndexer *indexer, engine::Storage *storage) : indexer(indexer), storage(storage) {}
+
+  // Load every index of namespace `ns` persisted in the Search column family: decode its metadata,
+  // prefixes and field metadata, register an updater with the global indexer and add it to
+  // index_map. Does nothing in cluster mode.
+  Status Load(const std::string &ns) {
+    // currently index cannot work in cluster mode
+    if (storage->GetConfig()->cluster_enabled) {
+      return Status::OK();
+    }
+
+    util::UniqueIterator iter(storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
+    auto begin = SearchKey{ns, ""}.ConstructIndexMeta();
+
+    for (iter->Seek(begin); iter->Valid(); iter->Next()) {
+      auto key = iter->key();
+
+      // parsed layout: <ns size: fixed8> <ns> <subkey type: fixed8> <sized index name>;
+      // the scan stops at the first key that no longer is an index meta key of `ns`
+      uint8_t ns_size = 0;
+      if (!GetFixed8(&key, &ns_size)) break;
+      if (ns_size != ns.size()) break;
+      if (!key.starts_with(ns)) break;
+      key.remove_prefix(ns_size);
+
+      uint8_t subkey_type = 0;
+      if (!GetFixed8(&key, &subkey_type)) break;
+      if (subkey_type != (uint8_t)SearchSubkeyType::INDEX_META) break;
+
+      Slice index_name;
+      if (!GetSizedString(&key, &index_name)) break;
+
+      IndexMetadata metadata;
+      auto index_meta_value = iter->value();
+      if (auto s = metadata.Decode(&index_meta_value); !s.ok()) {
+        return {Status::NotOK, fmt::format("fail to decode index metadata for index: {}", index_name)};
+      }
+
+      auto index_key = SearchKey(ns, index_name.ToStringView());
+      std::string prefix_value;
+      if (auto s = storage->Get(storage->DefaultMultiGetOptions(), storage->GetCFHandle(ColumnFamilyID::Search),
+                                index_key.ConstructIndexPrefixes(), &prefix_value);
+          !s.ok()) {
+        return {Status::NotOK, fmt::format("fail to find index prefixes for index: {}", index_name)};
+      }
+
+      IndexPrefixes prefixes;
+      Slice prefix_slice = prefix_value;
+      if (auto s = prefixes.Decode(&prefix_slice); !s.ok()) {
+        return {Status::NotOK, fmt::format("fail to decode index prefixes for index: {}", index_name)};
+      }
+
+      auto info = std::make_unique<kqir::IndexInfo>(index_name.ToString(), metadata, ns);
+      info->prefixes = prefixes;
+
+      util::UniqueIterator field_iter(storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
+      auto field_begin = index_key.ConstructFieldMeta();
+
+      for (field_iter->Seek(field_begin); field_iter->Valid(); field_iter->Next()) {
+        auto field_key = field_iter->key();
+
+        // parsed layout: <ns size> <ns> <subkey type> <sized index name> <sized field name>
+        uint8_t field_ns_size = 0;
+        if (!GetFixed8(&field_key, &field_ns_size)) break;
+        if (field_ns_size != ns.size()) break;
+        if (!field_key.starts_with(ns)) break;
+        field_key.remove_prefix(field_ns_size);
+
+        uint8_t field_subkey_type = 0;
+        if (!GetFixed8(&field_key, &field_subkey_type)) break;
+        if (field_subkey_type != (uint8_t)SearchSubkeyType::FIELD_META) break;
+
+        // stop once the field meta keys of another index are reached
+        Slice owner_index;
+        if (!GetSizedString(&field_key, &owner_index)) break;
+        if (owner_index != index_name) break;
+
+        Slice field_name;
+        if (!GetSizedString(&field_key, &field_name)) break;
+
+        auto field_value = field_iter->value();
+
+        std::unique_ptr<IndexFieldMetadata> field_meta;
+        if (auto s = IndexFieldMetadata::Decode(&field_value, field_meta); !s.ok()) {
+          return {Status::NotOK, fmt::format("fail to decode index field metadata for index {}, field {}: {}",
+                                             index_name, field_name, s.ToString())};
+        }
+
+        info->Add(kqir::FieldInfo(field_name.ToString(), std::move(field_meta)));
+      }
+
+      IndexUpdater updater(info.get());
+      indexer->Add(updater);
+      index_map.Insert(std::move(info));
+    }
+
+    return Status::OK();
+  }
+
+  // Persist a new index (metadata, prefixes, field metadata) in one write batch, register it with
+  // the global indexer and index_map, then build it over the existing data.
+  // Fails in cluster mode or when an index with the same name exists in the same namespace.
+  Status Create(std::unique_ptr<kqir::IndexInfo> info) {
+    if (storage->GetConfig()->cluster_enabled) {
+      return {Status::NotOK, "currently index cannot work in cluster mode"};
+    }
+
+    if (auto iter = index_map.Find(info->name, info->ns); iter != index_map.end()) {
+      return {Status::NotOK, "index already exists"};
+    }
+
+    SearchKey index_key(info->ns, info->name);
+    auto cf = storage->GetCFHandle(ColumnFamilyID::Search);
+
+    auto batch = storage->GetWriteBatchBase();
+
+    std::string meta_val;
+    info->metadata.Encode(&meta_val);
+    batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
+
+    std::string prefix_val;
+    info->prefixes.Encode(&prefix_val);
+    batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
+
+    for (const auto &[_, field_info] : info->fields) {
+      SearchKey field_key(info->ns, info->name, field_info.name);
+
+      std::string field_val;
+      field_info.metadata->Encode(&field_val);
+
+      batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
+    }
+
+    if (auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
+      return {Status::NotOK, "failed to write index metadata"};
+    }
+
+    IndexUpdater updater(info.get());
+    indexer->Add(updater);
+    index_map.Insert(std::move(info));
+
+    // Only the new index needs to be built; rebuilding every registered index on each FT.CREATE
+    // would rescan data for indexes that were already built. GlobalIndexer::Add stores a copy of
+    // the updater (with its indexer back-pointer set) at the end of updater_list, so build that copy.
+    // NOTE(review): if Build fails, the metadata written above and the registration are not rolled back.
+    GET_OR_RET(indexer->updater_list.back().Build());
+
+    return Status::OK();
+  }
+
+  // Semantically check `ir` against the indexes of namespace `ns`, lower it to a plan through the
+  // default pass pipeline and execute it, collecting every produced row.
+  StatusOr<std::vector<kqir::ExecutorContext::RowType>> Search(std::unique_ptr<kqir::Node> ir,
+                                                               const std::string &ns) const {
+    kqir::SemaChecker sema_checker(index_map);
+    sema_checker.ns = ns;
+
+    GET_OR_RET(sema_checker.Check(ir.get()));
+
+    auto plan_ir = kqir::PassManager::Execute(kqir::PassManager::Default(), std::move(ir));
+    std::unique_ptr<kqir::PlanOperator> plan_op;
+    if (plan_op = kqir::Node::As<kqir::PlanOperator>(std::move(plan_ir)); !plan_op) {
+      return {Status::NotOK, "failed to convert the SQL query to plan operators"};
+    }
+
+    kqir::ExecutorContext executor_ctx(plan_op.get(), storage);
+
+    std::vector<kqir::ExecutorContext::RowType> results;
+
+    auto iter_res = GET_OR_RET(executor_ctx.Next());
+    while (!std::holds_alternative<kqir::ExecutorNode::End>(iter_res)) {
+      // iter_res is overwritten right after, so the row can be moved out instead of copied
+      results.push_back(std::get<kqir::ExecutorContext::RowType>(std::move(iter_res)));
+
+      iter_res = GET_OR_RET(executor_ctx.Next());
+    }
+
+    return results;
+  }
+};
+
+}  // namespace redis
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 8a90c8f0..cc4735a0 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -265,7 +265,7 @@ Status IndexUpdater::Build() const {
 void GlobalIndexer::Add(IndexUpdater updater) {
+  // Register `updater` (taken by value) under each prefix of its index and append it to
+  // updater_list; the stored copies carry the back-pointer to this GlobalIndexer.
   updater.indexer = this;
+  // Prefix keys are composed without a slot id (`false`): search indexing is only enabled when
+  // cluster mode is off (IndexManager and Connection::ExecuteCommands both check cluster_enabled).
   for (const auto &prefix : updater.info->prefixes) {
-    prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, 
storage->IsSlotIdEncoded()), updater);
+    prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false), 
updater);
   }
   updater_list.push_back(updater);
 }
@@ -275,17 +275,17 @@ StatusOr<GlobalIndexer::RecordResult> 
GlobalIndexer::Record(std::string_view key
     return Status::NoPrefixMatched;
   }
 
-  auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key, 
storage->IsSlotIdEncoded()));
+  auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key, false));
   if (iter != prefix_map.end()) {
     auto updater = iter.value();
-    return std::make_pair(updater, GET_OR_RET(updater.Record(key)));
+    return RecordResult{updater, std::string(key.begin(), key.end()), 
GET_OR_RET(updater.Record(key))};
   }
 
   return {Status::NoPrefixMatched};
 }
 
-Status GlobalIndexer::Update(const RecordResult &original, std::string_view 
key) {
-  return original.first.Update(original.second, key);
+// Apply the index update for `original.key` after the write command ran, passing the field values
+// that Record() captured before it ran. The key now travels inside RecordResult.
+Status GlobalIndexer::Update(const RecordResult &original) {
+  return original.updater.Update(original.fields, original.key);
 }
 
 }  // namespace redis
diff --git a/src/search/indexer.h b/src/search/indexer.h
index 34a4cf93..33cea303 100644
--- a/src/search/indexer.h
+++ b/src/search/indexer.h
@@ -32,7 +32,6 @@
 #include "index_info.h"
 #include "indexer.h"
 #include "search/search_encoding.h"
-#include "server/server.h"
 #include "storage/redis_metadata.h"
 #include "storage/storage.h"
 #include "types/redis_hash.h"
@@ -90,7 +89,11 @@ struct IndexUpdater {
 
 struct GlobalIndexer {
   using FieldValues = IndexUpdater::FieldValues;
-  using RecordResult = std::pair<IndexUpdater, FieldValues>;
+  // Produced by Record() before a write command runs and consumed by Update() after it.
+  // Field order matters: Record() builds it with aggregate initialization.
+  struct RecordResult {
+    // updater of the index whose prefix matched the key (longest prefix match)
+    IndexUpdater updater;
+    // the written key (owned copy)
+    std::string key;
+    // field values returned by IndexUpdater::Record(key) at recording time
+    FieldValues fields;
+  };
 
   tsl::htrie_map<char, IndexUpdater> prefix_map;
   std::vector<IndexUpdater> updater_list;
@@ -101,7 +104,7 @@ struct GlobalIndexer {
 
   void Add(IndexUpdater updater);
   StatusOr<RecordResult> Record(std::string_view key, const std::string &ns);
-  static Status Update(const RecordResult &original, std::string_view key);
+  static Status Update(const RecordResult &original);
 };
 
 }  // namespace redis
diff --git a/src/search/ir_sema_checker.h b/src/search/ir_sema_checker.h
index 5e100621..a7a76181 100644
--- a/src/search/ir_sema_checker.h
+++ b/src/search/ir_sema_checker.h
@@ -33,6 +33,7 @@ namespace kqir {
 
 struct SemaChecker {
   const IndexMap &index_map;
+  std::string ns;
 
   const IndexInfo *current_index = nullptr;
 
@@ -41,7 +42,7 @@ struct SemaChecker {
   Status Check(Node *node) {
     if (auto v = dynamic_cast<SearchExpr *>(node)) {
       auto index_name = v->index->name;
-      if (auto iter = index_map.find(index_name); iter != index_map.end()) {
+      if (auto iter = index_map.Find(index_name, ns); iter != index_map.end()) 
{
         current_index = iter->second.get();
         v->index->info = current_index;
 
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 3819fd9b..5b4cf739 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -180,6 +180,8 @@ struct IndexFieldMetadata {
   bool noindex = false;
   IndexFieldType type;
 
+  // Each concrete field metadata type passes its own IndexFieldType here.
+  explicit IndexFieldMetadata(IndexFieldType type) : type(type) {}
+
   // flag: <noindex: 1 bit> <type: 4 bit> <reserved: 3 bit>
   uint8_t MakeFlag() const { return noindex | (uint8_t)type << 1; }
 
@@ -192,6 +194,17 @@ struct IndexFieldMetadata {
 
   virtual ~IndexFieldMetadata() = default;
 
+  // Lower-case name of the field type (reported by FT.INFO); "unknown" for any other type.
+  std::string_view Type() const {
+    if (type == IndexFieldType::TAG) return "tag";
+    if (type == IndexFieldType::NUMERIC) return "numeric";
+    return "unknown";
+  }
+
   virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); }
 
   virtual rocksdb::Status Decode(Slice *input) {
@@ -213,6 +226,8 @@ struct TagFieldMetadata : IndexFieldMetadata {
   char separator = ',';
   bool case_sensitive = false;
 
+  // Tag fields always carry IndexFieldType::TAG.
+  TagFieldMetadata() : IndexFieldMetadata(IndexFieldType::TAG) {}
+
   void Encode(std::string *dst) const override {
     IndexFieldMetadata::Encode(dst);
     PutFixed8(dst, separator);
@@ -224,7 +239,7 @@ struct TagFieldMetadata : IndexFieldMetadata {
       return s;
     }
 
-    if (input->size() < 8 + 8) {
+    if (input->size() < 2) {
       return rocksdb::Status::Corruption(kErrorInsufficientLength);
     }
 
@@ -235,6 +250,8 @@ struct TagFieldMetadata : IndexFieldMetadata {
 };
 
 struct NumericFieldMetadata : IndexFieldMetadata {
+  // Numeric fields always carry IndexFieldType::NUMERIC.
+  NumericFieldMetadata() : IndexFieldMetadata(IndexFieldType::NUMERIC) {}
+
   bool IsSortable() const override { return true; }
 };
 
diff --git a/src/server/namespace.h b/src/server/namespace.h
index 1eadd656..3e22d382 100644
--- a/src/server/namespace.h
+++ b/src/server/namespace.h
@@ -48,6 +48,7 @@ class Namespace {
   rocksdb::ColumnFamilyHandle *cf_ = nullptr;
 
   std::shared_mutex tokens_mu_;
+  // mapping from token to namespace name
   std::map<std::string, std::string> tokens_;
 
   Status loadFromDB(std::map<std::string, std::string> *db_tokens) const;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 536c4d20..f3960a66 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -28,6 +28,7 @@
 #include "commands/commander.h"
 #include "commands/error_constants.h"
 #include "fmt/format.h"
+#include "search/indexer.h"
 #include "server/redis_reply.h"
 #include "string_util.h"
 #ifdef ENABLE_OPENSSL
@@ -411,6 +412,10 @@ Status Connection::ExecuteCommand(const std::string 
&cmd_name, const std::vector
   return s;
 }
 
+static bool IsHashOrJsonCommand(const std::string &cmd) {
+  // Name-prefix heuristic: "h..." for the hash family, "json." for the JSON family.
+  // The caller additionally requires the write flag, so read-only "h..." commands are not recorded.
+  if (util::HasPrefix(cmd, "h")) return true;
+  return util::HasPrefix(cmd, "json.");
+}
+
 void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
   const Config *config = srv_->GetConfig();
   std::string reply;
@@ -537,9 +542,36 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> 
*to_process_cmds) {
       continue;
     }
 
+    // TODO: transaction support for index recording
+    std::vector<GlobalIndexer::RecordResult> index_records;
+    if (IsHashOrJsonCommand(cmd_name) && (attributes->flags & 
redis::kCmdWrite) && !config->cluster_enabled) {
+      attributes->ForEachKeyRange(
+          [&, this](const std::vector<std::string> &args, const 
CommandKeyRange &key_range) {
+            key_range.ForEachKey(
+                [&, this](const std::string &key) {
+                  auto res = srv_->indexer.Record(key, ns_);
+                  if (res.IsOK()) {
+                    index_records.push_back(*res);
+                  } else if (!res.Is<Status::NoPrefixMatched>() && 
!res.Is<Status::TypeMismatched>()) {
+                    LOG(WARNING) << "index recording failed for key: " << key;
+                  }
+                },
+                args);
+          },
+          cmd_tokens);
+    }
+
     SetLastCmd(cmd_name);
     s = ExecuteCommand(cmd_name, cmd_tokens, current_cmd.get(), &reply);
 
+    // TODO: transaction support for index updating
+    for (const auto &record : index_records) {
+      auto s = GlobalIndexer::Update(record);
+      if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
+        LOG(WARNING) << "index updating failed for key: " << record.key;
+      }
+    }
+
     // Break the execution loop when occurring the blocking command like BLPOP 
or BRPOP,
     // it will suspend the connection and wait for the wakeup signal.
     if (s.Is<Status::BlockingCmd>()) {
diff --git a/src/server/server.cc b/src/server/server.cc
index 7ed970bf..2da7dfa0 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -39,6 +39,7 @@
 
 #include "commands/commander.h"
 #include "config.h"
+#include "config/config.h"
 #include "fmt/format.h"
 #include "redis_connection.h"
 #include "storage/compaction_checker.h"
@@ -52,7 +53,12 @@
 #include "worker.h"
 
 Server::Server(engine::Storage *storage, Config *config)
-    : storage(storage), start_time_secs_(util::GetTimeStamp()), 
config_(config), namespace_(storage) {
+    : storage(storage),
+      indexer(storage),
+      index_mgr(&indexer, storage),
+      start_time_secs_(util::GetTimeStamp()),
+      config_(config),
+      namespace_(storage) {
   // init commands stats here to prevent concurrent insert, and cause core
   auto commands = redis::CommandTable::GetOriginal();
   for (const auto &iter : *commands) {
@@ -150,6 +156,13 @@ Status Server::Start() {
     }
   }
 
+  if (!config_->cluster_enabled) {
+    GET_OR_RET(index_mgr.Load(kDefaultNamespace));
+    for (auto [_, ns] : namespace_.List()) {
+      GET_OR_RET(index_mgr.Load(ns));
+    }
+  }
+
   if (config_->cluster_enabled) {
     if (config_->persist_cluster_nodes_enabled) {
       auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
diff --git a/src/server/server.h b/src/server/server.h
index ad967c77..c1793e81 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -44,6 +44,8 @@
 #include "commands/commander.h"
 #include "lua.hpp"
 #include "namespace.h"
+#include "search/index_manager.h"
+#include "search/indexer.h"
 #include "server/redis_connection.h"
 #include "stats/log_collector.h"
 #include "stats/stats.h"
@@ -313,6 +315,10 @@ class Server {
   UniqueSSLContext ssl_ctx;
 #endif
 
+  // search
+  redis::GlobalIndexer indexer;
+  redis::IndexManager index_mgr;
+
  private:
   void cron();
   void recordInstantaneousMetrics();
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index 9788c849..118bb8f6 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -124,4 +124,23 @@ class PubSubFilterFactory : public rocksdb::CompactionFilterFactory {
   }
 };
 
+class SearchFilter : public rocksdb::CompactionFilter {
+ public:
+  const char *Name() const override { return "SearchFilter"; }
+  bool Filter(int level, const Slice &key, const Slice &value, std::string *new_value, bool *modified) const override {
+    // TODO: just a dummy one here
+    return false;
+  }
+};
+
+class SearchFilterFactory : public rocksdb::CompactionFilterFactory {
+ public:
+  SearchFilterFactory() = default;
+  const char *Name() const override { return "SearchFilterFactory"; }
+  std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
+      const rocksdb::CompactionFilter::Context &context) override {
+    return std::unique_ptr<rocksdb::CompactionFilter>(new SearchFilter());
+  }
+};
+
 }  // namespace engine
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index bbba44ce..8cdb63bf 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -338,6 +338,13 @@ Status Storage::Open(DBOpenMode mode) {
   propagate_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
   SetBlobDB(&propagate_opts);
 
+  rocksdb::BlockBasedTableOptions search_table_opts = InitTableOptions();
+  rocksdb::ColumnFamilyOptions search_opts(options);
+  search_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(search_table_opts));
+  search_opts.compaction_filter_factory = std::make_shared<SearchFilterFactory>();
+  search_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
+  SetBlobDB(&search_opts);
+
   std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
   // Caution: don't change the order of column family, or the handle will be mismatched
   column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
@@ -346,7 +353,7 @@ Status Storage::Open(DBOpenMode mode) {
   column_families.emplace_back(std::string(kPubSubColumnFamilyName), pubsub_opts);
   column_families.emplace_back(std::string(kPropagateColumnFamilyName), propagate_opts);
   column_families.emplace_back(std::string(kStreamColumnFamilyName), subkey_opts);
-  column_families.emplace_back(std::string(kSearchColumnFamilyName), subkey_opts);
+  column_families.emplace_back(std::string(kSearchColumnFamilyName), search_opts);
 
   std::vector<std::string> old_column_families;
   auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families);
diff --git a/src/types/json.h b/src/types/json.h
index c9b681c4..0ce88797 100644
--- a/src/types/json.h
+++ b/src/types/json.h
@@ -39,6 +39,7 @@
 #include "common/string_util.h"
 #include "jsoncons_ext/jsonpath/jsonpath_error.hpp"
 #include "status.h"
+#include "storage/redis_metadata.h"
 
 template <class T>
 using Optionals = std::vector<std::optional<T>>;
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index b5d7c3f9..d9b3656e 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -80,14 +80,14 @@ TEST_F(IndexerTest, HashTag) {
   {
     auto s = indexer.Record(key1, ns);
     ASSERT_EQ(s.Msg(), Status::ok_msg);
-    ASSERT_EQ(s->first.info->name, idxname);
-    ASSERT_TRUE(s->second.empty());
+    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_TRUE(s->fields.empty());
 
     uint64_t cnt = 0;
     db.Set(key1, "x", "food,kitChen,Beauty", &cnt);
     ASSERT_EQ(cnt, 1);
 
-    auto s2 = indexer.Update(*s, key1);
+    auto s2 = indexer.Update(*s);
     ASSERT_TRUE(s2);
 
     auto key = redis::SearchKey(ns, idxname, "x").ConstructTagFieldData("food", key1);
@@ -113,16 +113,16 @@ TEST_F(IndexerTest, HashTag) {
   {
     auto s = indexer.Record(key1, ns);
     ASSERT_TRUE(s);
-    ASSERT_EQ(s->first.info->name, idxname);
-    ASSERT_EQ(s->second.size(), 1);
-    ASSERT_EQ(s->second["x"], "food,kitChen,Beauty");
+    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_EQ(s->fields.size(), 1);
+    ASSERT_EQ(s->fields["x"], "food,kitChen,Beauty");
 
     uint64_t cnt = 0;
     auto s_set = db.Set(key1, "x", "Clothing,FOOD,sport", &cnt);
     ASSERT_EQ(cnt, 0);
     ASSERT_TRUE(s_set.ok());
 
-    auto s2 = indexer.Update(*s, key1);
+    auto s2 = indexer.Update(*s);
     ASSERT_TRUE(s2);
 
     auto key = redis::SearchKey(ns, idxname, "x").ConstructTagFieldData("food", key1);
@@ -171,13 +171,13 @@ TEST_F(IndexerTest, JsonTag) {
   {
     auto s = indexer.Record(key1, ns);
     ASSERT_TRUE(s);
-    ASSERT_EQ(s->first.info->name, idxname);
-    ASSERT_TRUE(s->second.empty());
+    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_TRUE(s->fields.empty());
 
     auto s_set = db.Set(key1, "$", R"({"x": "food,kitChen,Beauty"})");
     ASSERT_TRUE(s_set.ok());
 
-    auto s2 = indexer.Update(*s, key1);
+    auto s2 = indexer.Update(*s);
     ASSERT_TRUE(s2);
 
     auto key = redis::SearchKey(ns, idxname, "$.x").ConstructTagFieldData("food", key1);
@@ -203,14 +203,14 @@ TEST_F(IndexerTest, JsonTag) {
   {
     auto s = indexer.Record(key1, ns);
     ASSERT_TRUE(s);
-    ASSERT_EQ(s->first.info->name, idxname);
-    ASSERT_EQ(s->second.size(), 1);
-    ASSERT_EQ(s->second["$.x"], "food,kitChen,Beauty");
+    ASSERT_EQ(s->updater.info->name, idxname);
+    ASSERT_EQ(s->fields.size(), 1);
+    ASSERT_EQ(s->fields["$.x"], "food,kitChen,Beauty");
 
     auto s_set = db.Set(key1, "$.x", "\"Clothing,FOOD,sport\"");
     ASSERT_TRUE(s_set.ok());
 
-    auto s2 = indexer.Update(*s, key1);
+    auto s2 = indexer.Update(*s);
     ASSERT_TRUE(s2);
 
     auto key = redis::SearchKey(ns, idxname, "$.x").ConstructTagFieldData("food", key1);
diff --git a/tests/cppunit/ir_dot_dumper_test.cc b/tests/cppunit/ir_dot_dumper_test.cc
index bc50c7a9..d4ae7292 100644
--- a/tests/cppunit/ir_dot_dumper_test.cc
+++ b/tests/cppunit/ir_dot_dumper_test.cc
@@ -29,6 +29,7 @@
 #include "search/passes/manager.h"
 #include "search/search_encoding.h"
 #include "search/sql_transformer.h"
+#include "storage/redis_metadata.h"
 
 using namespace kqir;
 
@@ -79,9 +80,8 @@ static IndexMap MakeIndexMap() {
   ia->Add(std::move(f4));
   ia->Add(std::move(f5));
 
-  auto& name = ia->name;
   IndexMap res;
-  res.emplace(name, std::move(ia));
+  res.Insert(std::move(ia));
   return res;
 }
 
diff --git a/tests/cppunit/ir_pass_test.cc b/tests/cppunit/ir_pass_test.cc
index 70811f81..81ed49e8 100644
--- a/tests/cppunit/ir_pass_test.cc
+++ b/tests/cppunit/ir_pass_test.cc
@@ -183,9 +183,8 @@ static IndexMap MakeIndexMap() {
   ia->Add(std::move(f4));
   ia->Add(std::move(f5));
 
-  auto& name = ia->name;
   IndexMap res;
-  res.emplace(name, std::move(ia));
+  res.Insert(std::move(ia));
   return res;
 }
 
diff --git a/tests/cppunit/ir_sema_checker_test.cc b/tests/cppunit/ir_sema_checker_test.cc
index 8223e5ed..3a15dde7 100644
--- a/tests/cppunit/ir_sema_checker_test.cc
+++ b/tests/cppunit/ir_sema_checker_test.cc
@@ -43,9 +43,8 @@ static IndexMap MakeIndexMap() {
   ia->Add(std::move(f2));
   ia->Add(std::move(f3));
 
-  auto& name = ia->name;
   IndexMap res;
-  res.emplace(name, std::move(ia));
+  res.Insert(std::move(ia));
   return res;
 }
 
diff --git a/tests/cppunit/plan_executor_test.cc b/tests/cppunit/plan_executor_test.cc
index e41ea94b..20f1f009 100644
--- a/tests/cppunit/plan_executor_test.cc
+++ b/tests/cppunit/plan_executor_test.cc
@@ -48,9 +48,8 @@ static IndexMap MakeIndexMap() {
   ia->Add(std::move(f2));
   ia->Add(std::move(f3));
 
-  auto& name = ia->name;
   IndexMap res;
-  res.emplace(name, std::move(ia));
+  res.Insert(std::move(ia));
   return res;
 }
 
@@ -79,8 +78,8 @@ TEST(PlanExecutorTest, Mock) {
   ASSERT_EQ(ctx.Next().GetValue(), exe_end);
 }
 
-static auto IndexI() -> const IndexInfo* { return index_map.at("ia").get(); }
-static auto FieldI(const std::string& f) -> const FieldInfo* { return &index_map.at("ia")->fields.at(f); }
+static auto IndexI() -> const IndexInfo* { return index_map.Find("ia", "search_ns")->second.get(); }
+static auto FieldI(const std::string& f) -> const FieldInfo* { return &IndexI()->fields.at(f); }
 
 TEST(PlanExecutorTest, TopNSort) {
   std::vector<ExecutorNode::RowType> data{
@@ -317,7 +316,7 @@ struct ScopedUpdate {
   ScopedUpdate& operator=(ScopedUpdate&&) = delete;
 
   ~ScopedUpdate() {
-    auto s = redis::GlobalIndexer::Update(rr, key);
+    auto s = redis::GlobalIndexer::Update(rr);
     EXPECT_EQ(s.Msg(), Status::ok_msg);
   }
 };


Reply via email to