PragmaTwice commented on code in PR #2262:
URL: https://github.com/apache/kvrocks/pull/2262#discussion_r1579294171


##########
src/storage/redis_db.cc:
##########
@@ -777,4 +782,193 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Database::lookupKeyByPattern(const std::string &pattern, const 
std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return "";
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
const SortArgument &args,
+                               const RESP &version, std::vector<std::string> 
*elems, SortResult *res) {
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = static_cast<int>(metadata.size);
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(str_vec.begin() + offset, str_vec.begin() + 
offset + count);
+      }
+    } else if (type == RedisType::kRedisZSet) {
+      auto zset_db = redis::ZSet(storage_, namespace_);
+      std::vector<MemberScore> member_scores;
+
+      if (args.dontsort) {
+        RangeRankSpec spec;
+        spec.start = offset;
+        spec.stop = offset + count - 1;
+        spec.reversed = args.desc;
+        zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(std::move(member_score.member));
+        }
+      } else {
+        zset_db.GetAllMemberScores(key, &member_scores);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(std::move(member_score.member));
+        }
+      }
+    } else {
+      *res = SortResult::UNKNOWN_TYPE;
+      return s;
+    }
+  }
+
+  std::vector<RedisSortObject> sort_vec(str_vec.size());
+  for (size_t i = 0; i < str_vec.size(); ++i) {
+    sort_vec[i].obj = str_vec[i];
+  }
+
+  // Sort by BY, ALPHA, ASC/DESC
+  if (!args.dontsort) {
+    for (size_t i = 0; i < sort_vec.size(); ++i) {
+      std::string byval;
+      if (!args.sortby.empty()) {
+        byval = lookupKeyByPattern(args.sortby, str_vec[i]);
+        if (byval.empty()) continue;
+      } else {
+        byval = str_vec[i];
+      }
+
+      if (args.alpha) {
+        if (!args.sortby.empty()) {
+          sort_vec[i].v = byval;
+        }
+      } else {
+        auto double_byval = ParseFloat<double>(byval);
+        if (!double_byval) {
+          *res = SortResult::DOUBLE_CONVERT_ERROR;
+        } else {
+          sort_vec[i].v = *double_byval;
+        }
+      }
+    }
+
+    std::sort(sort_vec.begin(), sort_vec.end(), [args](const RedisSortObject 
&a, const RedisSortObject &b) {
+      return RedisSortObject::SortCompare(a, b, args);
+    });
+
+    // Gets the element specified by Limit
+    if (offset != 0 || count != vectorlen) {
+      sort_vec = std::vector(sort_vec.begin() + offset, sort_vec.begin() + 
offset + count);
+    }
+  }
+
+  // Perform storage
+  for (auto &elem : sort_vec) {
+    if (args.getpatterns.empty()) {
+      elems->emplace_back(elem.obj);
+    }
+    for (const std::string &pattern : args.getpatterns) {
+      std::string val = lookupKeyByPattern(pattern, elem.obj);
+      if (val.empty()) {
+        elems->emplace_back(redis::NilString(version));

Review Comment:
   And then, this function will be not related to RESP version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to