This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 201ba83d Implement the unify WAL iterator (#2040)
201ba83d is described below

commit 201ba83dca6c5ba5bd0e2323578f9be4eb7f5979
Author: Myth <[email protected]>
AuthorDate: Fri Jan 26 10:04:56 2024 +0800

    Implement the unify WAL iterator (#2040)
    
    Implement the unified WAL iterator for kvrocks, similar to the DBIterator.
    It wraps the RocksDB WAL iterator and returns items of different types,
    so callers can implement type-specific processing logic for each concrete
    item type.
---
 src/storage/iterator.cc        | 129 +++++++++-
 src/storage/iterator.h         |  82 +++++++
 src/storage/redis_metadata.cc  |  11 +
 src/storage/redis_metadata.h   |   1 +
 tests/cppunit/iterator_test.cc | 519 +++++++++++++++++++++++++++++++++++++++--
 5 files changed, 725 insertions(+), 17 deletions(-)

diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc
index 3717afb0..6514207b 100644
--- a/src/storage/iterator.cc
+++ b/src/storage/iterator.cc
@@ -25,7 +25,7 @@
 #include "db_util.h"
 
 namespace engine {
-DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, 
int slot)
+DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, 
int slot)
     : storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
   metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
   metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, 
metadata_cf_handle_));
@@ -80,7 +80,7 @@ void DBIterator::Reset() {
   if (metadata_iter_) metadata_iter_.reset();
 }
 
-void DBIterator::Seek(const std::string& target) {
+void DBIterator::Seek(const std::string &target) {
   if (!metadata_iter_) return;
 
   // Iterate with the slot id but storage didn't enable slot id encoding
@@ -112,7 +112,7 @@ std::unique_ptr<SubKeyIterator> 
DBIterator::GetSubKeyIterator() const {
   return std::make_unique<SubKeyIterator>(storage_, read_options_, type, 
std::move(prefix));
 }
 
-SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions 
read_options, RedisType type, std::string prefix)
+SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions 
read_options, RedisType type, std::string prefix)
     : storage_(storage), read_options_(std::move(read_options)), type_(type), 
prefix_(std::move(prefix)) {
   if (type_ == kRedisStream) {
     cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
@@ -145,7 +145,7 @@ Slice SubKeyIterator::UserKey() const {
   return internal_key.GetSubKey();
 }
 
-rocksdb::ColumnFamilyHandle* SubKeyIterator::ColumnFamilyHandle() const { 
return Valid() ? this->cf_handle_ : nullptr; }
+// Column family of the current sub key; nullptr while the iterator is not valid.
+rocksdb::ColumnFamilyHandle *SubKeyIterator::ColumnFamilyHandle() const {
+  if (!Valid()) return nullptr;
+  return cf_handle_;
+}
 
+// Raw value of the current sub key, or an empty Slice when the iterator is not valid.
 Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); }
 
@@ -164,4 +164,125 @@ void SubKeyIterator::Reset() {
   if (iter_) iter_.reset();
 }
 
