This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 0d8f7381 Implement index updating for numeric and tag field (#2115)
0d8f7381 is described below
commit 0d8f7381a57791593d2d91c9a75421c489aadad4
Author: Twice <[email protected]>
AuthorDate: Tue Feb 27 08:52:01 2024 +0900
Implement index updating for numeric and tag field (#2115)
---
src/common/status.h | 14 ++---
src/common/string_util.cc | 12 ++---
src/common/string_util.h | 8 +--
src/search/indexer.cc | 123 +++++++++++++++++++++++++++++++++++++++++--
src/search/indexer.h | 12 ++++-
src/search/search_encoding.h | 2 +
src/storage/storage.cc | 8 ++-
src/storage/storage.h | 2 +
8 files changed, 151 insertions(+), 30 deletions(-)
diff --git a/src/common/status.h b/src/common/status.h
index ade19f86..b425ea5b 100644
--- a/src/common/status.h
+++ b/src/common/status.h
@@ -37,38 +37,30 @@ class [[nodiscard]] Status {
enum Code : unsigned char {
NotOK = 1,
NotFound,
+ NotSupported,
+ InvalidArgument,
// DB
DBOpenErr,
DBBackupErr,
DBGetWALErr,
- DBBackupFileErr,
-
- // Replication
- DBMismatched,
// Redis
RedisUnknownCmd,
RedisInvalidCmd,
RedisParseErr,
RedisExecErr,
- RedisReplicationConflict,
// Cluster
ClusterDown,
ClusterInvalidInfo,
- // Slot
- SlotImport,
-
- // Network
- NetSendErr,
-
// Blocking
BlockingCmd,
// Search
NoPrefixMatched,
+ TypeMismatched,
};
Status() : impl_{nullptr} {}
diff --git a/src/common/string_util.cc b/src/common/string_util.cc
index b3004a3a..ae41f918 100644
--- a/src/common/string_util.cc
+++ b/src/common/string_util.cc
@@ -47,7 +47,7 @@ bool EqualICase(std::string_view lhs, std::string_view rhs) {
[](char l, char r) { return
std::tolower(l) == std::tolower(r); });
}
-std::string Trim(std::string in, const std::string &chars) {
+std::string Trim(std::string in, std::string_view chars) {
if (in.empty()) return in;
in.erase(0, in.find_first_not_of(chars));
@@ -56,7 +56,7 @@ std::string Trim(std::string in, const std::string &chars) {
return in;
}
-std::vector<std::string> Split(const std::string &in, const std::string
&delim) {
+std::vector<std::string> Split(std::string_view in, std::string_view delim) {
std::vector<std::string> out;
if (in.empty()) {
@@ -71,8 +71,8 @@ std::vector<std::string> Split(const std::string &in, const
std::string &delim)
size_t begin = 0, end = in.find_first_of(delim);
do {
- std::string elem = in.substr(begin, end - begin);
- if (!elem.empty()) out.push_back(std::move(elem));
+ std::string_view elem = in.substr(begin, end - begin);
+ if (!elem.empty()) out.emplace_back(elem.begin(), elem.end());
if (end == std::string::npos) break;
begin = end + 1;
end = in.find_first_of(delim, begin);
@@ -228,7 +228,7 @@ std::vector<std::string> RegexMatch(const std::string &str,
const std::string &r
return out;
}
-std::string StringToHex(const std::string &input) {
+std::string StringToHex(std::string_view input) {
static const char hex_digits[] = "0123456789ABCDEF";
std::string output;
output.reserve(input.length() * 2);
@@ -331,7 +331,7 @@ std::vector<std::string> TokenizeRedisProtocol(const
std::string &value) {
/* escape string where all the non-printable characters
* (tested with isprint()) are turned into escapes in
* the form "\n\r\a...." or "\x<hex-number>". */
-std::string EscapeString(const std::string &s) {
+std::string EscapeString(std::string_view s) {
std::string str;
str.reserve(s.size());
diff --git a/src/common/string_util.h b/src/common/string_util.h
index 2ebd7639..d23ebad7 100644
--- a/src/common/string_util.h
+++ b/src/common/string_util.h
@@ -28,15 +28,15 @@ std::string Float2String(double d);
std::string ToLower(std::string in);
bool EqualICase(std::string_view lhs, std::string_view rhs);
std::string BytesToHuman(uint64_t n);
-std::string Trim(std::string in, const std::string &chars);
-std::vector<std::string> Split(const std::string &in, const std::string
&delim);
+std::string Trim(std::string in, std::string_view chars);
+std::vector<std::string> Split(std::string_view in, std::string_view delim);
std::vector<std::string> Split2KV(const std::string &in, const std::string
&delim);
bool HasPrefix(const std::string &str, const std::string &prefix);
int StringMatch(const std::string &pattern, const std::string &in, int nocase);
int StringMatchLen(const char *p, size_t plen, const char *s, size_t slen, int
nocase);
std::vector<std::string> RegexMatch(const std::string &str, const std::string
&regex);
-std::string StringToHex(const std::string &input);
+std::string StringToHex(std::string_view input);
std::vector<std::string> TokenizeRedisProtocol(const std::string &value);
-std::string EscapeString(const std::string &s);
+std::string EscapeString(std::string_view s);
} // namespace util
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index f608d3df..4576e280 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -22,7 +22,11 @@
#include <variant>
+#include "parse_util.h"
+#include "search/search_encoding.h"
#include "storage/redis_metadata.h"
+#include "storage/storage.h"
+#include "string_util.h"
#include "types/redis_hash.h"
namespace redis {
@@ -79,12 +83,12 @@ StatusOr<IndexUpdater::FieldValues>
IndexUpdater::Record(std::string_view key, c
auto s = db.Type(key, &type);
if (!s.ok()) return {Status::NotOK, s.ToString()};
- if (type != static_cast<RedisType>(on_data_type)) {
+ if (type != static_cast<RedisType>(metadata.on_data_type)) {
// not the expected type, stop record
- return {Status::NotOK, "this data type cannot be indexed"};
+ return {Status::TypeMismatched};
}
- auto retriever = GET_OR_RET(FieldValueRetriever::Create(on_data_type, key,
indexer->storage, ns));
+ auto retriever =
GET_OR_RET(FieldValueRetriever::Create(metadata.on_data_type, key,
indexer->storage, ns));
FieldValues values;
for (const auto &[field, info] : fields) {
@@ -99,6 +103,110 @@ StatusOr<IndexUpdater::FieldValues>
IndexUpdater::Record(std::string_view key, c
return values;
}
+Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view
key, std::string_view original,
+ std::string_view current, const std::string
&ns) {
+ if (original == current) {
+ // the value of this field is unchanged, no need to update
+ return Status::OK();
+ }
+
+ auto iter = fields.find(field);
+ if (iter == fields.end()) {
+ return {Status::NotOK, "No such field to do index updating"};
+ }
+
+ auto *metadata = iter->second.get();
+ auto *storage = indexer->storage;
+ auto ns_key = ComposeNamespaceKey(ns, name, storage->IsSlotIdEncoded());
+ if (auto tag = dynamic_cast<SearchTagFieldMetadata *>(metadata)) {
+ const char delim[] = {tag->separator, '\0'};
+ auto original_tags = util::Split(original, delim);
+ auto current_tags = util::Split(current, delim);
+
+ std::set<std::string> tags_to_delete(original_tags.begin(),
original_tags.end());
+ std::set<std::string> tags_to_add(current_tags.begin(),
current_tags.end());
+
+ for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
+ if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
+ it = tags_to_delete.erase(it);
+ tags_to_add.erase(jt);
+ } else {
+ ++it;
+ }
+ }
+
+ if (tags_to_add.empty() && tags_to_delete.empty()) {
+ // no change, skip index updating
+ return Status::OK();
+ }
+
+ auto batch = storage->GetWriteBatchBase();
+ auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+
+ for (const auto &tag : tags_to_delete) {
+ auto sub_key = ConstructTagFieldSubkey(field, tag, key);
+ auto index_key = InternalKey(ns_key, sub_key, this->metadata.version,
storage->IsSlotIdEncoded());
+
+ batch->Delete(cf_handle, index_key.Encode());
+ }
+
+ for (const auto &tag : tags_to_add) {
+ auto sub_key = ConstructTagFieldSubkey(field, tag, key);
+ auto index_key = InternalKey(ns_key, sub_key, this->metadata.version,
storage->IsSlotIdEncoded());
+
+ batch->Put(cf_handle, index_key.Encode(), Slice());
+ }
+
+ auto s = storage->Write(storage->DefaultWriteOptions(),
batch->GetWriteBatch());
+ if (!s.ok()) return {Status::NotOK, s.ToString()};
+ } else if (auto numeric [[maybe_unused]] =
dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
+ auto batch = storage->GetWriteBatchBase();
+ auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+
+ if (!original.empty()) {
+ auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(),
original.end())));
+ auto sub_key = ConstructNumericFieldSubkey(field, original_num, key);
+ auto index_key = InternalKey(ns_key, sub_key, this->metadata.version,
storage->IsSlotIdEncoded());
+
+ batch->Delete(cf_handle, index_key.Encode());
+ }
+
+ if (!current.empty()) {
+ auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(),
current.end())));
+ auto sub_key = ConstructNumericFieldSubkey(field, current_num, key);
+ auto index_key = InternalKey(ns_key, sub_key, this->metadata.version,
storage->IsSlotIdEncoded());
+
+ batch->Put(cf_handle, index_key.Encode(), Slice());
+ }
+
+ auto s = storage->Write(storage->DefaultWriteOptions(),
batch->GetWriteBatch());
+ if (!s.ok()) return {Status::NotOK, s.ToString()};
+ } else {
+ return {Status::NotOK, "Unexpected field type"};
+ }
+
+ return Status::OK();
+}
+
+Status IndexUpdater::Update(const FieldValues &original, std::string_view key,
const std::string &ns) {
+ auto current = GET_OR_RET(Record(key, ns));
+
+ for (const auto &[field, _] : fields) {
+ std::string_view original_val, current_val;
+
+ if (auto it = original.find(field); it != original.end()) {
+ original_val = it->second;
+ }
+ if (auto it = current.find(field); it != current.end()) {
+ current_val = it->second;
+ }
+
+ GET_OR_RET(UpdateIndex(field, key, original_val, current_val, ns));
+ }
+
+ return Status::OK();
+}
+
void GlobalIndexer::Add(IndexUpdater updater) {
auto &up = updaters.emplace_back(std::move(updater));
for (const auto &prefix : up.prefixes) {
@@ -106,13 +214,18 @@ void GlobalIndexer::Add(IndexUpdater updater) {
}
}
-StatusOr<IndexUpdater::FieldValues> GlobalIndexer::Record(std::string_view
key, const std::string &ns) {
+StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(std::string_view
key, const std::string &ns) {
auto iter = prefix_map.longest_prefix(key);
if (iter != prefix_map.end()) {
- return iter.value()->Record(key, ns);
+ auto updater = iter.value();
+ return std::make_pair(updater, GET_OR_RET(updater->Record(key, ns)));
}
return {Status::NoPrefixMatched};
}
+Status GlobalIndexer::Update(const RecordResult &original, std::string_view
key, const std::string &ns) {
+ return original.first->Update(original.second, key, ns);
+}
+
} // namespace redis
diff --git a/src/search/indexer.h b/src/search/indexer.h
index e153d555..001ade13 100644
--- a/src/search/indexer.h
+++ b/src/search/indexer.h
@@ -69,15 +69,22 @@ struct FieldValueRetriever {
struct IndexUpdater {
using FieldValues = std::map<std::string, std::string>;
- SearchOnDataType on_data_type;
+ std::string name;
+ SearchMetadata metadata;
std::vector<std::string> prefixes;
std::map<std::string, std::unique_ptr<SearchFieldMetadata>> fields;
GlobalIndexer *indexer = nullptr;
StatusOr<FieldValues> Record(std::string_view key, const std::string &ns);
+ Status UpdateIndex(const std::string &field, std::string_view key,
std::string_view original,
+ std::string_view current, const std::string &ns);
+ Status Update(const FieldValues &original, std::string_view key, const
std::string &ns);
};
struct GlobalIndexer {
+ using FieldValues = IndexUpdater::FieldValues;
+ using RecordResult = std::pair<IndexUpdater *, FieldValues>;
+
std::deque<IndexUpdater> updaters;
tsl::htrie_map<char, IndexUpdater *> prefix_map;
@@ -86,7 +93,8 @@ struct GlobalIndexer {
explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {}
void Add(IndexUpdater updater);
- StatusOr<IndexUpdater::FieldValues> Record(std::string_view key, const
std::string &ns);
+ StatusOr<RecordResult> Record(std::string_view key, const std::string &ns);
+ static Status Update(const RecordResult &original, std::string_view key,
const std::string &ns);
};
} // namespace redis
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 2acec050..1637a504 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -73,6 +73,8 @@ struct SearchFieldMetadata {
void DecodeFlag(uint8_t flag) { noindex = flag & 1; }
+ virtual ~SearchFieldMetadata() = default;
+
virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); }
virtual rocksdb::Status Decode(Slice *input) {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index a74e49d3..56cda4ae 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -231,8 +231,9 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options
&options) {
rocksdb::ColumnFamilyOptions cf_options(options);
auto res = util::DBOpen(options, config_->db_dir);
if (res) {
- std::vector<std::string> cf_names = {kMetadataColumnFamilyName,
kZSetScoreColumnFamilyName, kPubSubColumnFamilyName,
- kPropagateColumnFamilyName,
kStreamColumnFamilyName};
+ std::vector<std::string> cf_names = {kMetadataColumnFamilyName,
kZSetScoreColumnFamilyName,
+ kPubSubColumnFamilyName,
kPropagateColumnFamilyName,
+ kStreamColumnFamilyName,
kSearchColumnFamilyName};
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
if (!s.ok()) {
@@ -339,6 +340,7 @@ Status Storage::Open(DBOpenMode mode) {
column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts);
column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts);
column_families.emplace_back(kStreamColumnFamilyName, subkey_opts);
+ column_families.emplace_back(kSearchColumnFamilyName, subkey_opts);
std::vector<std::string> old_column_families;
auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir,
&old_column_families);
@@ -730,6 +732,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const
std::string &name) {
return cf_handles_[4];
} else if (name == kStreamColumnFamilyName) {
return cf_handles_[5];
+ } else if (name == kSearchColumnFamilyName) {
+ return cf_handles_[6];
}
return cf_handles_[0];
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 0e20425a..e36e77bd 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -57,6 +57,7 @@ enum ColumnFamilyID {
kColumnFamilyIDPubSub,
kColumnFamilyIDPropagate,
kColumnFamilyIDStream,
+ kColumnFamilyIDSearch,
};
enum DBOpenMode {
@@ -73,6 +74,7 @@ constexpr const char *kMetadataColumnFamilyName = "metadata";
constexpr const char *kSubkeyColumnFamilyName = "default";
constexpr const char *kPropagateColumnFamilyName = "propagate";
constexpr const char *kStreamColumnFamilyName = "stream";
+constexpr const char *kSearchColumnFamilyName = "search";
constexpr const char *kPropagateScriptCommand = "script";