mapleFU commented on code in PR #2262:
URL: https://github.com/apache/kvrocks/pull/2262#discussion_r1574543977


##########
src/storage/redis_db.cc:
##########
@@ -777,4 +782,208 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Database::lookupKeyByPattern(const std::string &pattern, const 
std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return "";
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
SortArgument &args, const RESP &version,
+                               std::vector<std::string> *output_vec, 
SortResult *res) {
+  /* When sorting a set with no sort specified, we must sort the output
+   * so the result is consistent across scripting and replication.
+   *
+   * The other types (list, sorted set) will retain their native order
+   * even if no sort order is requested, so they remain stable across
+   * scripting and replication.
+   *
+   * TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & 
CLIENT_SCRIPT)) */
+  if (args.dontsort && type == RedisType::kRedisSet && 
(!args.storekey.empty())) {
+    /* Force ALPHA sorting */
+    args.dontsort = false;
+    args.alpha = true;
+    args.sortby = "";
+  }
+
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = (int)metadata.size;
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(str_vec.begin() + offset, str_vec.begin() + 
offset + count);

Review Comment:
   Why isn't `desc` checked here?



##########
src/commands/cmd_key.cc:
##########
@@ -424,6 +424,111 @@ class CommandCopy : public Commander {
   bool replace_ = false;
 };
 
+template <bool ReadOnly>
+class CommandSort : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 2);
+    while (parser.Good()) {
+      if (parser.EatEqICase("BY")) {
+        if (parser.Remains() < 1) {
+          return parser.InvalidSyntax();
+        }
+        sort_argument_.sortby = GET_OR_RET(parser.TakeStr());
+
+        if (sort_argument_.sortby.find('*') == std::string::npos) {
+          sort_argument_.dontsort = true;
+        } else {
+          /* TODO:
+           * If BY is specified with a real pattern, we can't accept it in 
cluster mode,
+           * unless we can make sure the keys formed by the pattern are in the 
same slot
+           * as the key to sort.
+           * If BY is specified with a real pattern, we can't accept
+           * it if no full ACL key access is applied for this command. */
+        }
+      } else if (parser.EatEqICase("LIMIT")) {
+        if (parser.Remains() < 2) {
+          return parser.InvalidSyntax();
+        }
+        sort_argument_.offset = GET_OR_RET(parser.template TakeInt<int>());
+        sort_argument_.count = GET_OR_RET(parser.template TakeInt<int>());
+      } else if (parser.EatEqICase("GET") && parser.Remains() >= 1) {
+        if (parser.Remains() < 1) {
+          return parser.InvalidSyntax();
+        }
+        /* TODO:
+         * If GET is specified with a real pattern, we can't accept it in 
cluster mode,
+         * unless we can make sure the keys formed by the pattern are in the 
same slot
+         * as the key to sort. */
+        sort_argument_.getpatterns.push_back(GET_OR_RET(parser.TakeStr()));
+      } else if (parser.EatEqICase("ASC")) {
+        sort_argument_.desc = false;
+      } else if (parser.EatEqICase("DESC")) {
+        sort_argument_.desc = true;
+      } else if (parser.EatEqICase("ALPHA")) {
+        sort_argument_.alpha = true;
+      } else if (parser.EatEqICase("STORE")) {
+        if constexpr (ReadOnly) {
+          return parser.InvalidSyntax();
+        }
+        if (parser.Remains() < 1) {
+          return parser.InvalidSyntax();
+        }
+        sort_argument_.storekey = GET_OR_RET(parser.TakeStr());
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Database redis(srv->storage, conn->GetNamespace());
+    RedisType type = kRedisNone;
+    auto s = redis.Type(args_[1], &type);
+    if (s.ok()) {
+      if (type >= RedisTypeNames.size()) {
+        return {Status::RedisExecErr, "Invalid type"};
+      } else if (type != RedisType::kRedisList && type != RedisType::kRedisSet 
&& type != RedisType::kRedisZSet) {

Review Comment:
   Would it be enough to just check that the type is one of List/Set/ZSet?



##########
src/storage/redis_db.cc:
##########
@@ -777,4 +782,208 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Database::lookupKeyByPattern(const std::string &pattern, const 
std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return "";
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
SortArgument &args, const RESP &version,
+                               std::vector<std::string> *output_vec, 
SortResult *res) {
+  /* When sorting a set with no sort specified, we must sort the output
+   * so the result is consistent across scripting and replication.
+   *
+   * The other types (list, sorted set) will retain their native order
+   * even if no sort order is requested, so they remain stable across
+   * scripting and replication.
+   *
+   * TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & 
CLIENT_SCRIPT)) */
+  if (args.dontsort && type == RedisType::kRedisSet && 
(!args.storekey.empty())) {
+    /* Force ALPHA sorting */
+    args.dontsort = false;
+    args.alpha = true;
+    args.sortby = "";
+  }
+
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = (int)metadata.size;
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(str_vec.begin() + offset, str_vec.begin() + 
offset + count);
+      }
+    } else if (type == RedisType::kRedisZSet) {
+      auto zset_db = redis::ZSet(storage_, namespace_);
+      std::vector<MemberScore> member_scores;
+
+      if (args.dontsort) {
+        RangeRankSpec spec;
+        spec.start = offset;
+        spec.stop = offset + count - 1;
+        spec.reversed = args.desc;
+        zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(member_score.member);
+        }
+      } else {
+        zset_db.GetAllMemberScores(key, &member_scores);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(member_score.member);
+        }
+      }
+    } else {
+      *res = SortResult::UNKNOWN_TYPE;
+      return s;
+    }
+  }
+
+  std::vector<RedisSortObject> sort_vec(str_vec.size());
+  for (size_t i = 0; i < str_vec.size(); ++i) {
+    sort_vec[i].obj = str_vec[i];
+  }
+
+  // Sort by BY, ALPHA, ASC/DESC
+  if (!args.dontsort) {
+    for (size_t i = 0; i < sort_vec.size(); ++i) {
+      std::string byval;
+      if (!args.sortby.empty()) {
+        byval = lookupKeyByPattern(args.sortby, str_vec[i]);
+        if (byval.empty()) continue;
+      } else {
+        byval = str_vec[i];
+      }
+
+      if (args.alpha) {
+        if (!args.sortby.empty()) {
+          sort_vec[i].v = byval;
+        }
+      } else {
+        try {
+          sort_vec[i].v = std::stod(byval);

Review Comment:
   It seems that we already have a parsing helper in util?



##########
src/storage/redis_db.cc:
##########
@@ -777,4 +782,208 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Database::lookupKeyByPattern(const std::string &pattern, const 
std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return "";
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
SortArgument &args, const RESP &version,
+                               std::vector<std::string> *output_vec, 
SortResult *res) {
+  /* When sorting a set with no sort specified, we must sort the output
+   * so the result is consistent across scripting and replication.
+   *
+   * The other types (list, sorted set) will retain their native order
+   * even if no sort order is requested, so they remain stable across
+   * scripting and replication.
+   *
+   * TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & 
CLIENT_SCRIPT)) */
+  if (args.dontsort && type == RedisType::kRedisSet && 
(!args.storekey.empty())) {
+    /* Force ALPHA sorting */
+    args.dontsort = false;
+    args.alpha = true;
+    args.sortby = "";
+  }
+
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = (int)metadata.size;

Review Comment:
   Besides, should we limit the element count here, @PragmaTwice? A huge
vectorlen can easily cause an OOM in our current implementation.



##########
src/storage/redis_db.cc:
##########
@@ -777,4 +782,208 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Database::lookupKeyByPattern(const std::string &pattern, const 
std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return "";
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
SortArgument &args, const RESP &version,
+                               std::vector<std::string> *output_vec, 
SortResult *res) {
+  /* When sorting a set with no sort specified, we must sort the output
+   * so the result is consistent across scripting and replication.
+   *
+   * The other types (list, sorted set) will retain their native order
+   * even if no sort order is requested, so they remain stable across
+   * scripting and replication.
+   *
+   * TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & 
CLIENT_SCRIPT)) */
+  if (args.dontsort && type == RedisType::kRedisSet && 
(!args.storekey.empty())) {
+    /* Force ALPHA sorting */
+    args.dontsort = false;
+    args.alpha = true;
+    args.sortby = "";
+  }
+
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = (int)metadata.size;
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(str_vec.begin() + offset, str_vec.begin() + 
offset + count);
+      }
+    } else if (type == RedisType::kRedisZSet) {
+      auto zset_db = redis::ZSet(storage_, namespace_);
+      std::vector<MemberScore> member_scores;
+
+      if (args.dontsort) {
+        RangeRankSpec spec;
+        spec.start = offset;
+        spec.stop = offset + count - 1;
+        spec.reversed = args.desc;
+        zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(member_score.member);

Review Comment:
   nit: we can use std::move to handle this



##########
src/commands/cmd_key.cc:
##########
@@ -424,6 +424,111 @@ class CommandCopy : public Commander {
   bool replace_ = false;
 };
 
+template <bool ReadOnly>
+class CommandSort : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 2);
+    while (parser.Good()) {
+      if (parser.EatEqICase("BY")) {
+        if (parser.Remains() < 1) {
+          return parser.InvalidSyntax();
+        }
+        sort_argument_.sortby = GET_OR_RET(parser.TakeStr());
+
+        if (sort_argument_.sortby.find('*') == std::string::npos) {
+          sort_argument_.dontsort = true;
+        } else {
+          /* TODO:
+           * If BY is specified with a real pattern, we can't accept it in 
cluster mode,
+           * unless we can make sure the keys formed by the pattern are in the 
same slot
+           * as the key to sort.
+           * If BY is specified with a real pattern, we can't accept
+           * it if no full ACL key access is applied for this command. */

Review Comment:
   Should we return an "unsupported" error here first?



##########
src/storage/redis_db.cc:
##########
@@ -777,4 +782,208 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Database::lookupKeyByPattern(const std::string &pattern, const 
std::string &subst) {

Review Comment:
   It would be better to have a doc string here.



##########
src/storage/redis_db.cc:
##########
@@ -777,4 +782,208 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Database::lookupKeyByPattern(const std::string &pattern, const 
std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return "";
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type >= 
RedisTypeNames.size()) {
+      return "";
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
SortArgument &args, const RESP &version,
+                               std::vector<std::string> *output_vec, 
SortResult *res) {
+  /* When sorting a set with no sort specified, we must sort the output
+   * so the result is consistent across scripting and replication.
+   *
+   * The other types (list, sorted set) will retain their native order
+   * even if no sort order is requested, so they remain stable across
+   * scripting and replication.
+   *
+   * TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & 
CLIENT_SCRIPT)) */
+  if (args.dontsort && type == RedisType::kRedisSet && 
(!args.storekey.empty())) {
+    /* Force ALPHA sorting */
+    args.dontsort = false;
+    args.alpha = true;
+    args.sortby = "";
+  }
+
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = (int)metadata.size;
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(str_vec.begin() + offset, str_vec.begin() + 
offset + count);
+      }
+    } else if (type == RedisType::kRedisZSet) {
+      auto zset_db = redis::ZSet(storage_, namespace_);
+      std::vector<MemberScore> member_scores;
+
+      if (args.dontsort) {
+        RangeRankSpec spec;
+        spec.start = offset;
+        spec.stop = offset + count - 1;
+        spec.reversed = args.desc;
+        zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(member_score.member);
+        }
+      } else {
+        zset_db.GetAllMemberScores(key, &member_scores);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(member_score.member);

Review Comment:
   nit: we can use std::move to handle this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to