+namespace {
+
+// Shared slot filter for the per-key handler callbacks. A slot of -1 keeps
+// every key; any other value keeps only keys whose encoded slot id equals it,
+// which requires the keys to be slot-id encoded.
+bool MatchesSlot(int slot, const rocksdb::Slice &key) { return slot == -1 || slot == ExtractSlotId(key); }
+
+}  // namespace
+
+// Records a Put as a kTypePut item unless the slot filter rejects its key.
+rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) {
+  if (MatchesSlot(slot_, key)) {
+    items_.emplace_back(WALItem::Type::kTypePut, column_family_id, key.ToString(), value.ToString());
+  }
+  return rocksdb::Status::OK();
+}
+
+// Records a Delete as a kTypeDelete item (empty value) unless the slot filter rejects its key.
+rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) {
+  if (MatchesSlot(slot_, key)) {
+    items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id, key.ToString(), std::string{});
+  }
+  return rocksdb::Status::OK();
+}
+
+// Records a DeleteRange as a kTypeDeleteRange item: key holds the range begin, value the range end.
+// NOTE(review): unlike PutCF/DeleteCF this is not slot-filtered (a range may cover several slots);
+// confirm that slot-scoped consumers expect to see every range delete.
+rocksdb::Status WALBatchExtractor::DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
+                                                 const rocksdb::Slice &end_key) {
+  items_.emplace_back(WALItem::Type::kTypeDeleteRange, column_family_id, begin_key.ToString(), end_key.ToString());
+  return rocksdb::Status::OK();
+}
+
+// Records a log-data blob as a kTypeLogData item (blob stored in key, column family 0).
+// Log data is never slot-filtered.
+void WALBatchExtractor::LogData(const rocksdb::Slice &blob) {
+  items_.emplace_back(WALItem::Type::kTypeLogData, 0, blob.ToString(), std::string{});
+}
+
+// Drops every item collected from previous batches.
+void WALBatchExtractor::Clear() {
+  items_.clear();
+}
+
+// The returned cursor refers to items_ instead of copying it, so it observes later Clear() calls.
+WALBatchExtractor::Iter WALBatchExtractor::GetIter() { return Iter{&items_}; }
+
+bool WALBatchExtractor::Iter::Valid() {
+  if (items_ == nullptr) return false;
+  return cur_ < items_->size();
+}
+
+void WALBatchExtractor::Iter::Next() { ++cur_; }
+
+// Copy of the current item, or a default (kTypeInvalid) item once past the end.
+WALItem WALBatchExtractor::Iter::Value() {
+  if (Valid()) return (*items_)[cur_];
+  return WALItem{};
+}
+
+// Drops the WAL iterator and any buffered batch, returning to the initial state.
+void WALIterator::Reset() {
+  // unique_ptr::reset() on an empty pointer is a no-op, so no guards are needed.
+  iter_.reset();
+  batch_iter_.reset();
+  extractor_.Clear();
+  next_batch_seq_ = 0;
+}
+
+// True if the current batch still has items or the underlying WAL iterator is valid.
+bool WALIterator::Valid() const {
+  if (batch_iter_ && batch_iter_->Valid()) {
+    return true;
+  }
+  return iter_ && iter_->Valid();
+}
+
+// Loads the WAL batch the underlying iterator is positioned on into extractor_
+// and points batch_iter_ at its first item. Batches that yield no items (every
+// key filtered out by the slot) are skipped, so Valid() never reports a
+// position for which Item() has nothing to return. The iterator is reset when
+// the WAL is exhausted, a batch does not start at the expected sequence number,
+// or a batch fails to decode.
+void WALIterator::nextBatch() {
+  while (iter_ && iter_->Valid()) {
+    auto batch = iter_->GetBatch();
+    // A batch that does not start exactly where the previous one ended is treated as an error.
+    if (batch.sequence != next_batch_seq_ || !batch.writeBatchPtr) {
+      Reset();
+      return;
+    }
+
+    extractor_.Clear();
+    auto s = batch.writeBatchPtr->Iterate(&extractor_);
+    if (!s.ok()) {
+      Reset();
+      return;
+    }
+
+    // Advance past this batch even if it yields no items, so NextSequenceNumber() stays accurate.
+    next_batch_seq_ += batch.writeBatchPtr->Count();
+    batch_iter_ = std::make_unique<WALBatchExtractor::Iter>(extractor_.GetIter());
+    if (batch_iter_->Valid()) {
+      return;
+    }
+
+    // Everything in this batch was filtered out; move on to the next one.
+    iter_->Next();
+  }
+
+  Reset();
+}
+
+// Positions the iterator on the WAL batch that starts exactly at `seq`; the
+// iterator is left invalid if that is not possible.
+void WALIterator::Seek(rocksdb::SequenceNumber seq) {
+  // Slot filtering decodes the slot id from each key, which needs slot id encoding.
+  const bool can_filter = slot_ == -1 || storage_->IsSlotIdEncoded();
+  if (!can_filter) {
+    Reset();
+    return;
+  }
+
+  if (auto status = storage_->GetWALIter(seq, &iter_); !status.IsOK()) {
+    Reset();
+    return;
+  }
+
+  // The first batch must start exactly at `seq`; nextBatch() enforces this.
+  next_batch_seq_ = seq;
+  nextBatch();
+}
+
+// Copy of the current item, or a default (kTypeInvalid) WALItem when the
+// current batch has no item to offer.
+WALItem WALIterator::Item() {
+  const bool has_item = batch_iter_ && batch_iter_->Valid();
+  return has_item ? batch_iter_->Value() : WALItem{};
+}
+
+// Sequence number the next WAL batch is expected to start at; 0 after Reset().
+rocksdb::SequenceNumber WALIterator::NextSequenceNumber() const {
+  return next_batch_seq_;
+}
+
+// Advances to the next item: first within the current batch, then into the
+// following WAL batch once the current one is exhausted.
+void WALIterator::Next() {
+  if (!Valid()) {
+    Reset();
+    return;
+  }
+
+  const bool in_batch = batch_iter_ && batch_iter_->Valid();
+  if (in_batch) {
+    batch_iter_->Next();
+    if (batch_iter_->Valid()) return;
+  }
+
+  iter_->Next();
+  nextBatch();
+}
+
 }  // namespace engine
diff --git a/src/storage/iterator.h b/src/storage/iterator.h
index c257df6f..2f123630 100644
--- a/src/storage/iterator.h
+++ b/src/storage/iterator.h
@@ -80,4 +80,86 @@ class DBIterator {
   std::unique_ptr<SubKeyIterator> subkey_iter_;
 };
 
