This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 2d896298 feat(search): implement IndexManager and command
FT.CREATE|INFO|_LIST|SEARCH|SEARCHSQL (#2349)
2d896298 is described below
commit 2d89629854c1100236e1aa84d01c6d838777846e
Author: Twice <[email protected]>
AuthorDate: Sun Jun 2 17:04:42 2024 +0900
feat(search): implement IndexManager and command
FT.CREATE|INFO|_LIST|SEARCH|SEARCHSQL (#2349)
---
src/commands/cmd_search.cc | 289 ++++++++++++++++++++++++++++++++++
src/commands/commander.h | 28 ++++
src/search/index_info.h | 11 +-
src/search/index_manager.h | 209 ++++++++++++++++++++++++
src/search/indexer.cc | 10 +-
src/search/indexer.h | 9 +-
src/search/ir_sema_checker.h | 3 +-
src/search/search_encoding.h | 19 ++-
src/server/namespace.h | 1 +
src/server/redis_connection.cc | 32 ++++
src/server/server.cc | 15 +-
src/server/server.h | 6 +
src/storage/compact_filter.h | 19 +++
src/storage/storage.cc | 9 +-
src/types/json.h | 1 +
tests/cppunit/indexer_test.cc | 28 ++--
tests/cppunit/ir_dot_dumper_test.cc | 4 +-
tests/cppunit/ir_pass_test.cc | 3 +-
tests/cppunit/ir_sema_checker_test.cc | 3 +-
tests/cppunit/plan_executor_test.cc | 9 +-
20 files changed, 670 insertions(+), 38 deletions(-)
diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc
new file mode 100644
index 00000000..0ca8f5dd
--- /dev/null
+++ b/src/commands/cmd_search.cc
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <variant>
+
+#include "commander.h"
+#include "commands/command_parser.h"
+#include "search/index_info.h"
+#include "search/ir.h"
+#include "search/redis_query_transformer.h"
+#include "search/search_encoding.h"
+#include "search/sql_transformer.h"
+#include "server/redis_reply.h"
+#include "server/server.h"
+#include "tao/pegtl/string_input.hpp"
+
+namespace redis {
+
+// FT.CREATE <index> ON HASH|JSON [PREFIX <count> <prefix>...] SCHEMA <field> TAG|NUMERIC [options] ...
+//
+// Parse() assembles a kqir::IndexInfo from the arguments; Execute() stamps it with the
+// connection's namespace and passes it to the server's index manager.
+class CommandFTCreate : public Commander {
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+
+    auto index_name = GET_OR_RET(parser.TakeStr());
+    if (index_name.empty()) {
+      return {Status::RedisParseErr, "index name cannot be empty"};
+    }
+
+    // the namespace is left empty here and filled in by Execute()
+    index_info_ = std::make_unique<kqir::IndexInfo>(index_name, redis::IndexMetadata{}, "");
+
+    // 0 acts as the "no ON clause seen yet" marker and is rejected below
+    auto data_type = IndexOnDataType(0);
+
+    // ON and PREFIX clauses are accepted in any order until some other token shows up
+    while (parser.Good()) {
+      if (parser.EatEqICase("ON")) {
+        if (parser.EatEqICase("HASH")) {
+          data_type = IndexOnDataType::HASH;
+        } else if (parser.EatEqICase("JSON")) {
+          data_type = IndexOnDataType::JSON;
+        } else {
+          return {Status::RedisParseErr, "expect HASH or JSON after ON"};
+        }
+      } else if (parser.EatEqICase("PREFIX")) {
+        size_t count = GET_OR_RET(parser.TakeInt<size_t>());
+
+        for (size_t i = 0; i < count; ++i) {
+          index_info_->prefixes.prefixes.push_back(GET_OR_RET(parser.TakeStr()));
+        }
+      } else {
+        break;
+      }
+    }
+
+    if (int(data_type) == 0) {
+      return {Status::RedisParseErr, "expect ON HASH | JSON"};
+    }
+    index_info_->metadata.on_data_type = data_type;
+
+    if (!parser.EatEqICase("SCHEMA")) {
+      return {Status::RedisParseErr, "expect SCHEMA section for this index"};
+    }
+
+    // Everything after SCHEMA is a list of field definitions. This loop only ends once the
+    // arguments are exhausted (or an error is returned), so no trailing-token check is needed.
+    while (parser.Good()) {
+      auto field_name = GET_OR_RET(parser.TakeStr());
+      if (field_name.empty()) {
+        return {Status::RedisParseErr, "field name cannot be empty"};
+      }
+
+      std::unique_ptr<redis::IndexFieldMetadata> field_meta;
+      if (parser.EatEqICase("TAG")) {
+        field_meta = std::make_unique<redis::TagFieldMetadata>();
+      } else if (parser.EatEqICase("NUMERIC")) {
+        field_meta = std::make_unique<redis::NumericFieldMetadata>();
+      } else {
+        return {Status::RedisParseErr, "expect field type TAG or NUMERIC"};
+      }
+
+      // per-field options; the first token that is not an option starts the next field
+      while (parser.Good()) {
+        if (parser.EatEqICase("NOINDEX")) {
+          field_meta->noindex = true;
+        } else if (auto tag = dynamic_cast<redis::TagFieldMetadata *>(field_meta.get())) {
+          // CASESENSITIVE and SEPARATOR are only recognized for TAG fields
+          if (parser.EatEqICase("CASESENSITIVE")) {
+            tag->case_sensitive = true;
+          } else if (parser.EatEqICase("SEPARATOR")) {
+            auto sep = GET_OR_RET(parser.TakeStr());
+
+            if (sep.size() != 1) {
+              // reported as a parse error like every other malformed-argument case above
+              return {Status::RedisParseErr, "only one character separator is supported"};
+            }
+
+            tag->separator = sep[0];
+          } else {
+            break;
+          }
+        } else {
+          break;
+        }
+      }
+
+      kqir::FieldInfo field_info(field_name, std::move(field_meta));
+
+      index_info_->Add(std::move(field_info));
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    index_info_->ns = conn->GetNamespace();
+
+    GET_OR_RET(srv->index_mgr.Create(std::move(index_info_)));
+
+    output->append(redis::SimpleString("OK"));
+    return Status::OK();
+  }
+
+ private:
+  // built by Parse(), moved into the index manager by Execute()
+  std::unique_ptr<kqir::IndexInfo> index_info_;
+};
+
+// FT.SEARCHSQL <sql>: parses the SQL text into IR, runs it through the index manager and
+// replies with one [key, [field values...]] pair per returned row.
+class CommandFTSearchSQL : public Commander {
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    const auto &statement = args_[1];
+
+    auto ir = GET_OR_RET(kqir::sql::ParseToIR(kqir::peg::string_input(statement, "ft.searchsql")));
+    auto rows = GET_OR_RET(srv->index_mgr.Search(std::move(ir), conn->GetNamespace()));
+
+    output->append(MultiLen(rows.size()));
+    for (const auto &row : rows) {
+      const auto &[key, fields, _] = row;
+
+      // each row is a two-element array: the key, then the array of field values
+      output->append(MultiLen(2));
+      output->append(redis::BulkString(key));
+      output->append(MultiLen(fields.size()));
+      for (const auto &[_, value] : fields) {
+        output->append(redis::BulkString(value));
+      }
+    }
+
+    return Status::OK();
+  }
+};
+
+// FT.SEARCH <index> <query> [RETURN <count> <field>...] [SORTBY <field> [ASC|DESC]]
+//                          [LIMIT <offset> <count>]
+//
+// Parse() turns the arguments into a kqir::SearchExpr; Execute() runs it through the index
+// manager and replies with one [key, [field values...]] pair per returned row.
+class CommandFTSearch : public Commander {
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+
+    auto index_name = GET_OR_RET(parser.TakeStr());
+    auto query_str = GET_OR_RET(parser.TakeStr());
+
+    auto index_ref = std::make_unique<kqir::IndexRef>(index_name);
+    auto query = kqir::Node::MustAs<kqir::QueryExpr>(
+        GET_OR_RET(kqir::redis_query::ParseToIR(kqir::peg::string_input(query_str, "ft.search"))));
+
+    // starts with an empty field list; RETURN clauses append to it
+    auto select = std::make_unique<kqir::SelectClause>(std::vector<std::unique_ptr<kqir::FieldRef>>{});
+    std::unique_ptr<kqir::SortByClause> sort_by;
+    std::unique_ptr<kqir::LimitClause> limit;
+    while (parser.Good()) {
+      // RETURN is the RediSearch spelling; RETURNS stays accepted for existing callers
+      if (parser.EatEqICase("RETURN") || parser.EatEqICase("RETURNS")) {
+        auto count = GET_OR_RET(parser.TakeInt<size_t>());
+
+        for (size_t i = 0; i < count; ++i) {
+          auto field = GET_OR_RET(parser.TakeStr());
+          select->fields.push_back(std::make_unique<kqir::FieldRef>(field));
+        }
+      } else if (parser.EatEqICase("SORTBY")) {
+        auto field = GET_OR_RET(parser.TakeStr());
+
+        // the direction is optional and defaults to ascending
+        auto order = kqir::SortByClause::ASC;
+        if (parser.EatEqICase("ASC")) {
+          // NOOP
+        } else if (parser.EatEqICase("DESC")) {
+          order = kqir::SortByClause::DESC;
+        }
+
+        sort_by = std::make_unique<kqir::SortByClause>(order, std::make_unique<kqir::FieldRef>(field));
+      } else if (parser.EatEqICase("LIMIT")) {
+        auto offset = GET_OR_RET(parser.TakeInt<size_t>());
+        auto count = GET_OR_RET(parser.TakeInt<size_t>());
+
+        limit = std::make_unique<kqir::LimitClause>(offset, count);
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+
+    ir_ = std::make_unique<kqir::SearchExpr>(std::move(index_ref), std::move(query), std::move(limit),
+                                             std::move(sort_by), std::move(select));
+    return Status::OK();
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    CHECK(ir_);
+    auto results = GET_OR_RET(srv->index_mgr.Search(std::move(ir_), conn->GetNamespace()));
+
+    output->append(MultiLen(results.size()));
+    for (const auto &[key, fields, _] : results) {
+      // each row is a two-element array: the key, then the array of field values
+      output->append(MultiLen(2));
+      output->append(redis::BulkString(key));
+      output->append(MultiLen(fields.size()));
+      for (const auto &[_, field] : fields) {
+        output->append(redis::BulkString(field));
+      }
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  // built by Parse(), consumed (moved from) by Execute()
+  std::unique_ptr<kqir::Node> ir_;
+};
+
+// FT.INFO <index>: replies with an 8-element array of alternating labels and values
+// (index_name, on_data_type, prefixes, fields) for an index in the connection's namespace.
+class CommandFTInfo : public Commander {
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    const auto &indexes = srv->index_mgr.index_map;
+
+    const auto found = indexes.Find(args_[1], conn->GetNamespace());
+    if (found == indexes.end()) {
+      return {Status::RedisExecErr, "index not found"};
+    }
+    const auto &info = *found->second;
+
+    // four label/value pairs follow
+    output->append(MultiLen(8));
+
+    output->append(redis::SimpleString("index_name"));
+    output->append(redis::BulkString(info.name));
+
+    output->append(redis::SimpleString("on_data_type"));
+    output->append(redis::BulkString(RedisTypeNames[static_cast<size_t>(info.metadata.on_data_type)]));
+
+    output->append(redis::SimpleString("prefixes"));
+    output->append(redis::ArrayOfBulkStrings(info.prefixes.prefixes));
+
+    // every field is reported as a [name, type] pair
+    output->append(redis::SimpleString("fields"));
+    output->append(MultiLen(info.fields.size()));
+    for (const auto &[_, field] : info.fields) {
+      output->append(MultiLen(2));
+      output->append(redis::BulkString(field.name));
+      const auto type_name = field.metadata->Type();
+      output->append(redis::BulkString(std::string(type_name.begin(), type_name.end())));
+    }
+
+    return Status::OK();
+  }
+};
+
+// FT._LIST: replies with the names of all indexes whose namespace matches the connection's.
+class CommandFTList : public Commander {
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    std::vector<std::string> names;
+
+    for (const auto &[_, index] : srv->index_mgr.index_map) {
+      // the map holds indexes of every namespace, so skip the foreign ones
+      if (index->ns != conn->GetNamespace()) continue;
+      names.push_back(index->name);
+    }
+
+    output->append(ArrayOfBulkStrings(names));
+    return Status::OK();
+  }
+};
+
+// Command table entries for the search commands defined in this file.
+REDIS_REGISTER_COMMANDS(
+    MakeCmdAttr<CommandFTCreate>("ft.create", -2, "write exclusive no-multi no-script", 0, 0, 0),
+    MakeCmdAttr<CommandFTSearchSQL>("ft.searchsql", 2, "read-only", 0, 0, 0),
+    MakeCmdAttr<CommandFTSearch>("ft.search", -3, "read-only", 0, 0, 0),
+    MakeCmdAttr<CommandFTInfo>("ft.info", 2, "read-only", 0, 0, 0),
+    MakeCmdAttr<CommandFTList>("ft._list", 1, "read-only", 0, 0, 0));
+
+} // namespace redis
diff --git a/src/commands/commander.h b/src/commands/commander.h
index 1982d897..1441581d 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -106,6 +106,13 @@ struct CommandKeyRange {
// step length of key position
// e.g. key step 2 means "key other key other ..." sequence
int key_step;
+
+ // Calls f(args[i]) for every key position described by this range: starting at first_key,
+ // advancing by key_step and ending at last_key. A non-positive last_key is counted back
+ // from the end of args (e.g. -1 is the last argument).
+ //
+ // Degenerate ranges are tolerated instead of looping forever or reading out of bounds:
+ // nothing is visited when key_step is not positive or first_key is negative, and the end
+ // position is clamped to the last argument actually present.
+ template <typename F>
+ void ForEachKey(F &&f, const std::vector<std::string> &args) const {
+   if (first_key < 0 || key_step <= 0) return;
+
+   const auto arg_count = static_cast<long long>(args.size());
+   long long end = last_key > 0 ? static_cast<long long>(last_key) : arg_count + last_key;
+   if (end >= arg_count) end = arg_count - 1;
+
+   for (long long i = first_key; i <= end; i += key_step) {
+     // f is invoked once per key, so it is called as an lvalue rather than forwarded
+     f(args[static_cast<size_t>(i)]);
+   }
+ }
};
using CommandKeyRangeGen = std::function<CommandKeyRange(const
std::vector<std::string> &)>;
@@ -153,6 +160,27 @@ struct CommandAttributes {
bool CheckArity(int cmd_size) const {
return !((arity > 0 && cmd_size != arity) || (arity < 0 && cmd_size <
-arity));
}
+
+ // Invokes f(args, range) for each key range of this command that has a positive first_key.
+ // key_range.first_key selects where the ranges come from:
+ //   > 0 : key_range itself
+ //   -1  : the single range produced by key_range_gen(args)
+ //   -2  : every range produced by key_range_vec_gen(args)
+ // Any other value visits nothing.
+ template <typename F>
+ void ForEachKeyRange(F &&f, const std::vector<std::string> &args) const {
+   if (key_range.first_key > 0) {
+     std::forward<F>(f)(args, key_range);
+     return;
+   }
+
+   if (key_range.first_key == -1) {
+     redis::CommandKeyRange generated = key_range_gen(args);
+     if (generated.first_key > 0) {
+       std::forward<F>(f)(args, generated);
+     }
+     return;
+   }
+
+   if (key_range.first_key == -2) {
+     std::vector<redis::CommandKeyRange> generated_list = key_range_vec_gen(args);
+     for (const auto &generated : generated_list) {
+       if (generated.first_key <= 0) continue;
+       std::forward<F>(f)(args, generated);
+     }
+   }
+ }
};
using CommandMap = std::map<std::string, const CommandAttributes *>;
diff --git a/src/search/index_info.h b/src/search/index_info.h
index ba5e6af3..3badc372 100644
--- a/src/search/index_info.h
+++ b/src/search/index_info.h
@@ -20,12 +20,14 @@
#pragma once
+#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include "search_encoding.h"
+#include "storage/redis_metadata.h"
namespace kqir {
@@ -67,6 +69,13 @@ struct IndexInfo {
}
};
-using IndexMap = std::map<std::string, std::unique_ptr<IndexInfo>>;
+// Map of indexes keyed by ComposeNamespaceKey(ns, index name, false), so an index is always
+// addressed by its namespace together with its name.
+struct IndexMap : std::map<std::string, std::unique_ptr<IndexInfo>> {
+  // Stores index_info under the key composed from its own ns and name; the result is
+  // whatever std::map::emplace returns.
+  auto Insert(std::unique_ptr<IndexInfo> index_info) {
+    auto composed_key = ComposeNamespaceKey(index_info->ns, index_info->name, false);
+    return emplace(composed_key, std::move(index_info));
+  }
+
+  // Looks up the index called `index` inside namespace `ns`; yields end() when absent.
+  auto Find(std::string_view index, std::string_view ns) const {
+    auto composed_key = ComposeNamespaceKey(ns, index, false);
+    return find(composed_key);
+  }
+};
} // namespace kqir
diff --git a/src/search/index_manager.h b/src/search/index_manager.h
new file mode 100644
index 00000000..222c1440
--- /dev/null
+++ b/src/search/index_manager.h
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "db_util.h"
+#include "encoding.h"
+#include "search/index_info.h"
+#include "search/indexer.h"
+#include "search/ir.h"
+#include "search/ir_sema_checker.h"
+#include "search/passes/manager.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "status.h"
+#include "storage/storage.h"
+#include "string_util.h"
+
+namespace redis {
+
+struct IndexManager {
+ kqir::IndexMap index_map;
+ GlobalIndexer *indexer;
+ engine::Storage *storage;
+
+ // Stores both pointers as given; nothing in this struct deletes them.
+ IndexManager(GlobalIndexer *indexer, engine::Storage *storage) : indexer(indexer), storage(storage) {}
+
+ // Reads every index definition stored for namespace `ns` from the search column family
+ // (index metadata, prefixes and per-field metadata), then registers each index with the
+ // global indexer and with index_map. Does nothing in cluster mode.
+ Status Load(const std::string &ns) {
+   // currently index cannot work in cluster mode
+   if (storage->GetConfig()->cluster_enabled) {
+     return Status::OK();
+   }
+
+   util::UniqueIterator meta_iter(storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
+   auto meta_begin = SearchKey{ns, ""}.ConstructIndexMeta();
+
+   for (meta_iter->Seek(meta_begin); meta_iter->Valid(); meta_iter->Next()) {
+     auto meta_key = meta_iter->key();
+
+     // the scan stops at the first key that is not an index-meta entry of this namespace
+     uint8_t meta_ns_size = 0;
+     if (!GetFixed8(&meta_key, &meta_ns_size)) break;
+     if (meta_ns_size != ns.size()) break;
+     if (!meta_key.starts_with(ns)) break;
+     meta_key.remove_prefix(meta_ns_size);
+
+     uint8_t meta_subkey_type = 0;
+     if (!GetFixed8(&meta_key, &meta_subkey_type)) break;
+     if (meta_subkey_type != (uint8_t)SearchSubkeyType::INDEX_META) break;
+
+     Slice index_name;
+     if (!GetSizedString(&meta_key, &index_name)) break;
+
+     IndexMetadata metadata;
+     auto index_meta_value = meta_iter->value();
+     if (auto s = metadata.Decode(&index_meta_value); !s.ok()) {
+       return {Status::NotOK, fmt::format("fail to decode index metadata for index: {}", index_name)};
+     }
+
+     // the prefixes live under a separate key of the same index
+     auto index_key = SearchKey(ns, index_name.ToStringView());
+     std::string prefix_value;
+     if (auto s = storage->Get(storage->DefaultMultiGetOptions(), storage->GetCFHandle(ColumnFamilyID::Search),
+                               index_key.ConstructIndexPrefixes(), &prefix_value);
+         !s.ok()) {
+       return {Status::NotOK, fmt::format("fail to find index prefixes for index: {}", index_name)};
+     }
+
+     IndexPrefixes prefixes;
+     Slice prefix_slice = prefix_value;
+     if (auto s = prefixes.Decode(&prefix_slice); !s.ok()) {
+       return {Status::NotOK, fmt::format("fail to decode index prefixes for index: {}", index_name)};
+     }
+
+     auto info = std::make_unique<kqir::IndexInfo>(index_name.ToString(), metadata, ns);
+     info->prefixes = prefixes;
+
+     // second scan: collect the field metadata entries that belong to this index
+     util::UniqueIterator field_iter(storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
+     auto field_begin = index_key.ConstructFieldMeta();
+
+     for (field_iter->Seek(field_begin); field_iter->Valid(); field_iter->Next()) {
+       auto field_key = field_iter->key();
+
+       uint8_t field_ns_size = 0;
+       if (!GetFixed8(&field_key, &field_ns_size)) break;
+       if (field_ns_size != ns.size()) break;
+       if (!field_key.starts_with(ns)) break;
+       field_key.remove_prefix(field_ns_size);
+
+       uint8_t field_subkey_type = 0;
+       if (!GetFixed8(&field_key, &field_subkey_type)) break;
+       if (field_subkey_type != (uint8_t)SearchSubkeyType::FIELD_META) break;
+
+       // the key carries the owning index name first, then the field name
+       Slice owner_index;
+       if (!GetSizedString(&field_key, &owner_index)) break;
+       if (owner_index != index_name) break;
+
+       Slice field_name;
+       if (!GetSizedString(&field_key, &field_name)) break;
+
+       auto field_value = field_iter->value();
+
+       std::unique_ptr<IndexFieldMetadata> field_meta;
+       if (auto s = IndexFieldMetadata::Decode(&field_value, field_meta); !s.ok()) {
+         return {Status::NotOK, fmt::format("fail to decode index field metadata for index {}, field {}: {}",
+                                            index_name, field_name, s.ToString())};
+       }
+
+       info->Add(kqir::FieldInfo(field_name.ToString(), std::move(field_meta)));
+     }
+
+     // the updater keeps a raw pointer to the IndexInfo that index_map takes ownership of
+     IndexUpdater updater(info.get());
+     indexer->Add(updater);
+     index_map.Insert(std::move(info));
+   }
+
+   return Status::OK();
+ }
+
+ // Persists the definition of a new index (metadata, prefixes and one entry per field) in a
+ // single write batch, registers it with the global indexer and index_map, and then builds it.
+ //
+ // Fails when cluster mode is enabled, when an index with the same name already exists in
+ // the namespace, when the batch cannot be written, or when the build reports an error.
+ Status Create(std::unique_ptr<kqir::IndexInfo> info) {
+   if (storage->GetConfig()->cluster_enabled) {
+     return {Status::NotOK, "currently index cannot work in cluster mode"};
+   }
+
+   if (auto iter = index_map.Find(info->name, info->ns); iter != index_map.end()) {
+     return {Status::NotOK, "index already exists"};
+   }
+
+   SearchKey index_key(info->ns, info->name);
+   auto cf = storage->GetCFHandle(ColumnFamilyID::Search);
+
+   auto batch = storage->GetWriteBatchBase();
+
+   std::string meta_val;
+   info->metadata.Encode(&meta_val);
+   batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
+
+   std::string prefix_val;
+   info->prefixes.Encode(&prefix_val);
+   batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
+
+   for (const auto &[_, field_info] : info->fields) {
+     SearchKey field_key(info->ns, info->name, field_info.name);
+
+     std::string field_val;
+     field_info.metadata->Encode(&field_val);
+
+     batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
+   }
+
+   if (auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
+     return {Status::NotOK, "failed to write index metadata"};
+   }
+
+   // the updater keeps a raw pointer to the IndexInfo that index_map takes ownership of
+   IndexUpdater updater(info.get());
+   indexer->Add(updater);
+   index_map.Insert(std::move(info));
+
+   // Only the index that was just added needs an initial build; the indexes registered
+   // earlier are left untouched. GlobalIndexer::Add() appends its (indexer-bound) copy of
+   // the updater to updater_list, so the new one is the last element.
+   GET_OR_RET(indexer->updater_list.back().Build());
+
+   return Status::OK();
+ }
+
+ // Checks `ir` against the indexes of namespace `ns`, runs the default pass pipeline over it,
+ // executes the resulting plan and returns all produced rows.
+ StatusOr<std::vector<kqir::ExecutorContext::RowType>> Search(std::unique_ptr<kqir::Node> ir,
+                                                              const std::string &ns) const {
+   kqir::SemaChecker sema_checker(index_map);
+   sema_checker.ns = ns;
+
+   GET_OR_RET(sema_checker.Check(ir.get()));
+
+   auto plan_ir = kqir::PassManager::Execute(kqir::PassManager::Default(), std::move(ir));
+
+   // the passes must have produced a plan operator, otherwise there is nothing to execute
+   std::unique_ptr<kqir::PlanOperator> plan_op = kqir::Node::As<kqir::PlanOperator>(std::move(plan_ir));
+   if (!plan_op) {
+     return {Status::NotOK, "failed to convert the SQL query to plan operators"};
+   }
+
+   kqir::ExecutorContext executor_ctx(plan_op.get(), storage);
+
+   // pull rows one at a time until the executor reports End
+   std::vector<kqir::ExecutorContext::RowType> rows;
+   for (;;) {
+     auto step = GET_OR_RET(executor_ctx.Next());
+     if (std::holds_alternative<kqir::ExecutorNode::End>(step)) break;
+
+     rows.push_back(std::get<kqir::ExecutorContext::RowType>(step));
+   }
+
+   return rows;
+ }
+};
+
+} // namespace redis
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 8a90c8f0..cc4735a0 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -265,7 +265,7 @@ Status IndexUpdater::Build() const {
void GlobalIndexer::Add(IndexUpdater updater) {
updater.indexer = this;
for (const auto &prefix : updater.info->prefixes) {
- prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix,
storage->IsSlotIdEncoded()), updater);
+ prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false),
updater);
}
updater_list.push_back(updater);
}
@@ -275,17 +275,17 @@ StatusOr<GlobalIndexer::RecordResult>
GlobalIndexer::Record(std::string_view key
return Status::NoPrefixMatched;
}
- auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key,
storage->IsSlotIdEncoded()));
+ auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key, false));
if (iter != prefix_map.end()) {
auto updater = iter.value();
- return std::make_pair(updater, GET_OR_RET(updater.Record(key)));
+ return RecordResult{updater, std::string(key.begin(), key.end()),
GET_OR_RET(updater.Record(key))};
}
return {Status::NoPrefixMatched};
}
-Status GlobalIndexer::Update(const RecordResult &original, std::string_view
key) {
- return original.first.Update(original.second, key);
+Status GlobalIndexer::Update(const RecordResult &original) {
+ return original.updater.Update(original.fields, original.key);
}
} // namespace redis
diff --git a/src/search/indexer.h b/src/search/indexer.h
index 34a4cf93..33cea303 100644
--- a/src/search/indexer.h
+++ b/src/search/indexer.h
@@ -32,7 +32,6 @@
#include "index_info.h"
#include "indexer.h"
#include "search/search_encoding.h"
-#include "server/server.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "types/redis_hash.h"
@@ -90,7 +89,11 @@ struct IndexUpdater {
struct GlobalIndexer {
using FieldValues = IndexUpdater::FieldValues;
- using RecordResult = std::pair<IndexUpdater, FieldValues>;
+ // What Record() captures for a key so that Update() can be called afterwards
+ // without being handed the key again.
+ struct RecordResult {
+ IndexUpdater updater;  // updater found through the longest-prefix match on the key
+ std::string key;       // owned copy of the key passed to Record()
+ FieldValues fields;    // value returned by updater.Record(key)
+ };
tsl::htrie_map<char, IndexUpdater> prefix_map;
std::vector<IndexUpdater> updater_list;
@@ -101,7 +104,7 @@ struct GlobalIndexer {
void Add(IndexUpdater updater);
StatusOr<RecordResult> Record(std::string_view key, const std::string &ns);
- static Status Update(const RecordResult &original, std::string_view key);
+ static Status Update(const RecordResult &original);
};
} // namespace redis
diff --git a/src/search/ir_sema_checker.h b/src/search/ir_sema_checker.h
index 5e100621..a7a76181 100644
--- a/src/search/ir_sema_checker.h
+++ b/src/search/ir_sema_checker.h
@@ -33,6 +33,7 @@ namespace kqir {
struct SemaChecker {
const IndexMap &index_map;
+ std::string ns;
const IndexInfo *current_index = nullptr;
@@ -41,7 +42,7 @@ struct SemaChecker {
Status Check(Node *node) {
if (auto v = dynamic_cast<SearchExpr *>(node)) {
auto index_name = v->index->name;
- if (auto iter = index_map.find(index_name); iter != index_map.end()) {
+ if (auto iter = index_map.Find(index_name, ns); iter != index_map.end())
{
current_index = iter->second.get();
v->index->info = current_index;
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 3819fd9b..5b4cf739 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -180,6 +180,8 @@ struct IndexFieldMetadata {
bool noindex = false;
IndexFieldType type;
+ explicit IndexFieldMetadata(IndexFieldType type) : type(type) {}
+
// flag: <noindex: 1 bit> <type: 4 bit> <reserved: 3 bit>
uint8_t MakeFlag() const { return noindex | (uint8_t)type << 1; }
@@ -192,6 +194,17 @@ struct IndexFieldMetadata {
virtual ~IndexFieldMetadata() = default;
+ // Lower-case name of the field type; "unknown" for any value other than TAG or NUMERIC.
+ std::string_view Type() const {
+   if (type == IndexFieldType::TAG) return "tag";
+   if (type == IndexFieldType::NUMERIC) return "numeric";
+   return "unknown";
+ }
+
virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); }
virtual rocksdb::Status Decode(Slice *input) {
@@ -213,6 +226,8 @@ struct TagFieldMetadata : IndexFieldMetadata {
char separator = ',';
bool case_sensitive = false;
+ TagFieldMetadata() : IndexFieldMetadata(IndexFieldType::TAG) {}
+
void Encode(std::string *dst) const override {
IndexFieldMetadata::Encode(dst);
PutFixed8(dst, separator);
@@ -224,7 +239,7 @@ struct TagFieldMetadata : IndexFieldMetadata {
return s;
}
- if (input->size() < 8 + 8) {
+ if (input->size() < 2) {
return rocksdb::Status::Corruption(kErrorInsufficientLength);
}
@@ -235,6 +250,8 @@ struct TagFieldMetadata : IndexFieldMetadata {
};
struct NumericFieldMetadata : IndexFieldMetadata {
+ NumericFieldMetadata() : IndexFieldMetadata(IndexFieldType::NUMERIC) {}
+
bool IsSortable() const override { return true; }
};
diff --git a/src/server/namespace.h b/src/server/namespace.h
index 1eadd656..3e22d382 100644
--- a/src/server/namespace.h
+++ b/src/server/namespace.h
@@ -48,6 +48,7 @@ class Namespace {
rocksdb::ColumnFamilyHandle *cf_ = nullptr;
std::shared_mutex tokens_mu_;
+ // mapping from token to namespace name
std::map<std::string, std::string> tokens_;
Status loadFromDB(std::map<std::string, std::string> *db_tokens) const;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 536c4d20..f3960a66 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -28,6 +28,7 @@
#include "commands/commander.h"
#include "commands/error_constants.h"
#include "fmt/format.h"
+#include "search/indexer.h"
#include "server/redis_reply.h"
#include "string_util.h"
#ifdef ENABLE_OPENSSL
@@ -411,6 +412,10 @@ Status Connection::ExecuteCommand(const std::string
&cmd_name, const std::vector
return s;
}
+// True when the command name starts with "h" or with "json.".
+static bool IsHashOrJsonCommand(const std::string &cmd) {
+  if (util::HasPrefix(cmd, "h")) return true;
+  return util::HasPrefix(cmd, "json.");
+}
+
void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
const Config *config = srv_->GetConfig();
std::string reply;
@@ -537,9 +542,36 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
continue;
}
+ // TODO: transaction support for index recording
+ std::vector<GlobalIndexer::RecordResult> index_records;
+ if (IsHashOrJsonCommand(cmd_name) && (attributes->flags &
redis::kCmdWrite) && !config->cluster_enabled) {
+ attributes->ForEachKeyRange(
+ [&, this](const std::vector<std::string> &args, const
CommandKeyRange &key_range) {
+ key_range.ForEachKey(
+ [&, this](const std::string &key) {
+ auto res = srv_->indexer.Record(key, ns_);
+ if (res.IsOK()) {
+ index_records.push_back(*res);
+ } else if (!res.Is<Status::NoPrefixMatched>() &&
!res.Is<Status::TypeMismatched>()) {
+ LOG(WARNING) << "index recording failed for key: " << key;
+ }
+ },
+ args);
+ },
+ cmd_tokens);
+ }
+
SetLastCmd(cmd_name);
s = ExecuteCommand(cmd_name, cmd_tokens, current_cmd.get(), &reply);
+ // TODO: transaction support for index updating
+ for (const auto &record : index_records) {
+ auto s = GlobalIndexer::Update(record);
+ if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
+ LOG(WARNING) << "index updating failed for key: " << record.key;
+ }
+ }
+
// Break the execution loop when occurring the blocking command like BLPOP
or BRPOP,
// it will suspend the connection and wait for the wakeup signal.
if (s.Is<Status::BlockingCmd>()) {
diff --git a/src/server/server.cc b/src/server/server.cc
index 7ed970bf..2da7dfa0 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -39,6 +39,7 @@
#include "commands/commander.h"
#include "config.h"
+#include "config/config.h"
#include "fmt/format.h"
#include "redis_connection.h"
#include "storage/compaction_checker.h"
@@ -52,7 +53,12 @@
#include "worker.h"
Server::Server(engine::Storage *storage, Config *config)
- : storage(storage), start_time_secs_(util::GetTimeStamp()),
config_(config), namespace_(storage) {
+ : storage(storage),
+ indexer(storage),
+ index_mgr(&indexer, storage),
+ start_time_secs_(util::GetTimeStamp()),
+ config_(config),
+ namespace_(storage) {
// init commands stats here to prevent concurrent insert, and cause core
auto commands = redis::CommandTable::GetOriginal();
for (const auto &iter : *commands) {
@@ -150,6 +156,13 @@ Status Server::Start() {
}
}
+ if (!config_->cluster_enabled) {
+ GET_OR_RET(index_mgr.Load(kDefaultNamespace));
+ for (auto [_, ns] : namespace_.List()) {
+ GET_OR_RET(index_mgr.Load(ns));
+ }
+ }
+
if (config_->cluster_enabled) {
if (config_->persist_cluster_nodes_enabled) {
auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
diff --git a/src/server/server.h b/src/server/server.h
index ad967c77..c1793e81 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -44,6 +44,8 @@
#include "commands/commander.h"
#include "lua.hpp"
#include "namespace.h"
+#include "search/index_manager.h"
+#include "search/indexer.h"
#include "server/redis_connection.h"
#include "stats/log_collector.h"
#include "stats/stats.h"
@@ -313,6 +315,10 @@ class Server {
UniqueSSLContext ssl_ctx;
#endif
+ // search
+ redis::GlobalIndexer indexer;
+ redis::IndexManager index_mgr;
+
private:
void cron();
void recordInstantaneousMetrics();
diff --git a/src/storage/compact_filter.h b/src/storage/compact_filter.h
index 9788c849..118bb8f6 100644
--- a/src/storage/compact_filter.h
+++ b/src/storage/compact_filter.h
@@ -124,4 +124,23 @@ class PubSubFilterFactory : public
rocksdb::CompactionFilterFactory {
}
};
+// Compaction filter installed (through SearchFilterFactory) on the search column family.
+// Placeholder for now: Filter() always returns false.
+class SearchFilter : public rocksdb::CompactionFilter {
+ public:
+ const char *Name() const override { return "SearchFilter"; }
+ bool Filter(int level, const Slice &key, const Slice &value, std::string *new_value, bool *modified) const override {
+ // TODO: just a dummy one here
+ return false;
+ }
+};
+
+// Factory that hands out a fresh SearchFilter for every compaction.
+class SearchFilterFactory : public rocksdb::CompactionFilterFactory {
+ public:
+  SearchFilterFactory() = default;
+
+  const char *Name() const override { return "SearchFilterFactory"; }
+
+  std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
+      const rocksdb::CompactionFilter::Context &context) override {
+    return std::make_unique<SearchFilter>();
+  }
+};
+
} // namespace engine
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index bbba44ce..8cdb63bf 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -338,6 +338,13 @@ Status Storage::Open(DBOpenMode mode) {
propagate_opts.disable_auto_compactions =
config_->rocks_db.disable_auto_compactions;
SetBlobDB(&propagate_opts);
+ rocksdb::BlockBasedTableOptions search_table_opts = InitTableOptions();
+ rocksdb::ColumnFamilyOptions search_opts(options);
+
search_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(search_table_opts));
+ search_opts.compaction_filter_factory =
std::make_shared<SearchFilterFactory>();
+ search_opts.disable_auto_compactions =
config_->rocks_db.disable_auto_compactions;
+ SetBlobDB(&search_opts);
+
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
// Caution: don't change the order of column family, or the handle will be
mismatched
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
@@ -346,7 +353,7 @@ Status Storage::Open(DBOpenMode mode) {
column_families.emplace_back(std::string(kPubSubColumnFamilyName),
pubsub_opts);
column_families.emplace_back(std::string(kPropagateColumnFamilyName),
propagate_opts);
column_families.emplace_back(std::string(kStreamColumnFamilyName),
subkey_opts);
- column_families.emplace_back(std::string(kSearchColumnFamilyName),
subkey_opts);
+ column_families.emplace_back(std::string(kSearchColumnFamilyName),
search_opts);
std::vector<std::string> old_column_families;
auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir,
&old_column_families);
diff --git a/src/types/json.h b/src/types/json.h
index c9b681c4..0ce88797 100644
--- a/src/types/json.h
+++ b/src/types/json.h
@@ -39,6 +39,7 @@
#include "common/string_util.h"
#include "jsoncons_ext/jsonpath/jsonpath_error.hpp"
#include "status.h"
+#include "storage/redis_metadata.h"
template <class T>
using Optionals = std::vector<std::optional<T>>;
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index b5d7c3f9..d9b3656e 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -80,14 +80,14 @@ TEST_F(IndexerTest, HashTag) {
{
auto s = indexer.Record(key1, ns);
ASSERT_EQ(s.Msg(), Status::ok_msg);
- ASSERT_EQ(s->first.info->name, idxname);
- ASSERT_TRUE(s->second.empty());
+ ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_TRUE(s->fields.empty());
uint64_t cnt = 0;
db.Set(key1, "x", "food,kitChen,Beauty", &cnt);
ASSERT_EQ(cnt, 1);
- auto s2 = indexer.Update(*s, key1);
+ auto s2 = indexer.Update(*s);
ASSERT_TRUE(s2);
auto key = redis::SearchKey(ns, idxname, "x").ConstructTagFieldData("food", key1);
@@ -113,16 +113,16 @@ TEST_F(IndexerTest, HashTag) {
{
auto s = indexer.Record(key1, ns);
ASSERT_TRUE(s);
- ASSERT_EQ(s->first.info->name, idxname);
- ASSERT_EQ(s->second.size(), 1);
- ASSERT_EQ(s->second["x"], "food,kitChen,Beauty");
+ ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_EQ(s->fields.size(), 1);
+ ASSERT_EQ(s->fields["x"], "food,kitChen,Beauty");
uint64_t cnt = 0;
auto s_set = db.Set(key1, "x", "Clothing,FOOD,sport", &cnt);
ASSERT_EQ(cnt, 0);
ASSERT_TRUE(s_set.ok());
- auto s2 = indexer.Update(*s, key1);
+ auto s2 = indexer.Update(*s);
ASSERT_TRUE(s2);
auto key = redis::SearchKey(ns, idxname, "x").ConstructTagFieldData("food", key1);
@@ -171,13 +171,13 @@ TEST_F(IndexerTest, JsonTag) {
{
auto s = indexer.Record(key1, ns);
ASSERT_TRUE(s);
- ASSERT_EQ(s->first.info->name, idxname);
- ASSERT_TRUE(s->second.empty());
+ ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_TRUE(s->fields.empty());
auto s_set = db.Set(key1, "$", R"({"x": "food,kitChen,Beauty"})");
ASSERT_TRUE(s_set.ok());
- auto s2 = indexer.Update(*s, key1);
+ auto s2 = indexer.Update(*s);
ASSERT_TRUE(s2);
auto key = redis::SearchKey(ns, idxname, "$.x").ConstructTagFieldData("food", key1);
@@ -203,14 +203,14 @@ TEST_F(IndexerTest, JsonTag) {
{
auto s = indexer.Record(key1, ns);
ASSERT_TRUE(s);
- ASSERT_EQ(s->first.info->name, idxname);
- ASSERT_EQ(s->second.size(), 1);
- ASSERT_EQ(s->second["$.x"], "food,kitChen,Beauty");
+ ASSERT_EQ(s->updater.info->name, idxname);
+ ASSERT_EQ(s->fields.size(), 1);
+ ASSERT_EQ(s->fields["$.x"], "food,kitChen,Beauty");
auto s_set = db.Set(key1, "$.x", "\"Clothing,FOOD,sport\"");
ASSERT_TRUE(s_set.ok());
- auto s2 = indexer.Update(*s, key1);
+ auto s2 = indexer.Update(*s);
ASSERT_TRUE(s2);
auto key = redis::SearchKey(ns, idxname, "$.x").ConstructTagFieldData("food", key1);
diff --git a/tests/cppunit/ir_dot_dumper_test.cc b/tests/cppunit/ir_dot_dumper_test.cc
index bc50c7a9..d4ae7292 100644
--- a/tests/cppunit/ir_dot_dumper_test.cc
+++ b/tests/cppunit/ir_dot_dumper_test.cc
@@ -29,6 +29,7 @@
#include "search/passes/manager.h"
#include "search/search_encoding.h"
#include "search/sql_transformer.h"
+#include "storage/redis_metadata.h"
using namespace kqir;
@@ -79,9 +80,8 @@ static IndexMap MakeIndexMap() {
ia->Add(std::move(f4));
ia->Add(std::move(f5));
- auto& name = ia->name;
IndexMap res;
- res.emplace(name, std::move(ia));
+ res.Insert(std::move(ia));
return res;
}
diff --git a/tests/cppunit/ir_pass_test.cc b/tests/cppunit/ir_pass_test.cc
index 70811f81..81ed49e8 100644
--- a/tests/cppunit/ir_pass_test.cc
+++ b/tests/cppunit/ir_pass_test.cc
@@ -183,9 +183,8 @@ static IndexMap MakeIndexMap() {
ia->Add(std::move(f4));
ia->Add(std::move(f5));
- auto& name = ia->name;
IndexMap res;
- res.emplace(name, std::move(ia));
+ res.Insert(std::move(ia));
return res;
}
diff --git a/tests/cppunit/ir_sema_checker_test.cc b/tests/cppunit/ir_sema_checker_test.cc
index 8223e5ed..3a15dde7 100644
--- a/tests/cppunit/ir_sema_checker_test.cc
+++ b/tests/cppunit/ir_sema_checker_test.cc
@@ -43,9 +43,8 @@ static IndexMap MakeIndexMap() {
ia->Add(std::move(f2));
ia->Add(std::move(f3));
- auto& name = ia->name;
IndexMap res;
- res.emplace(name, std::move(ia));
+ res.Insert(std::move(ia));
return res;
}
diff --git a/tests/cppunit/plan_executor_test.cc b/tests/cppunit/plan_executor_test.cc
index e41ea94b..20f1f009 100644
--- a/tests/cppunit/plan_executor_test.cc
+++ b/tests/cppunit/plan_executor_test.cc
@@ -48,9 +48,8 @@ static IndexMap MakeIndexMap() {
ia->Add(std::move(f2));
ia->Add(std::move(f3));
- auto& name = ia->name;
IndexMap res;
- res.emplace(name, std::move(ia));
+ res.Insert(std::move(ia));
return res;
}
@@ -79,8 +78,8 @@ TEST(PlanExecutorTest, Mock) {
ASSERT_EQ(ctx.Next().GetValue(), exe_end);
}
-static auto IndexI() -> const IndexInfo* { return index_map.at("ia").get(); }
-static auto FieldI(const std::string& f) -> const FieldInfo* { return &index_map.at("ia")->fields.at(f); }
+static auto IndexI() -> const IndexInfo* { return index_map.Find("ia", "search_ns")->second.get(); }
+static auto FieldI(const std::string& f) -> const FieldInfo* { return &IndexI()->fields.at(f); }
TEST(PlanExecutorTest, TopNSort) {
std::vector<ExecutorNode::RowType> data{
@@ -317,7 +316,7 @@ struct ScopedUpdate {
ScopedUpdate& operator=(ScopedUpdate&&) = delete;
~ScopedUpdate() {
- auto s = redis::GlobalIndexer::Update(rr, key);
+ auto s = redis::GlobalIndexer::Update(rr);
EXPECT_EQ(s.Msg(), Status::ok_msg);
}
};