This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 0d8f7381 Implement index updating for numeric and tag field (#2115)
0d8f7381 is described below

commit 0d8f7381a57791593d2d91c9a75421c489aadad4
Author: Twice <[email protected]>
AuthorDate: Tue Feb 27 08:52:01 2024 +0900

    Implement index updating for numeric and tag field (#2115)
---
 src/common/status.h          |  14 ++---
 src/common/string_util.cc    |  12 ++---
 src/common/string_util.h     |   8 +--
 src/search/indexer.cc        | 123 +++++++++++++++++++++++++++++++++++++++++--
 src/search/indexer.h         |  12 ++++-
 src/search/search_encoding.h |   2 +
 src/storage/storage.cc       |   8 ++-
 src/storage/storage.h        |   2 +
 8 files changed, 151 insertions(+), 30 deletions(-)

diff --git a/src/common/status.h b/src/common/status.h
index ade19f86..b425ea5b 100644
--- a/src/common/status.h
+++ b/src/common/status.h
@@ -37,38 +37,30 @@ class [[nodiscard]] Status {
   enum Code : unsigned char {
     NotOK = 1,
     NotFound,
+    NotSupported,
+    InvalidArgument,
 
     // DB
     DBOpenErr,
     DBBackupErr,
     DBGetWALErr,
-    DBBackupFileErr,
-
-    // Replication
-    DBMismatched,
 
     // Redis
     RedisUnknownCmd,
     RedisInvalidCmd,
     RedisParseErr,
     RedisExecErr,
-    RedisReplicationConflict,
 
     // Cluster
     ClusterDown,
     ClusterInvalidInfo,
 
-    // Slot
-    SlotImport,
-
-    // Network
-    NetSendErr,
-
     // Blocking
     BlockingCmd,
 
     // Search
     NoPrefixMatched,
+    TypeMismatched,
   };
 
   Status() : impl_{nullptr} {}
diff --git a/src/common/string_util.cc b/src/common/string_util.cc
index b3004a3a..ae41f918 100644
--- a/src/common/string_util.cc
+++ b/src/common/string_util.cc
@@ -47,7 +47,7 @@ bool EqualICase(std::string_view lhs, std::string_view rhs) {
                                                 [](char l, char r) { return std::tolower(l) == std::tolower(r); });
 }
 
-std::string Trim(std::string in, const std::string &chars) {
+std::string Trim(std::string in, std::string_view chars) {
   if (in.empty()) return in;
 
   in.erase(0, in.find_first_not_of(chars));
@@ -56,7 +56,7 @@ std::string Trim(std::string in, const std::string &chars) {
   return in;
 }
 
-std::vector<std::string> Split(const std::string &in, const std::string &delim) {
+std::vector<std::string> Split(std::string_view in, std::string_view delim) {
   std::vector<std::string> out;
 
   if (in.empty()) {
@@ -71,8 +71,8 @@ std::vector<std::string> Split(const std::string &in, const std::string &delim)
 
   size_t begin = 0, end = in.find_first_of(delim);
   do {
-    std::string elem = in.substr(begin, end - begin);
-    if (!elem.empty()) out.push_back(std::move(elem));
+    std::string_view elem = in.substr(begin, end - begin);
+    if (!elem.empty()) out.emplace_back(elem.begin(), elem.end());
     if (end == std::string::npos) break;
     begin = end + 1;
     end = in.find_first_of(delim, begin);
@@ -228,7 +228,7 @@ std::vector<std::string> RegexMatch(const std::string &str, const std::string &r
   return out;
 }
 
-std::string StringToHex(const std::string &input) {
+std::string StringToHex(std::string_view input) {
   static const char hex_digits[] = "0123456789ABCDEF";
   std::string output;
   output.reserve(input.length() * 2);
@@ -331,7 +331,7 @@ std::vector<std::string> TokenizeRedisProtocol(const std::string &value) {
 /* escape string where all the non-printable characters
  * (tested with isprint()) are turned into escapes in
  * the form "\n\r\a...." or "\x<hex-number>". */
-std::string EscapeString(const std::string &s) {
+std::string EscapeString(std::string_view s) {
   std::string str;
   str.reserve(s.size());
 
diff --git a/src/common/string_util.h b/src/common/string_util.h
index 2ebd7639..d23ebad7 100644
--- a/src/common/string_util.h
+++ b/src/common/string_util.h
@@ -28,15 +28,15 @@ std::string Float2String(double d);
 std::string ToLower(std::string in);
 bool EqualICase(std::string_view lhs, std::string_view rhs);
 std::string BytesToHuman(uint64_t n);
-std::string Trim(std::string in, const std::string &chars);
-std::vector<std::string> Split(const std::string &in, const std::string &delim);
+std::string Trim(std::string in, std::string_view chars);
+std::vector<std::string> Split(std::string_view in, std::string_view delim);
 std::vector<std::string> Split2KV(const std::string &in, const std::string &delim);
 bool HasPrefix(const std::string &str, const std::string &prefix);
 int StringMatch(const std::string &pattern, const std::string &in, int nocase);
 int StringMatchLen(const char *p, size_t plen, const char *s, size_t slen, int nocase);
 std::vector<std::string> RegexMatch(const std::string &str, const std::string &regex);
-std::string StringToHex(const std::string &input);
+std::string StringToHex(std::string_view input);
 std::vector<std::string> TokenizeRedisProtocol(const std::string &value);
-std::string EscapeString(const std::string &s);
+std::string EscapeString(std::string_view s);
 
 }  // namespace util
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index f608d3df..4576e280 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -22,7 +22,11 @@
 
 #include <variant>
 
+#include "parse_util.h"
+#include "search/search_encoding.h"
 #include "storage/redis_metadata.h"
+#include "storage/storage.h"
+#include "string_util.h"
 #include "types/redis_hash.h"
 
 namespace redis {
@@ -79,12 +83,12 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
   auto s = db.Type(key, &type);
   if (!s.ok()) return {Status::NotOK, s.ToString()};
 
-  if (type != static_cast<RedisType>(on_data_type)) {
+  if (type != static_cast<RedisType>(metadata.on_data_type)) {
     // not the expected type, stop record
-    return {Status::NotOK, "this data type cannot be indexed"};
+    return {Status::TypeMismatched};
   }
 
-  auto retriever = GET_OR_RET(FieldValueRetriever::Create(on_data_type, key, indexer->storage, ns));
+  auto retriever = GET_OR_RET(FieldValueRetriever::Create(metadata.on_data_type, key, indexer->storage, ns));
 
   FieldValues values;
   for (const auto &[field, info] : fields) {
@@ -99,6 +103,110 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
   return values;
 }
 
+Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
+                                 std::string_view current, const std::string &ns) {
+  if (original == current) {
+    // the value of this field is unchanged, no need to update
+    return Status::OK();
+  }
+
+  auto iter = fields.find(field);
+  if (iter == fields.end()) {
+    return {Status::NotOK, "No such field to do index updating"};
+  }
+
+  auto *metadata = iter->second.get();
+  auto *storage = indexer->storage;
+  auto ns_key = ComposeNamespaceKey(ns, name, storage->IsSlotIdEncoded());
+  if (auto tag = dynamic_cast<SearchTagFieldMetadata *>(metadata)) {
+    const char delim[] = {tag->separator, '\0'};
+    auto original_tags = util::Split(original, delim);
+    auto current_tags = util::Split(current, delim);
+
+    std::set<std::string> tags_to_delete(original_tags.begin(), original_tags.end());
+    std::set<std::string> tags_to_add(current_tags.begin(), current_tags.end());
+
+    for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
+      if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
+        it = tags_to_delete.erase(it);
+        tags_to_add.erase(jt);
+      } else {
+        ++it;
+      }
+    }
+
+    if (tags_to_add.empty() && tags_to_delete.empty()) {
+      // no change, skip index updating
+      return Status::OK();
+    }
+
+    auto batch = storage->GetWriteBatchBase();
+    auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+
+    for (const auto &tag : tags_to_delete) {
+      auto sub_key = ConstructTagFieldSubkey(field, tag, key);
+      auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());
+
+      batch->Delete(cf_handle, index_key.Encode());
+    }
+
+    for (const auto &tag : tags_to_add) {
+      auto sub_key = ConstructTagFieldSubkey(field, tag, key);
+      auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());
+
+      batch->Put(cf_handle, index_key.Encode(), Slice());
+    }
+
+    auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
+    if (!s.ok()) return {Status::NotOK, s.ToString()};
+  } else if (auto numeric [[maybe_unused]] = dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
+    auto batch = storage->GetWriteBatchBase();
+    auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);
+
+    if (!original.empty()) {
+      auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
+      auto sub_key = ConstructNumericFieldSubkey(field, original_num, key);
+      auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());
+
+      batch->Delete(cf_handle, index_key.Encode());
+    }
+
+    if (!current.empty()) {
+      auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
+      auto sub_key = ConstructNumericFieldSubkey(field, current_num, key);
+      auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());
+
+      batch->Put(cf_handle, index_key.Encode(), Slice());
+    }
+
+    auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
+    if (!s.ok()) return {Status::NotOK, s.ToString()};
+  } else {
+    return {Status::NotOK, "Unexpected field type"};
+  }
+
+  return Status::OK();
+}
+
+Status IndexUpdater::Update(const FieldValues &original, std::string_view key, const std::string &ns) {
+  auto current = GET_OR_RET(Record(key, ns));
+
+  for (const auto &[field, _] : fields) {
+    std::string_view original_val, current_val;
+
+    if (auto it = original.find(field); it != original.end()) {
+      original_val = it->second;
+    }
+    if (auto it = current.find(field); it != current.end()) {
+      current_val = it->second;
+    }
+
+    GET_OR_RET(UpdateIndex(field, key, original_val, current_val, ns));
+  }
+
+  return Status::OK();
+}
+
 void GlobalIndexer::Add(IndexUpdater updater) {
   auto &up = updaters.emplace_back(std::move(updater));
   for (const auto &prefix : up.prefixes) {
@@ -106,13 +214,18 @@ void GlobalIndexer::Add(IndexUpdater updater) {
   }
 }
 
-StatusOr<IndexUpdater::FieldValues> GlobalIndexer::Record(std::string_view key, const std::string &ns) {
+StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(std::string_view key, const std::string &ns) {
   auto iter = prefix_map.longest_prefix(key);
   if (iter != prefix_map.end()) {
-    return iter.value()->Record(key, ns);
+    auto updater = iter.value();
+    return std::make_pair(updater, GET_OR_RET(updater->Record(key, ns)));
   }
 
   return {Status::NoPrefixMatched};
 }
 
+Status GlobalIndexer::Update(const RecordResult &original, std::string_view key, const std::string &ns) {
+  return original.first->Update(original.second, key, ns);
+}
+
 }  // namespace redis
diff --git a/src/search/indexer.h b/src/search/indexer.h
index e153d555..001ade13 100644
--- a/src/search/indexer.h
+++ b/src/search/indexer.h
@@ -69,15 +69,22 @@ struct FieldValueRetriever {
 struct IndexUpdater {
   using FieldValues = std::map<std::string, std::string>;
 
-  SearchOnDataType on_data_type;
+  std::string name;
+  SearchMetadata metadata;
   std::vector<std::string> prefixes;
   std::map<std::string, std::unique_ptr<SearchFieldMetadata>> fields;
   GlobalIndexer *indexer = nullptr;
 
   StatusOr<FieldValues> Record(std::string_view key, const std::string &ns);
+  Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
+                     std::string_view current, const std::string &ns);
+  Status Update(const FieldValues &original, std::string_view key, const std::string &ns);
 };
 
 struct GlobalIndexer {
+  using FieldValues = IndexUpdater::FieldValues;
+  using RecordResult = std::pair<IndexUpdater *, FieldValues>;
+
   std::deque<IndexUpdater> updaters;
   tsl::htrie_map<char, IndexUpdater *> prefix_map;
 
@@ -86,7 +93,8 @@ struct GlobalIndexer {
   explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {}
 
   void Add(IndexUpdater updater);
-  StatusOr<IndexUpdater::FieldValues> Record(std::string_view key, const std::string &ns);
+  StatusOr<RecordResult> Record(std::string_view key, const std::string &ns);
+  static Status Update(const RecordResult &original, std::string_view key, const std::string &ns);
 };
 
 }  // namespace redis
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 2acec050..1637a504 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -73,6 +73,8 @@ struct SearchFieldMetadata {
 
   void DecodeFlag(uint8_t flag) { noindex = flag & 1; }
 
+  virtual ~SearchFieldMetadata() = default;
+
   virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); }
 
   virtual rocksdb::Status Decode(Slice *input) {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index a74e49d3..56cda4ae 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -231,8 +231,9 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
   rocksdb::ColumnFamilyOptions cf_options(options);
   auto res = util::DBOpen(options, config_->db_dir);
   if (res) {
-    std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName,
-                                         kPropagateColumnFamilyName, kStreamColumnFamilyName};
+    std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName,
+                                         kPubSubColumnFamilyName,   kPropagateColumnFamilyName,
+                                         kStreamColumnFamilyName,   kSearchColumnFamilyName};
     std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
     auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
     if (!s.ok()) {
@@ -339,6 +340,7 @@ Status Storage::Open(DBOpenMode mode) {
   column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts);
   column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts);
   column_families.emplace_back(kStreamColumnFamilyName, subkey_opts);
+  column_families.emplace_back(kSearchColumnFamilyName, subkey_opts);
 
   std::vector<std::string> old_column_families;
   auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families);
@@ -730,6 +732,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
     return cf_handles_[4];
   } else if (name == kStreamColumnFamilyName) {
     return cf_handles_[5];
+  } else if (name == kSearchColumnFamilyName) {
+    return cf_handles_[6];
   }
   return cf_handles_[0];
 }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 0e20425a..e36e77bd 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -57,6 +57,7 @@ enum ColumnFamilyID {
   kColumnFamilyIDPubSub,
   kColumnFamilyIDPropagate,
   kColumnFamilyIDStream,
+  kColumnFamilyIDSearch,
 };
 
 enum DBOpenMode {
@@ -73,6 +74,7 @@ constexpr const char *kMetadataColumnFamilyName = "metadata";
 constexpr const char *kSubkeyColumnFamilyName = "default";
 constexpr const char *kPropagateColumnFamilyName = "propagate";
 constexpr const char *kStreamColumnFamilyName = "stream";
+constexpr const char *kSearchColumnFamilyName = "search";
 
 constexpr const char *kPropagateScriptCommand = "script";
 

Reply via email to