+// One decoded record from a WAL write batch:
+//  - kTypePut:         key/value are the written pair
+//  - kTypeDelete:      key is the deleted key, value is empty
+//  - kTypeDeleteRange: key is the range begin, value is the range end
+//  - kTypeLogData:     key holds the log-data blob, value is empty, column_family_id is 0
+//  - kTypeInvalid:     default-constructed placeholder returned when no item is available
+struct WALItem {
+  enum class Type : uint8_t {
+    kTypeInvalid = 0,
+    kTypeLogData = 1,
+    kTypePut = 2,
+    kTypeDelete = 3,
+    kTypeDeleteRange = 4,
+  };
+
+  WALItem() = default;
+  WALItem(Type item_type, uint32_t cf_id, std::string item_key, std::string item_value)
+      : type(item_type), column_family_id(cf_id), key(std::move(item_key)), value(std::move(item_value)) {}
+
+  Type type{Type::kTypeInvalid};
+  uint32_t column_family_id{0};
+  std::string key;
+  std::string value;
+};
+
+// WriteBatch handler that flattens a batch into a list of WALItems, optionally
+// keeping only the keys that belong to a single slot.
+class WALBatchExtractor : public rocksdb::WriteBatch::Handler {
+ public:
+  // slot == -1 keeps every key. Any other value filters keys by their encoded
+  // slot id, so the storage must have slot id encoding enabled.
+  explicit WALBatchExtractor(int slot = -1) : slot_(slot) {}
+
+  rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) override;
+  rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override;
+  rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
+                                const rocksdb::Slice &end_key) override;
+  void LogData(const rocksdb::Slice &blob) override;
+
+  // Drops all items collected so far.
+  void Clear();
+
+  // Forward cursor over the collected items. It refers to the extractor's
+  // storage rather than copying it, so it must not outlive the extractor.
+  class Iter {
+    friend class WALBatchExtractor;
+
+   public:
+    bool Valid();
+    void Next();
+    WALItem Value();
+
+   private:
+    explicit Iter(std::vector<WALItem> *items) : items_(items) {}
+
+    std::vector<WALItem> *items_;
+    size_t cur_ = 0;
+  };
+
+  // Cursor positioned at the first collected item.
+  WALBatchExtractor::Iter GetIter();
+
+ private:
+  std::vector<WALItem> items_;
+  int slot_;
+};
+
+// Walks the RocksDB WAL item by item, flattening each write batch through a
+// WALBatchExtractor. When `slot` is not -1, put/delete items are limited to that
+// slot, which requires slot id encoding (Seek() leaves the iterator invalid otherwise).
+class WALIterator {
+ public:
+  explicit WALIterator(engine::Storage *storage, int slot = -1)
+      : storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0) {}
+  ~WALIterator() = default;
+
+  // batch_iter_ points into extractor_'s item buffer, so a copy would alias the original.
+  WALIterator(const WALIterator &) = delete;
+  WALIterator &operator=(const WALIterator &) = delete;
+
+  // True if the current batch still has items or the underlying WAL iterator is valid.
+  bool Valid() const;
+  // Starts iterating at the WAL batch whose first sequence number is exactly `seq`.
+  void Seek(rocksdb::SequenceNumber seq);
+  // Advances to the next item, moving to the next WAL batch when the current one is exhausted.
+  void Next();
+  // The current item, or a default (kTypeInvalid) item when there is none.
+  WALItem Item();
+
+  // Sequence number the next, not yet loaded, WAL batch is expected to start at.
+  rocksdb::SequenceNumber NextSequenceNumber() const;
+  // Releases the WAL iterator and buffered items; NextSequenceNumber() becomes 0.
+  void Reset();
+
+ private:
+  // Loads the batch the WAL iterator is positioned on into extractor_.
+  void nextBatch();
+
+  engine::Storage *storage_;
+  int slot_;
+
+  std::unique_ptr<rocksdb::TransactionLogIterator> iter_;
+  WALBatchExtractor extractor_;
+  std::unique_ptr<WALBatchExtractor::Iter> batch_iter_;
+  rocksdb::SequenceNumber next_batch_seq_;
+};
+
 }  // namespace engine
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 458ab4e8..1bca93d7 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -101,6 +101,17 @@ bool InternalKey::operator==(const InternalKey &that) 
const {
   return version_ == that.version_;
 }
 
