This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 4059b43f Implement an unify key-value iterator for Kvrocks (#2004)
4059b43f is described below
commit 4059b43f38601e560fd0812c597759eecaf463be
Author: hulk <[email protected]>
AuthorDate: Fri Jan 12 18:16:51 2024 +0800
Implement an unify key-value iterator for Kvrocks (#2004)
Currently, we need to iterate over all keys in the database in several places,
such as cluster migration and kvrocks2redis, but there is no shared iterator for
this purpose. Implementing the iteration separately in each place is error-prone:
Kvrocks may add a new column family in the future, and every implementation must
be careful to iterate all keys in all column families. This is a maintenance
burden, so we want to implement a single iterator for iterating keys.
```C++
DBIterator iter(storage, read_options);
for (iter.Seek(); iter.Valid(); iter.Next()) {
  if (iter.Type() == kRedisString || iter.Type() == kRedisJson) {
    // the string/json type doesn't have subkeys
    continue;
  }
  auto subkey_iter = iter.GetSubKeyIterator();
  for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
    // handle its subkey and value here
  }
}
```
This iterator walks the metadata column family first and checks the type of
each key. If the type is not string or JSON, it can then iterate the
corresponding column family to get the subkeys. That is, if we have a key foo
with type hash, then the iterator will iterate foo and foo:field1, foo:field2,
and so on.
This solution brings these benefits:
- The code looks more intuitive
- The iterator can be reused if we want to iterate keys only
This closes #1989
---
src/storage/iterator.cc | 166 +++++++++++++++++++
src/storage/iterator.h | 82 +++++++++
tests/cppunit/iterator_test.cc | 366 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 614 insertions(+)
diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc
new file mode 100644
index 00000000..12238cea
--- /dev/null
+++ b/src/storage/iterator.cc
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "iterator.h"
+
+#include <cluster/redis_slot.h>
+
+#include "db_util.h"
+
+namespace engine {
+// Builds an iterator over the metadata column family of `storage`.
+// `slot` is -1 by default (see the declaration); any other value makes
+// Seek()/nextUntilValid() restrict the iteration to that slot.
+DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot)
+    : storage_(storage),
+      read_options_(std::move(read_options)),
+      slot_(slot),
+      metadata_cf_handle_(storage->GetCFHandle(kMetadataColumnFamilyName)) {
+  metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_));
+}
+
+// Moves to the next live key. Does nothing when the iterator is not valid.
+void DBIterator::Next() {
+  if (Valid()) {
+    metadata_iter_->Next();
+    nextUntilValid();
+  }
+}
+
+// Advances the metadata iterator, starting from its current position, until it
+// points at an entry whose metadata decodes successfully and is not expired.
+// The decoded metadata is cached in metadata_.
+//
+// When slot_ != -1 only keys of that slot are wanted, so the iterator is
+// released (Reset) as soon as a key of another slot is reached. The slot is
+// checked for every candidate entry, not only the first one: skipping an
+// expired entry may otherwise step onto a key that belongs to another slot and
+// report it as valid.
+//
+// Precondition: metadata_iter_ is not null (callers check this).
+void DBIterator::nextUntilValid() {
+  for (; metadata_iter_->Valid(); metadata_iter_->Next()) {
+    if (slot_ != -1) {
+      auto [_, user_key] = ExtractNamespaceKey(metadata_iter_->key(), storage_->IsSlotIdEncoded());
+      // Release the iterator if the slot id doesn't match
+      if (GetSlotIdFromKey(user_key.ToString()) != slot_) {
+        Reset();
+        return;
+      }
+    }
+
+    Metadata metadata(kRedisNone, false);
+    // Skip the metadata if it can't be decoded or it's expired
+    if (metadata.Decode(metadata_iter_->value()).ok() && !metadata.Expired()) {
+      metadata_ = metadata;
+      return;
+    }
+  }
+}
+
+// True while the metadata iterator exists and points at an entry.
+bool DBIterator::Valid() const {
+  if (!metadata_iter_) return false;
+  return metadata_iter_->Valid();
+}
+
+// Raw metadata key of the current entry, or an empty Slice when not valid.
+Slice DBIterator::Key() const {
+  if (!Valid()) return Slice();
+  return metadata_iter_->key();
+}
+
+// Splits the current raw key into {namespace, user key}.
+// Returns a tuple of empty Slices when the iterator is not valid.
+std::tuple<Slice, Slice> DBIterator::UserKey() const {
+  if (!Valid()) {
+    return {};
+  }
+  // How the key is laid out depends on the storage configuration, not on
+  // whether this iterator was restricted to a slot: a full scan (slot_ == -1)
+  // over a slot-encoded storage still has the slot id embedded in every key.
+  return ExtractNamespaceKey(metadata_iter_->key(), storage_->IsSlotIdEncoded());
+}
+
+// Raw metadata value of the current entry, or an empty Slice when not valid.
+Slice DBIterator::Value() const {
+  if (!Valid()) return Slice();
+  return metadata_iter_->value();
+}
+
+// Type recorded in the cached metadata of the current entry, or kRedisNone
+// when the iterator is not valid.
+RedisType DBIterator::Type() const {
+  if (!Valid()) return kRedisNone;
+  return metadata_.Type();
+}
+
+// Releases the metadata iterator; Valid() returns false afterwards.
+// Resetting an already released iterator is a no-op.
+void DBIterator::Reset() { metadata_iter_.reset(); }
+
+// Positions the iterator at the first live key at or after `target`.
+// With a slot configured, the seek key is the slot prefix of the default
+// namespace followed by `target`. Does nothing once the iterator was released.
+void DBIterator::Seek(const std::string& target) {
+  if (!metadata_iter_) return;
+
+  const bool by_slot = slot_ != -1;
+  // Iterate with the slot id but storage didn't enable slot id encoding
+  if (by_slot && !storage_->IsSlotIdEncoded()) {
+    Reset();
+    return;
+  }
+
+  std::string seek_key;
+  if (by_slot) {
+    // Use the slot id as the prefix if it's specified
+    seek_key = ComposeSlotKeyPrefix(kDefaultNamespace, slot_) + target;
+  } else {
+    seek_key = target;
+  }
+
+  metadata_iter_->Seek(seek_key);
+  nextUntilValid();
+}
+
+// Creates an iterator over the subkeys of the current key.
+// Returns nullptr when the iterator is not valid or when the current type is
+// none, string or json (those types don't have sub keys).
+std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
+  if (!Valid()) return nullptr;
+
+  const RedisType type = metadata_.Type();
+  switch (type) {
+    case kRedisNone:
+    case kRedisString:
+    case kRedisJson:
+      return nullptr;
+    default:
+      break;
+  }
+
+  // Subkeys are looked up by the encoded internal key with an empty subkey part
+  auto prefix = InternalKey(Key(), "", metadata_.version, storage_->IsSlotIdEncoded()).Encode();
+  return std::make_unique<SubKeyIterator>(storage_, read_options_, type, std::move(prefix));
+}
+
+// Opens an iterator on the column family that holds the subkeys for `type`:
+// the stream column family for kRedisStream, the subkey column family
+// otherwise. `prefix` is the key prefix all yielded entries must start with.
+SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type,
+                               std::string prefix)
+    : storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
+  const bool is_stream = type_ == kRedisStream;
+  cf_handle_ = storage_->GetCFHandle(is_stream ? kStreamColumnFamilyName : kSubkeyColumnFamilyName);
+  iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, cf_handle_));
+}
+
+// Moves to the next subkey. The iterator is released once it steps onto a key
+// that no longer starts with the prefix.
+void SubKeyIterator::Next() {
+  if (!Valid()) return;
+
+  iter_->Next();
+  if (Valid() && !iter_->key().starts_with(prefix_)) {
+    Reset();
+  }
+}
+
+// True while the underlying iterator exists and points at an entry.
+bool SubKeyIterator::Valid() const {
+  if (!iter_) return false;
+  return iter_->Valid();
+}
+
+// Raw key of the current entry, or an empty Slice when not valid.
+Slice SubKeyIterator::Key() const {
+  if (!Valid()) return Slice();
+  return iter_->key();
+}
+
+// Subkey part of the current raw key, or an empty Slice when not valid.
+Slice SubKeyIterator::UserKey() const {
+  if (Valid()) {
+    const InternalKey parsed(iter_->key(), storage_->IsSlotIdEncoded());
+    return parsed.GetSubKey();
+  }
+  return {};
+}
+
+// Raw value of the current entry, or an empty Slice when not valid.
+Slice SubKeyIterator::Value() const {
+  if (!Valid()) return Slice();
+  return iter_->value();
+}
+
+// Positions the iterator at the first entry at or after the prefix. The
+// iterator is released when that entry doesn't start with the prefix.
+// Does nothing once the iterator was released.
+void SubKeyIterator::Seek() {
+  if (!iter_) return;
+
+  iter_->Seek(prefix_);
+  // For the subkey iterator, it MUST contain the prefix key itself
+  if (iter_->Valid() && !iter_->key().starts_with(prefix_)) {
+    Reset();
+  }
+}
+
+// Releases the underlying iterator; Valid() returns false afterwards.
+// Resetting an already released iterator is a no-op.
+void SubKeyIterator::Reset() { iter_.reset(); }
+
+} // namespace engine
diff --git a/src/storage/iterator.h b/src/storage/iterator.h
new file mode 100644
index 00000000..40b93bc3
--- /dev/null
+++ b/src/storage/iterator.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#pragma once
+
+#include <rocksdb/iterator.h>
+#include <rocksdb/options.h>
+
+#include "storage.h"
+
+namespace engine {
+
+// Iterates the entries of one column family whose keys start with a given
+// prefix (the subkeys of a single key).
+class SubKeyIterator {
+ public:
+  explicit SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type,
+                          std::string prefix);
+  ~SubKeyIterator() = default;
+
+  // Positioning
+  void Seek();
+  void Next();
+  void Reset();
+  bool Valid() const;
+
+  // Accessors; each returns an empty Slice when the iterator is not valid.
+  // Key() is the raw key in rocksdb, UserKey() is the user key without prefix.
+  Slice Key() const;
+  Slice UserKey() const;
+  Slice Value() const;
+
+ private:
+  Storage *storage_;
+  rocksdb::ReadOptions read_options_;
+  RedisType type_;
+  std::string prefix_;
+  std::unique_ptr<rocksdb::Iterator> iter_;
+  rocksdb::ColumnFamilyHandle *cf_handle_ = nullptr;
+};
+
+// Iterates the live (decodable, non-expired) keys of the metadata column
+// family. When constructed with a slot other than -1, only keys of that slot
+// are visited. Subkeys of the current key are reached via GetSubKeyIterator().
+class DBIterator {
+ public:
+  explicit DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot = -1);
+  ~DBIterator() = default;
+
+  bool Valid() const;
+  // position at the first live key at or after `target`
+  void Seek(const std::string &target = "");
+  void Next();
+  // return the raw key in rocksdb
+  Slice Key() const;
+  // return the namespace and user key without prefix
+  std::tuple<Slice, Slice> UserKey() const;
+  Slice Value() const;
+  RedisType Type() const;
+  // release the underlying iterator, Valid() returns false afterwards
+  void Reset();
+  // return nullptr if the iterator is not valid or the type has no subkeys
+  std::unique_ptr<SubKeyIterator> GetSubKeyIterator() const;
+
+ private:
+  void nextUntilValid();
+
+  Storage *storage_;
+  rocksdb::ReadOptions read_options_;
+  int slot_ = -1;
+  // metadata decoded from the entry the iterator currently points at
+  Metadata metadata_ = Metadata(kRedisNone, false);
+
+  rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = nullptr;
+  std::unique_ptr<rocksdb::Iterator> metadata_iter_;
+};
+
+} // namespace engine
diff --git a/tests/cppunit/iterator_test.cc b/tests/cppunit/iterator_test.cc
new file mode 100644
index 00000000..4bbd2408
--- /dev/null
+++ b/tests/cppunit/iterator_test.cc
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <cluster/redis_slot.h>
+#include <storage/iterator.h>
+#include <types/redis_bitmap.h>
+#include <types/redis_bloom_chain.h>
+#include <types/redis_json.h>
+#include <types/redis_list.h>
+#include <types/redis_set.h>
+#include <types/redis_sortedint.h>
+#include <types/redis_stream.h>
+#include <types/redis_zset.h>
+
+#include "test_base.h"
+#include "types/redis_string.h"
+
+// Fixture that writes one group of keys per data type, each group into its
+// own namespace (test_ns0 .. test_ns8), before every test.
+class IteratorTest : public TestBase {
+ protected:
+  explicit IteratorTest() = default;
+  ~IteratorTest() override = default;
+
+  void SetUp() override {
+    {  // string
+      redis::String string(storage_, "test_ns0");
+      string.Set("a", "1");
+      string.Set("b", "2");
+      string.Set("c", "3");
+      // Make sure the key "c" is expired
+      ASSERT_TRUE(string.Expire("c", 1).ok());
+      string.Set("d", "4");
+    }
+
+    {  // hash
+      redis::Hash hash(storage_, "test_ns1");
+      uint64_t added = 0;
+      hash.MSet("hash-1", {{"f0", "v0"}, {"f1", "v1"}, {"f2", "v2"}, {"f3", "v3"}}, false, &added);
+    }
+
+    {  // set
+      redis::Set set(storage_, "test_ns2");
+      uint64_t added = 0;
+      set.Add("set-1", {"e0", "e1", "e2"}, &added);
+    }
+
+    {  // sorted set
+      redis::ZSet zset(storage_, "test_ns3");
+      std::vector<MemberScore> member_scores{{"z0", 0}, {"z1", 1}, {"z2", 2}};
+      uint64_t added = 0;
+      zset.Add("zset-1", ZAddFlags(), &member_scores, &added);
+    }
+
+    {  // list
+      redis::List list(storage_, "test_ns4");
+      uint64_t new_len = 0;
+      list.Push("list-1", {"l0", "l1", "l2"}, false, &new_len);
+    }
+
+    {  // stream
+      redis::Stream stream(storage_, "test_ns5");
+      redis::StreamEntryID entry_id;
+      redis::StreamAddOptions add_options;
+      add_options.next_id_strategy = std::make_unique<redis::AutoGeneratedEntryID>();
+      stream.Add("stream-1", add_options, {"x0"}, &entry_id);
+      stream.Add("stream-1", add_options, {"x1"}, &entry_id);
+      stream.Add("stream-1", add_options, {"x2"}, &entry_id);
+      // TODO(@git-hulk): add stream group after it's finished
+    }
+
+    {  // bitmap
+      redis::Bitmap bitmap(storage_, "test_ns6");
+      bool old_bit = false;
+      bitmap.SetBit("bitmap-1", 0, true, &old_bit);
+      bitmap.SetBit("bitmap-1", 8 * 1024, true, &old_bit);
+      bitmap.SetBit("bitmap-1", 2 * 8 * 1024, true, &old_bit);
+    }
+
+    {  // json
+      redis::Json json(storage_, "test_ns7");
+      const std::string document = "{\"a\": 1, \"b\": 2}";
+      json.Set("json-1", "$", document);
+      json.Set("json-2", "$", document);
+      json.Set("json-3", "$", document);
+      json.Set("json-4", "$", document);
+      // "json-4" gets the same expire value as the string key "c"
+      ASSERT_TRUE(json.Expire("json-4", 1).ok());
+    }
+
+    {  // sorted integer
+      redis::Sortedint sortedint(storage_, "test_ns8");
+      uint64_t added = 0;
+      sortedint.Add("sortedint-1", {1, 2, 3}, &added);
+    }
+  }
+};
+
+// A full scan must yield exactly the non-expired keys, in this order.
+TEST_F(IteratorTest, AllKeys) {
+  const std::vector<std::string> live_keys = {"a",        "b",        "d",      "hash-1", "set-1",  "zset-1",     "list-1",
+                                              "stream-1", "bitmap-1", "json-1", "json-2", "json-3", "sortedint-1"};
+  size_t next_idx = 0;
+
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  for (iter.Seek(); iter.Valid(); iter.Next()) {
+    ASSERT_TRUE(next_idx < live_keys.size());
+    auto [_, user_key] = iter.UserKey();
+    ASSERT_EQ(live_keys[next_idx], user_key.ToString());
+    ++next_idx;
+  }
+  ASSERT_TRUE(next_idx == live_keys.size());
+}
+
+// String keys of test_ns0: live keys are visited in order and expose no
+// subkey iterator.
+TEST_F(IteratorTest, BasicString) {
+  const std::vector<std::string> expected_keys = {"a", "b", "d"};
+  size_t next_idx = 0;
+
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns0", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    if (next_idx >= expected_keys.size()) {
+      FAIL() << "Unexpected key: " << iter.Key().ToString();
+    }
+    ASSERT_EQ(kRedisString, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns0", ns.ToString());
+    ASSERT_EQ(expected_keys[next_idx], key.ToString());
+    ++next_idx;
+    // Make sure there is no subkey iterator
+    ASSERT_TRUE(!iter.GetSubKeyIterator());
+  }
+  // Make sure all keys are iterated except the expired one: "c"
+  ASSERT_TRUE(next_idx == expected_keys.size());
+}
+
+// Hash keys of test_ns1: the subkey iterator yields the fields in order.
+TEST_F(IteratorTest, BasicHash) {
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns1", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    ASSERT_EQ(kRedisHash, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns1", ns.ToString());
+
+    auto subkey_iter = iter.GetSubKeyIterator();
+    ASSERT_TRUE(subkey_iter);
+    const std::vector<std::string> expected_fields = {"f0", "f1", "f2", "f3"};
+    size_t next_idx = 0;
+    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
+      if (next_idx >= expected_fields.size()) {
+        FAIL() << "Unexpected key: " << subkey_iter->UserKey().ToString();
+      }
+      ASSERT_EQ(expected_fields[next_idx], subkey_iter->UserKey().ToString());
+      ++next_idx;
+    }
+    ASSERT_TRUE(next_idx == expected_fields.size());
+  }
+}
+
+// Set keys of test_ns2: the subkey iterator yields the members in order.
+TEST_F(IteratorTest, BasicSet) {
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns2", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    ASSERT_EQ(kRedisSet, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns2", ns.ToString());
+
+    auto subkey_iter = iter.GetSubKeyIterator();
+    ASSERT_TRUE(subkey_iter);
+    const std::vector<std::string> expected_members = {"e0", "e1", "e2"};
+    size_t next_idx = 0;
+    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
+      if (next_idx >= expected_members.size()) {
+        FAIL() << "Unexpected key: " << subkey_iter->UserKey().ToString();
+      }
+      ASSERT_EQ(expected_members[next_idx], subkey_iter->UserKey().ToString());
+      ++next_idx;
+    }
+    ASSERT_TRUE(next_idx == expected_members.size());
+  }
+}
+
+// Sorted-set keys of test_ns3: the subkey iterator yields the members in order.
+TEST_F(IteratorTest, BasicZSet) {
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns3", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    ASSERT_EQ(kRedisZSet, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns3", ns.ToString());
+
+    auto subkey_iter = iter.GetSubKeyIterator();
+    ASSERT_TRUE(subkey_iter);
+    const std::vector<std::string> expected_members = {"z0", "z1", "z2"};
+    size_t next_idx = 0;
+    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
+      if (next_idx >= expected_members.size()) {
+        FAIL() << "Unexpected key: " << subkey_iter->UserKey().ToString();
+      }
+      ASSERT_EQ(expected_members[next_idx], subkey_iter->UserKey().ToString());
+      ++next_idx;
+    }
+    ASSERT_TRUE(next_idx == expected_members.size());
+  }
+}
+
+// List keys of test_ns4: the subkey iterator yields the pushed values in order.
+TEST_F(IteratorTest, BasicList) {
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns4", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    ASSERT_EQ(kRedisList, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns4", ns.ToString());
+
+    auto subkey_iter = iter.GetSubKeyIterator();
+    ASSERT_TRUE(subkey_iter);
+    const std::vector<std::string> expected_values = {"l0", "l1", "l2"};
+    size_t next_idx = 0;
+    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
+      if (next_idx >= expected_values.size()) {
+        FAIL() << "Unexpected value: " << subkey_iter->Value().ToString();
+      }
+      ASSERT_EQ(expected_values[next_idx], subkey_iter->Value().ToString());
+      ++next_idx;
+    }
+    ASSERT_TRUE(next_idx == expected_values.size());
+  }
+}
+
+// Stream keys of test_ns5: each subkey value decodes to an entry whose first
+// element is the value that was added.
+TEST_F(IteratorTest, BasicStream) {
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns5", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    ASSERT_EQ(kRedisStream, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns5", ns.ToString());
+
+    auto subkey_iter = iter.GetSubKeyIterator();
+    ASSERT_TRUE(subkey_iter);
+    const std::vector<std::string> expected_values = {"x0", "x1", "x2"};
+    size_t next_idx = 0;
+    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
+      if (next_idx >= expected_values.size()) {
+        FAIL() << "Unexpected value: " << subkey_iter->Value().ToString();
+      }
+      std::vector<std::string> elems;
+      auto decode_status = redis::DecodeRawStreamEntryValue(subkey_iter->Value().ToString(), &elems);
+      ASSERT_TRUE(decode_status.IsOK() && !elems.empty());
+      ASSERT_EQ(expected_values[next_idx], elems[0]);
+      ++next_idx;
+    }
+    ASSERT_TRUE(next_idx == expected_values.size());
+  }
+}
+
+// Bitmap keys of test_ns6: the three bits set by the fixture are expected as
+// three subkey values of "\x1" each.
+TEST_F(IteratorTest, BasicBitmap) {
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns6", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    ASSERT_EQ(kRedisBitmap, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns6", ns.ToString());
+
+    auto subkey_iter = iter.GetSubKeyIterator();
+    ASSERT_TRUE(subkey_iter);
+    const std::vector<std::string> expected_values = {"\x1", "\x1", "\x1"};
+    size_t remaining = expected_values.size();
+    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
+      if (remaining == 0) {
+        FAIL() << "Unexpected value: " << subkey_iter->Value().ToString();
+      }
+      // Consume from the back, all expected values are identical
+      ASSERT_EQ(expected_values[remaining - 1], subkey_iter->Value().ToString());
+      --remaining;
+    }
+    ASSERT_TRUE(remaining == 0);
+  }
+}
+
+// JSON keys of test_ns7: live keys are visited in order and expose no subkey
+// iterator.
+TEST_F(IteratorTest, BasicJSON) {
+  const std::vector<std::string> expected_keys = {"json-1", "json-2", "json-3"};
+  size_t next_idx = 0;
+
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+  const auto ns_prefix = ComposeNamespaceKey("test_ns7", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(ns_prefix); iter.Valid() && iter.Key().starts_with(ns_prefix); iter.Next()) {
+    if (next_idx >= expected_keys.size()) {
+      FAIL() << "Unexpected key: " << iter.Key().ToString();
+    }
+    ASSERT_EQ(kRedisJson, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns7", ns.ToString());
+    ASSERT_EQ(expected_keys[next_idx], key.ToString());
+    ++next_idx;
+    // Make sure there is no subkey iterator
+    ASSERT_TRUE(!iter.GetSubKeyIterator());
+  }
+  // Make sure all keys are iterated except the expired one: "json-4"
+  ASSERT_TRUE(next_idx == expected_keys.size());
+}
+
+// Sorted-integer keys of test_ns8: each subkey decodes (fixed 64-bit) to one
+// of the added ids, in order, and all of them must be visited.
+TEST_F(IteratorTest, BasicSortedInt) {
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions());
+
+  auto prefix = ComposeNamespaceKey("test_ns8", "", storage_->IsSlotIdEncoded());
+  for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) {
+    ASSERT_EQ(kRedisSortedint, iter.Type());
+    auto [ns, key] = iter.UserKey();
+    ASSERT_EQ("test_ns8", ns.ToString());
+
+    auto subkey_iter = iter.GetSubKeyIterator();
+    ASSERT_TRUE(subkey_iter);
+    std::vector<uint64_t> expected_keys = {1, 2, 3};
+    std::reverse(expected_keys.begin(), expected_keys.end());
+    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
+      auto value = DecodeFixed64(subkey_iter->UserKey().data());
+      if (expected_keys.empty()) {
+        FAIL() << "Unexpected value: " << value;
+      }
+      ASSERT_EQ(expected_keys.back(), value);
+      expected_keys.pop_back();
+    }
+    // Without this check the test would pass even if no subkey was iterated
+    ASSERT_TRUE(expected_keys.empty());
+  }
+}
+
+// Fixture that turns on slot id encoding in the storage config before each
+// test.
+class SlotIteratorTest : public TestBase {
+ protected:
+  explicit SlotIteratorTest() = default;
+  ~SlotIteratorTest() override = default;
+
+  void SetUp() override {
+    auto *config = storage_->GetConfig();
+    config->slot_id_encoded = true;
+  }
+};
+
+// Iterating with a slot id must yield exactly the keys that hash to that slot.
+TEST_F(SlotIteratorTest, LiveKeys) {
+  redis::String string(storage_, kDefaultNamespace);
+  std::vector<std::string> keys = {"{x}a", "{x}b", "{y}c", "{y}d", "{x}e"};
+  for (const auto &key : keys) {
+    string.Set(key, "1");
+  }
+
+  std::set<std::string> same_slot_keys;
+  auto slot_id = GetSlotIdFromKey(keys[0]);
+  for (const auto &key : keys) {
+    if (GetSlotIdFromKey(key) == slot_id) {
+      same_slot_keys.insert(key);
+    }
+  }
+
+  engine::DBIterator iter(storage_, rocksdb::ReadOptions(), slot_id);
+  // size_t so that the final comparison with size() doesn't mix signedness
+  size_t count = 0;
+  for (iter.Seek(); iter.Valid(); iter.Next()) {
+    auto [_, user_key] = iter.UserKey();
+    ASSERT_EQ(slot_id, GetSlotIdFromKey(user_key.ToString()));
+    // Besides the slot, make sure it's one of the keys written above
+    ASSERT_TRUE(same_slot_keys.count(user_key.ToString()) > 0);
+    count++;
+  }
+  ASSERT_EQ(count, same_slot_keys.size());
+}