+// Extracts the slot id from a slot-id-encoded namespace key, whose layout is
+// a 1-byte namespace length, the namespace, then a 2-byte slot id.
+// Returns HASH_SLOTS_SIZE (presumably one past the last valid slot) when the key
+// is too short for that layout, e.g. a key that is not namespace-encoded,
+// instead of stepping past the end of the buffer.
+uint16_t ExtractSlotId(Slice ns_key) {
+  uint8_t namespace_size = 0;
+  if (!GetFixed8(&ns_key, &namespace_size) || ns_key.size() < namespace_size) {
+    return HASH_SLOTS_SIZE;
+  }
+  ns_key.remove_prefix(namespace_size);
+
+  uint16_t slot_id = HASH_SLOTS_SIZE;
+  if (!GetFixed16(&ns_key, &slot_id)) {
+    return HASH_SLOTS_SIZE;
+  }
+  return slot_id;
+}
+
 template <typename T>
 std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded) {
   uint8_t namespace_size = 0;
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 8fe52f39..ce102644 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -105,6 +105,7 @@ struct KeyNumStats {
   uint64_t avg_ttl = 0;
 };
 
+// Returns the slot id stored after the namespace prefix of `ns_key`. The key must
+// have been composed with slot id encoding enabled.
+[[nodiscard]] uint16_t ExtractSlotId(Slice ns_key);
 template <typename T = Slice>
 [[nodiscard]] std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool 
slot_id_encoded);
 [[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice 
&key, bool slot_id_encoded);
diff --git a/tests/cppunit/iterator_test.cc b/tests/cppunit/iterator_test.cc
index 4bbd2408..6a2437a1 100644
--- a/tests/cppunit/iterator_test.cc
+++ b/tests/cppunit/iterator_test.cc
@@ -19,6 +19,7 @@
  */
 
 #include <cluster/redis_slot.h>
+#include <fmt/format.h>
 #include <storage/iterator.h>
 #include <types/redis_bitmap.h>
 #include <types/redis_bloom_chain.h>
@@ -32,10 +33,10 @@
 #include "test_base.h"
 #include "types/redis_string.h"
 
-class IteratorTest : public TestBase {
+class DBIteratorTest : public TestBase {
  protected:
-  explicit IteratorTest() = default;
-  ~IteratorTest() override = default;
+  explicit DBIteratorTest() = default;
+  ~DBIteratorTest() override = default;
 
   void SetUp() override {
     {  // string
@@ -112,7 +113,7 @@ class IteratorTest : public TestBase {
   }
 };
 
-TEST_F(IteratorTest, AllKeys) {
+TEST_F(DBIteratorTest, AllKeys) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
   std::vector<std::string> live_keys = {"a",        "b",        "d",      
"hash-1", "set-1",  "zset-1",     "list-1",
                                         "stream-1", "bitmap-1", "json-1", 
"json-2", "json-3", "sortedint-1"};
@@ -126,7 +127,7 @@ TEST_F(IteratorTest, AllKeys) {
   ASSERT_TRUE(live_keys.empty());
 }
 
-TEST_F(IteratorTest, BasicString) {
+TEST_F(DBIteratorTest, BasicString) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
 
   std::vector<std::string> expected_keys = {"a", "b", "d"};
@@ -148,7 +149,7 @@ TEST_F(IteratorTest, BasicString) {
   ASSERT_TRUE(expected_keys.empty());
 }
 
-TEST_F(IteratorTest, BasicHash) {
+TEST_F(DBIteratorTest, BasicHash) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
   auto prefix = ComposeNamespaceKey("test_ns1", "", 
storage_->IsSlotIdEncoded());
   for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); 
iter.Next()) {
@@ -171,7 +172,7 @@ TEST_F(IteratorTest, BasicHash) {
   }
 }
 
-TEST_F(IteratorTest, BasicSet) {
+TEST_F(DBIteratorTest, BasicSet) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
   auto prefix = ComposeNamespaceKey("test_ns2", "", 
storage_->IsSlotIdEncoded());
   for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); 
iter.Next()) {
@@ -194,7 +195,7 @@ TEST_F(IteratorTest, BasicSet) {
   }
 }
 
-TEST_F(IteratorTest, BasicZSet) {
+TEST_F(DBIteratorTest, BasicZSet) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
   auto prefix = ComposeNamespaceKey("test_ns3", "", 
storage_->IsSlotIdEncoded());
   for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); 
iter.Next()) {
@@ -217,7 +218,7 @@ TEST_F(IteratorTest, BasicZSet) {
   }
 }
 
-TEST_F(IteratorTest, BasicList) {
+TEST_F(DBIteratorTest, BasicList) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
   auto prefix = ComposeNamespaceKey("test_ns4", "", 
storage_->IsSlotIdEncoded());
   for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); 
iter.Next()) {
@@ -240,7 +241,7 @@ TEST_F(IteratorTest, BasicList) {
   }
 }
 
-TEST_F(IteratorTest, BasicStream) {
+TEST_F(DBIteratorTest, BasicStream) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
   auto prefix = ComposeNamespaceKey("test_ns5", "", 
storage_->IsSlotIdEncoded());
   for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); 
iter.Next()) {
@@ -266,7 +267,7 @@ TEST_F(IteratorTest, BasicStream) {
   }
 }
 
-TEST_F(IteratorTest, BasicBitmap) {
+TEST_F(DBIteratorTest, BasicBitmap) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
   auto prefix = ComposeNamespaceKey("test_ns6", "", 
storage_->IsSlotIdEncoded());
   for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); 
iter.Next()) {
@@ -288,7 +289,7 @@ TEST_F(IteratorTest, BasicBitmap) {
   }
 }
 
-TEST_F(IteratorTest, BasicJSON) {
+TEST_F(DBIteratorTest, BasicJSON) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
 
   std::vector<std::string> expected_keys = {"json-1", "json-2", "json-3"};
@@ -310,7 +311,7 @@ TEST_F(IteratorTest, BasicJSON) {
   ASSERT_TRUE(expected_keys.empty());
 }
 
-TEST_F(IteratorTest, BasicSortedInt) {
+TEST_F(DBIteratorTest, BasicSortedInt) {
   engine::DBIterator iter(storage_, rocksdb::ReadOptions());
 
   auto prefix = ComposeNamespaceKey("test_ns8", "", 
storage_->IsSlotIdEncoded());
@@ -343,6 +344,7 @@ class SlotIteratorTest : public TestBase {
 
 TEST_F(SlotIteratorTest, LiveKeys) {
   redis::String string(storage_, kDefaultNamespace);
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
   std::vector<std::string> keys = {"{x}a", "{x}b", "{y}c", "{y}d", "{x}e"};
   for (const auto &key : keys) {
     string.Set(key, "1");
@@ -363,4 +365,495 @@ TEST_F(SlotIteratorTest, LiveKeys) {
     count++;
   }
   ASSERT_EQ(count, same_slot_keys.size());
+
+  engine::WALIterator wal_iter(storage_, slot_id);
+  count = 0;
+  for (wal_iter.Seek(start_seq + 1); wal_iter.Valid(); wal_iter.Next()) {
+    auto item = wal_iter.Item();
+    if (item.type == engine::WALItem::Type::kTypePut) {
+      auto [_, user_key] = ExtractNamespaceKey(item.key, 
storage_->IsSlotIdEncoded());
+      ASSERT_EQ(slot_id, GetSlotIdFromKey(user_key.ToString())) << 
user_key.ToString();
+      count++;
+    }
+  }
+  ASSERT_EQ(count, same_slot_keys.size());
+}
+
+// Fixture for the WALIterator tests. Each test records the latest sequence
+// number, performs its writes, and then replays the WAL from the next sequence.
+class WALIteratorTest : public TestBase {
+ protected:
+  explicit WALIteratorTest() = default;
+  ~WALIteratorTest() override = default;
+
+  // Intentionally empty: every test writes its own data.
+  void SetUp() override {}
+};
+
+// Replays the WAL for string writes and checks the put/delete keys and log data.
+TEST_F(WALIteratorTest, BasicString) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  redis::String string(storage_, "test_ns0");
+  // Fail fast if a write fails; the WAL checks below depend on every write landing.
+  ASSERT_TRUE(string.Set("a", "1").ok());
+  ASSERT_TRUE(string.MSet({{"b", "2"}, {"c", "3"}}).ok());
+  ASSERT_TRUE(string.Del("b").ok());
+
+  std::vector<std::string> put_keys, delete_keys;
+  auto expected_put_keys = {"a", "b", "c"};
+  auto expected_delete_keys = {"b"};
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+        ASSERT_EQ(ns.ToString(), "test_ns0");
+        put_keys.emplace_back(key.ToString());
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisString);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+        ASSERT_EQ(ns.ToString(), "test_ns0");
+        delete_keys.emplace_back(key.ToString());
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+  ASSERT_EQ(expected_put_keys.size(), put_keys.size());
+  ASSERT_EQ(expected_delete_keys.size(), delete_keys.size());
+  ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin()));
+  ASSERT_TRUE(std::equal(expected_delete_keys.begin(), expected_delete_keys.end(), delete_keys.begin()));
+}
+
+// Replays the WAL for hash writes and checks metadata keys, field puts and field deletes.
+TEST_F(WALIteratorTest, BasicHash) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  redis::Hash hash(storage_, "test_ns1");
+  uint64_t ret = 0;
+  ASSERT_TRUE(hash.MSet("hash-1", {{"f0", "v0"}, {"f1", "v1"}, {"f2", "v2"}, {"f3", "v3"}}, false, &ret).ok());
+  uint64_t deleted_cnt = 0;
+  ASSERT_TRUE(hash.Delete("hash-1", {"f0"}, &deleted_cnt).ok());
+
+  // Delete writes the metadata key again, so it is put twice.
+  auto expected_put_keys = {"hash-1", "hash-1"};
+  // Sub keys are written in reverse order.
+  auto expected_put_fields = {"f3", "f2", "f1", "f0"};
+  auto expected_delete_fields = {"f0"};
+  std::vector<std::string> put_keys, put_fields, delete_fields;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id == kColumnFamilyIDDefault) {
+          InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
+          put_fields.emplace_back(internal_key.GetSubKey().ToString());
+        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+          auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+          ASSERT_EQ(ns.ToString(), "test_ns1");
+          put_keys.emplace_back(key.ToString());
+        }
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisHash);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
+        delete_fields.emplace_back(internal_key.GetSubKey().ToString());
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+  ASSERT_EQ(expected_put_keys.size(), put_keys.size());
+  ASSERT_EQ(expected_put_fields.size(), put_fields.size());
+  ASSERT_EQ(expected_delete_fields.size(), delete_fields.size());
+  ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin()));
+  ASSERT_TRUE(std::equal(expected_put_fields.begin(), expected_put_fields.end(), put_fields.begin()));
+  ASSERT_TRUE(std::equal(expected_delete_fields.begin(), expected_delete_fields.end(), delete_fields.begin()));
+}
+
+// Replays the WAL for set writes and checks metadata keys, member puts and member deletes.
+TEST_F(WALIteratorTest, BasicSet) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+
+  uint64_t ret = 0;
+  redis::Set set(storage_, "test_ns2");
+  ASSERT_TRUE(set.Add("set-1", {"e0", "e1", "e2"}, &ret).ok());
+  uint64_t removed_cnt = 0;
+  ASSERT_TRUE(set.Remove("set-1", {"e0", "e1"}, &removed_cnt).ok());
+
+  // One metadata put for Add and one for Remove.
+  auto expected_put_keys = {"set-1", "set-1"};
+  auto expected_put_members = {"e0", "e1", "e2"};
+  auto expected_delete_members = {"e0", "e1"};
+  std::vector<std::string> put_keys, put_members, delete_members;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id == kColumnFamilyIDDefault) {
+          InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
+          put_members.emplace_back(internal_key.GetSubKey().ToString());
+        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+          auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+          ASSERT_EQ(ns.ToString(), "test_ns2");
+          put_keys.emplace_back(key.ToString());
+        }
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisSet);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
+        delete_members.emplace_back(internal_key.GetSubKey().ToString());
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+
+  ASSERT_EQ(expected_put_keys.size(), put_keys.size());
+  ASSERT_EQ(expected_put_members.size(), put_members.size());
+  ASSERT_EQ(expected_delete_members.size(), delete_members.size());
+  ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin()));
+  ASSERT_TRUE(std::equal(expected_put_members.begin(), expected_put_members.end(), put_members.begin()));
+  ASSERT_TRUE(std::equal(expected_delete_members.begin(), expected_delete_members.end(), delete_members.begin()));
+}
+
+// Replays the WAL for sorted-set writes and checks metadata keys, member puts and delete count.
+TEST_F(WALIteratorTest, BasicZSet) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  uint64_t ret = 0;
+  redis::ZSet zset(storage_, "test_ns3");
+  auto mscores = std::vector<MemberScore>{{"z0", 0}, {"z1", 1}, {"z2", 2}};
+  ASSERT_TRUE(zset.Add("zset-1", ZAddFlags(), &mscores, &ret).ok());
+  uint64_t removed_cnt = 0;
+  ASSERT_TRUE(zset.Remove("zset-1", {"z0"}, &removed_cnt).ok());
+
+  auto expected_put_keys = {"zset-1", "zset-1"};
+  auto expected_put_members = {"z2", "z1", "z0"};
+  // Removing one member deletes two sub keys: the member entry and its score entry.
+  int expected_delete_count = 2, delete_count = 0;
+  std::vector<std::string> put_keys, put_members;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id == kColumnFamilyIDDefault) {
+          InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
+          put_members.emplace_back(internal_key.GetSubKey().ToString());
+        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+          auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+          ASSERT_EQ(ns.ToString(), "test_ns3");
+          put_keys.emplace_back(key.ToString());
+        }
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisZSet);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        delete_count++;
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+
+  ASSERT_EQ(expected_put_keys.size(), put_keys.size());
+  ASSERT_EQ(expected_put_members.size(), put_members.size());
+  ASSERT_EQ(expected_delete_count, delete_count);
+  ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin()));
+  ASSERT_TRUE(std::equal(expected_put_members.begin(), expected_put_members.end(), put_members.begin()));
+}
+
+// Replays the WAL for list writes and checks metadata keys, element values and delete count.
+TEST_F(WALIteratorTest, BasicList) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  uint64_t ret = 0;
+  redis::List list(storage_, "test_ns4");
+  ASSERT_TRUE(list.Push("list-1", {"l0", "l1", "l2", "l3", "l4"}, false, &ret).ok());
+  ASSERT_TRUE(list.Trim("list-1", 2, 4).ok());
+
+  auto expected_put_keys = {"list-1", "list-1"};
+  auto expected_put_values = {"l0", "l1", "l2", "l3", "l4"};
+  // Trimming to [2, 4] drops the first two elements.
+  auto expected_delete_count = 2, delete_count = 0;
+  std::vector<std::string> put_keys, put_values;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id == kColumnFamilyIDDefault) {
+          put_values.emplace_back(item.value);
+        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+          auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+          ASSERT_EQ(ns.ToString(), "test_ns4");
+          put_keys.emplace_back(key.ToString());
+        }
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisList);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        delete_count++;
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+
+  ASSERT_EQ(expected_put_keys.size(), put_keys.size());
+  ASSERT_EQ(expected_put_values.size(), put_values.size());
+  ASSERT_EQ(expected_delete_count, delete_count);
+  ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin()));
+  ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin()));
+}
+
+// Replays the WAL for stream writes and checks metadata keys, entry values and delete count.
+TEST_F(WALIteratorTest, BasicStream) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  redis::Stream stream(storage_, "test_ns5");
+  redis::StreamEntryID ret;
+  redis::StreamAddOptions options;
+  options.next_id_strategy = std::make_unique<redis::AutoGeneratedEntryID>();
+  ASSERT_TRUE(stream.Add("stream-1", options, {"x0"}, &ret).ok());
+  ASSERT_TRUE(stream.Add("stream-1", options, {"x1"}, &ret).ok());
+  ASSERT_TRUE(stream.Add("stream-1", options, {"x2"}, &ret).ok());
+  uint64_t deleted = 0;
+  ASSERT_TRUE(stream.DeleteEntries("stream-1", {ret}, &deleted).ok());
+
+  auto expected_put_keys = {"stream-1", "stream-1", "stream-1", "stream-1"};
+  auto expected_put_values = {"x0", "x1", "x2"};
+  // Same type as `deleted` so the comparison below is not signed/unsigned.
+  uint64_t delete_count = 0;
+  std::vector<std::string> put_keys, put_values;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id == kColumnFamilyIDStream) {
+          std::vector<std::string> elems;
+          auto s = redis::DecodeRawStreamEntryValue(item.value, &elems);
+          ASSERT_TRUE(s.IsOK() && !elems.empty());
+          put_values.emplace_back(elems[0]);
+        } else if (item.column_family_id == kColumnFamilyIDMetadata) {
+          auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+          ASSERT_EQ(ns.ToString(), "test_ns5");
+          put_keys.emplace_back(key.ToString());
+        }
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisStream);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        delete_count++;
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+
+  ASSERT_EQ(expected_put_keys.size(), put_keys.size());
+  ASSERT_EQ(expected_put_values.size(), put_values.size());
+  ASSERT_EQ(deleted, delete_count);
+  ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin()));
+  ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin()));
+}
+
+// Replays the WAL for bitmap writes and checks the fragment values written.
+TEST_F(WALIteratorTest, BasicBitmap) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+
+  redis::Bitmap bitmap(storage_, "test_ns6");
+  bool ret = false;
+  ASSERT_TRUE(bitmap.SetBit("bitmap-1", 0, true, &ret).ok());
+  ASSERT_TRUE(bitmap.SetBit("bitmap-1", 8 * 1024, true, &ret).ok());
+  ASSERT_TRUE(bitmap.SetBit("bitmap-1", 2 * 8 * 1024, true, &ret).ok());
+
+  auto expected_put_values = {"\x1", "\x1", "\x1"};
+  std::vector<std::string> put_values;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id == kColumnFamilyIDDefault) {
+          put_values.emplace_back(item.value);
+        }
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisBitmap);
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+  ASSERT_EQ(expected_put_values.size(), put_values.size());
+  ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin()));
+}
+
+// Replays the WAL for JSON writes; JSON values live only in the metadata column family.
+TEST_F(WALIteratorTest, BasicJSON) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  redis::Json json(storage_, "test_ns7");
+  ASSERT_TRUE(json.Set("json-1", "$", "{\"a\": 1, \"b\": 2}").ok());
+  ASSERT_TRUE(json.Set("json-2", "$", "{\"a\": 1, \"b\": 2}").ok());
+  ASSERT_TRUE(json.Set("json-3", "$", "{\"a\": 1, \"b\": 2}").ok());
+
+  size_t result = 0;
+  ASSERT_TRUE(json.Del("json-3", "$", &result).ok());
+
+  auto expected_put_keys = {"json-1", "json-2", "json-3"};
+  auto expected_delete_keys = {"json-3"};
+  std::vector<std::string> put_keys, delete_keys;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata);
+        auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+        ASSERT_EQ(ns.ToString(), "test_ns7");
+        put_keys.emplace_back(key.ToString());
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisJson);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata);
+        auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded());
+        ASSERT_EQ(ns.ToString(), "test_ns7");
+        delete_keys.emplace_back(key.ToString());
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+
+  ASSERT_EQ(expected_put_keys.size(), put_keys.size());
+  ASSERT_EQ(expected_delete_keys.size(), delete_keys.size());
+  ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin()));
+  ASSERT_TRUE(std::equal(expected_delete_keys.begin(), expected_delete_keys.end(), delete_keys.begin()));
+}
+
+// Replays the WAL for sortedint writes and decodes the ids from the sub keys.
+TEST_F(WALIteratorTest, BasicSortedInt) {
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  redis::Sortedint sortedint(storage_, "test_ns8");
+  uint64_t ret = 0;
+  ASSERT_TRUE(sortedint.Add("sortedint-1", {1, 2, 3}, &ret).ok());
+  uint64_t removed_cnt = 0;
+  ASSERT_TRUE(sortedint.Remove("sortedint-1", {2}, &removed_cnt).ok());
+
+  std::vector<uint64_t> expected_values = {1, 2, 3}, put_values;
+  std::vector<uint64_t> expected_delete_values = {2}, delete_values;
+
+  engine::WALIterator iter(storage_);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    auto item = iter.Item();
+    switch (item.type) {
+      case engine::WALItem::Type::kTypePut: {
+        if (item.column_family_id == kColumnFamilyIDDefault) {
+          const InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
+          auto value = DecodeFixed64(internal_key.GetSubKey().data());
+          put_values.emplace_back(value);
+        }
+        break;
+      }
+      case engine::WALItem::Type::kTypeLogData: {
+        redis::WriteBatchLogData log_data;
+        ASSERT_TRUE(log_data.Decode(item.key).IsOK());
+        ASSERT_EQ(log_data.GetRedisType(), kRedisSortedint);
+        break;
+      }
+      case engine::WALItem::Type::kTypeDelete: {
+        const InternalKey internal_key(item.key, storage_->IsSlotIdEncoded());
+        auto value = DecodeFixed64(internal_key.GetSubKey().data());
+        delete_values.emplace_back(value);
+        break;
+      }
+      default:
+        // Cast so the type is printed as a number rather than as a raw character.
+        FAIL() << "Unexpected wal item type: " << static_cast<int>(item.type);
+    }
+  }
+  ASSERT_EQ(expected_values.size(), put_values.size());
+  ASSERT_EQ(expected_delete_values.size(), delete_values.size());
+  ASSERT_TRUE(std::equal(expected_values.begin(), expected_values.end(), put_values.begin()));
+  ASSERT_TRUE(std::equal(expected_delete_values.begin(), expected_delete_values.end(), delete_values.begin()));
+}
+
+// Checks that NextSequenceNumber() reports the start of the following batch after each write.
+TEST_F(WALIteratorTest, NextSequence) {
+  std::vector<rocksdb::SequenceNumber> expected_next_sequences;
+  std::set<rocksdb::SequenceNumber> next_sequences_set;
+
+  auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
+  uint64_t ret = 0;
+  redis::List list(storage_, "test_ns2");
+  // After each write, the next batch is expected to start right after the latest sequence number.
+  ASSERT_TRUE(list.Push("list-1", {"l0", "l1", "l2", "l3", "l4"}, false, &ret).ok());
+  expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1);
+  ASSERT_TRUE(list.Push("list-2", {"l0", "l1", "l2"}, false, &ret).ok());
+  expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1);
+  ASSERT_TRUE(list.Trim("list-1", 2, 4).ok());
+  expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1);
+
+  engine::WALIterator iter(storage_);
+
+  // Unsigned literal: the sequence number is unsigned, avoiding a signed/unsigned comparison.
+  ASSERT_EQ(iter.NextSequenceNumber(), 0U);
+
+  for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) {
+    next_sequences_set.emplace(iter.NextSequenceNumber());
+  }
+
+  // std::set yields the distinct values in ascending order, matching the write order above.
+  std::vector<rocksdb::SequenceNumber> next_sequences(next_sequences_set.begin(), next_sequences_set.end());
+
+  ASSERT_EQ(expected_next_sequences.size(), next_sequences.size());
+  ASSERT_TRUE(std::equal(expected_next_sequences.begin(), expected_next_sequences.end(), next_sequences.begin()));
 }

Reply via email to