This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 45deba91 Add the support of the load RDB command (#1798)
45deba91 is described below

commit 45deba91f80efabbf2933a8d66ccd6b1d521a56b
Author: xq2010 <[email protected]>
AuthorDate: Tue Oct 17 22:28:39 2023 +0800

    Add the support of the load RDB command (#1798)
---
 src/commands/cmd_server.cc       |  48 ++++-
 src/common/rdb_stream.cc         |  69 +++++++
 src/common/rdb_stream.h          |  84 +++++++++
 src/storage/rdb.cc               | 399 ++++++++++++++++++++++++++++++---------
 src/storage/rdb.h                |  40 +++-
 src/storage/rdb_listpack.cc      |  12 +-
 src/storage/rdb_zipmap.cc        |   5 +-
 src/storage/redis_db.cc          |  14 ++
 src/storage/redis_db.h           |   1 +
 src/vendor/endianconv.h          |  10 +
 tests/cppunit/rdb_stream_test.cc |  90 +++++++++
 tests/cppunit/rdb_test.cc        | 312 ++++++++++++++++++++++++++++++
 tests/cppunit/rdb_util.h         | 203 ++++++++++++++++++++
 tests/cppunit/test_base.h        |   2 +
 14 files changed, 1184 insertions(+), 105 deletions(-)

diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index fe96074e..14ca00ee 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -22,6 +22,7 @@
 #include "commander.h"
 #include "commands/scan_base.h"
 #include "common/io_util.h"
+#include "common/rdb_stream.h"
 #include "config/config.h"
 #include "error_constants.h"
 #include "server/redis_connection.h"
@@ -1059,8 +1060,9 @@ class CommandRestore : public Commander {
       ttl_ms_ -= now;
     }
 
-    RDB rdb(svr->storage, conn->GetNamespace(), args_[3]);
-    auto s = rdb.Restore(args_[1], ttl_ms_);
+    auto stream_ptr = std::make_unique<RdbStringStream>(args_[3]);
+    RDB rdb(svr->storage, conn->GetNamespace(), std::move(stream_ptr));
+    auto s = rdb.Restore(args_[1], args_[3], ttl_ms_);
     if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()};
     *output = redis::SimpleString("OK");
     return Status::OK();
@@ -1072,6 +1074,45 @@ class CommandRestore : public Commander {
   uint64_t ttl_ms_ = 0;
 };
 
+// command format: rdb load <path> [NX] [DB index]
+//
+// Imports the keys stored in a Redis RDB file into the namespace of the calling connection.
+//   NX        keep keys that already exist instead of overwriting them
+//   DB index  import only the keys of this database of the RDB file (default 0)
+class CommandRdb : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    // "load" is the only subcommand; reject anything else instead of silently treating it as a load.
+    if (args.size() < 3 || !isLoadSubcommand(args[1])) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    // Optional flags start after "rdb", the subcommand and the path.
+    CommandParser parser(args, 3);
+    while (parser.Good()) {
+      if (parser.EatEqICase("NX")) {
+        overwrite_exist_key_ = false;
+      } else if (parser.EatEqICase("DB")) {
+        db_index_ = GET_OR_RET(parser.TakeInt<uint32_t>());
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    // args_[2] is the path of the RDB file; Open() fails early when it cannot be read.
+    auto stream_ptr = std::make_unique<RdbFileStream>(args_[2]);
+    GET_OR_RET(stream_ptr->Open());
+
+    RDB rdb(svr->storage, conn->GetNamespace(), std::move(stream_ptr));
+    GET_OR_RET(rdb.LoadRdb(db_index_, overwrite_exist_key_));
+
+    *output = redis::SimpleString("OK");
+    return Status::OK();
+  }
+
+ private:
+  // Case-insensitive match against "load" that needs no extra headers.
+  static bool isLoadSubcommand(const std::string &name) {
+    static constexpr char expected[] = "load";
+    if (name.size() != sizeof(expected) - 1) return false;
+    for (size_t i = 0; i < name.size(); i++) {
+      // Setting bit 0x20 maps an ASCII upper-case letter onto its lower-case form.
+      if ((name[i] | 0x20) != expected[i]) return false;
+    }
+    return true;
+  }
+
+  bool overwrite_exist_key_ = true;  // default overwrite exist key
+  uint32_t db_index_ = 0;
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only 
ok-loading", 0, 0, 0),
                         MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0, 
0, 0),
                         MakeCmdAttr<CommandSelect>("select", 2, "read-only", 
0, 0, 0),
@@ -1105,6 +1146,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 
2, "read-only ok-loadin
                         MakeCmdAttr<CommandLastSave>("lastsave", 1, 
"read-only", 0, 0, 0),
                         MakeCmdAttr<CommandFlushBackup>("flushbackup", 1, 
"read-only no-script", 0, 0, 0),
                         MakeCmdAttr<CommandSlaveOf>("slaveof", 3, "read-only 
exclusive no-script", 0, 0, 0),
-                        MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0, 
0, 0), )
+                        MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0, 
0, 0),
+                        MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive", 
0, 0, 0), )
 
 }  // namespace redis
diff --git a/src/common/rdb_stream.cc b/src/common/rdb_stream.cc
new file mode 100644
index 00000000..83855ea6
--- /dev/null
+++ b/src/common/rdb_stream.cc
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "rdb_stream.h"
+
+#include "fmt/format.h"
+#include "vendor/crc64.h"
+
+// Copies the next n bytes of the in-memory payload into buf and advances the read position.
+// Returns n on success; fails with "unexpected EOF", consuming nothing, when fewer than n bytes remain.
+StatusOr<size_t> RdbStringStream::Read(char *buf, size_t n) {
+  // pos_ never exceeds input_.size(), so this subtraction cannot wrap, whereas `pos_ + n`
+  // could overflow (and wrongly pass the check) for a huge n decoded from the payload.
+  if (n > input_.size() - pos_) {
+    return {Status::NotOK, "unexpected EOF"};
+  }
+  memcpy(buf, input_.data() + pos_, n);
+  pos_ += n;
+  return n;
+}
+
+// CRC64 of the payload without its trailing 8 bytes, passed through memrev64ifbe before being returned.
+// A payload shorter than 8 bytes is rejected with "invalid payload length".
+StatusOr<uint64_t> RdbStringStream::GetCheckSum() const {
+  constexpr size_t trailer_len = 8;
+  if (input_.size() >= trailer_len) {
+    const auto *bytes = reinterpret_cast<const unsigned char *>(input_.data());
+    uint64_t crc = crc64(0, bytes, input_.size() - trailer_len);
+    memrev64ifbe(&crc);
+    return crc;
+  }
+  return {Status::NotOK, "invalid payload length"};
+}
+
+// Opens file_name_ for binary reading. On failure the returned status names the file and strerror(errno).
+Status RdbFileStream::Open() {
+  ifs_.open(file_name_, std::ifstream::in | std::ifstream::binary);
+  if (ifs_.is_open()) {
+    return Status::OK();
+  }
+
+  return {Status::NotOK, fmt::format("failed to open rdb file: '{}': {}", file_name_, strerror(errno))};
+}
+
+// Reads exactly len bytes from the file into buf, at most max_read_chunk_size_ bytes per read call,
+// folding every chunk into the running CRC64 that GetCheckSum() reports.
+// Returns the number of bytes read (always len) or NotOK when the file ends early or the read fails.
+StatusOr<size_t> RdbFileStream::Read(char *buf, size_t len) {
+  // A chunk size of zero would never make progress; read the request in one piece instead.
+  const size_t chunk_size = max_read_chunk_size_ > 0 ? max_read_chunk_size_ : len;
+
+  size_t n = 0;
+  while (len > 0) {
+    size_t read_bytes = std::min(chunk_size, len);
+    ifs_.read(buf, static_cast<std::streamsize>(read_bytes));
+    if (!ifs_.good()) {
+      // A short read sets eofbit together with failbit; errno says nothing useful in that case.
+      if (ifs_.eof()) {
+        return Status(Status::NotOK, "read failed: unexpected EOF");
+      }
+      return Status(Status::NotOK, fmt::format("read failed: {}", strerror(errno)));
+    }
+    check_sum_ = crc64(check_sum_, reinterpret_cast<const unsigned char *>(buf), read_bytes);
+    buf += read_bytes;
+    len -= read_bytes;
+    total_read_bytes_ += read_bytes;
+    n += read_bytes;
+  }
+
+  return n;
+}
diff --git a/src/common/rdb_stream.h b/src/common/rdb_stream.h
new file mode 100644
index 00000000..842d24a7
--- /dev/null
+++ b/src/common/rdb_stream.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <fstream>
+#include <string>
+
+#include "status.h"
+#include "vendor/endianconv.h"
+
+// Abstract byte source for RDB data.
+class RdbStream {
+ public:
+  RdbStream() = default;
+  virtual ~RdbStream() = default;
+
+  // Reads len bytes into buf; returns the number of bytes read or an error status.
+  virtual StatusOr<size_t> Read(char *buf, size_t len) = 0;
+  // Returns the checksum the concrete stream has computed for its data.
+  virtual StatusOr<uint64_t> GetCheckSum() const = 0;
+
+  // Convenience wrapper: reads one byte through Read() and forwards its error, if any.
+  StatusOr<uint8_t> ReadByte() {
+    uint8_t byte = 0;
+    auto result = Read(reinterpret_cast<char *>(&byte), 1);
+    if (result.IsOK()) {
+      return byte;
+    }
+    return result;
+  }
+};
+
+// RdbStream over an in-memory payload. The payload is copied into the stream on construction
+// and consumed front to back through Read().
+class RdbStringStream : public RdbStream {
+ public:
+  explicit RdbStringStream(std::string_view input) : input_(input) {}
+  ~RdbStringStream() override = default;
+
+  // Non-copyable.
+  RdbStringStream(const RdbStringStream &) = delete;
+  RdbStringStream &operator=(const RdbStringStream &) = delete;
+
+  StatusOr<size_t> Read(char *buf, size_t len) override;
+  StatusOr<uint64_t> GetCheckSum() const override;
+
+ private:
+  std::string input_;  // owned copy of the payload
+  size_t pos_ = 0;     // offset of the next unread byte in input_
+};
+
+// RdbStream over a file on disk. Open() must succeed before Read() is used; Read() keeps a
+// running CRC64 of everything it has returned so far.
+class RdbFileStream : public RdbStream {
+ public:
+  explicit RdbFileStream(std::string file_name, size_t chunk_size = 1024 * 1024)
+      : file_name_(std::move(file_name)), max_read_chunk_size_(chunk_size) {}
+  ~RdbFileStream() override = default;
+
+  // Non-copyable.
+  RdbFileStream(const RdbFileStream &) = delete;
+  RdbFileStream &operator=(const RdbFileStream &) = delete;
+
+  Status Open();
+  StatusOr<size_t> Read(char *buf, size_t len) override;
+
+  // Running checksum of the bytes read so far, passed through memrev64ifbe.
+  StatusOr<uint64_t> GetCheckSum() const override {
+    uint64_t checksum = check_sum_;
+    memrev64ifbe(&checksum);
+    return checksum;
+  }
+
+ private:
+  std::ifstream ifs_;
+  std::string file_name_;
+  uint64_t check_sum_ = 0;
+  size_t total_read_bytes_ = 0;
+  size_t max_read_chunk_size_;  // maximum single read chunk size
+};
diff --git a/src/storage/rdb.cc b/src/storage/rdb.cc
index 23ddb0d0..c73c172e 100644
--- a/src/storage/rdb.cc
+++ b/src/storage/rdb.cc
@@ -20,7 +20,11 @@
 
 #include "rdb.h"
 
+#include <glog/logging.h>
+
 #include "common/encoding.h"
+#include "common/rdb_stream.h"
+#include "common/time_util.h"
 #include "rdb_intset.h"
 #include "rdb_listpack.h"
 #include "rdb_ziplist.h"
@@ -39,53 +43,71 @@
 constexpr const int RDB6BitLen = 0;
 constexpr const int RDB14BitLen = 1;
 constexpr const int RDBEncVal = 3;
-constexpr const int RDB32BitLen = 0x08;
+constexpr const int RDB32BitLen = 0x80;
 constexpr const int RDB64BitLen = 0x81;
 constexpr const int RDBEncInt8 = 0;
 constexpr const int RDBEncInt16 = 1;
 constexpr const int RDBEncInt32 = 2;
 constexpr const int RDBEncLzf = 3;
 
-Status RDB::peekOk(size_t n) {
-  if (pos_ + n > input_.size()) {
-    return {Status::NotOK, "unexpected EOF"};
+/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
+constexpr const int RDBOpcodeFunction2 = 245;    /* function library data */
+constexpr const int RDBOpcodeFunction = 246;     /* old function library data for 7.0 rc1 and rc2 */
+constexpr const int RDBOpcodeModuleAux = 247;    /* Module auxiliary data. */
+constexpr const int RDBOpcodeIdle = 248;         /* LRU idle time. */
+constexpr const int RDBOpcodeFreq = 249;         /* LFU frequency. */
+constexpr const int RDBOpcodeAux = 250;          /* RDB aux field. */
+constexpr const int RDBOpcodeResizeDB = 251;     /* Hash table resize hint. */
+constexpr const int RDBOpcodeExpireTimeMs = 252; /* Expire time in milliseconds. */
+constexpr const int RDBOpcodeExpireTime = 253;   /* Old expire time in seconds. */
+constexpr const int RDBOpcodeSelectDB = 254;     /* DB number of the following keys. */
+constexpr const int RDBOpcodeEof = 255;          /* End of the RDB file. */
+
+constexpr const int SupportedRDBVersion = 10;  // highest file version LoadRdb accepts; version 11 is untested.
+constexpr const int MaxRDBVersion = 11;        // highest RDB version Redis currently defines; upper bound for restore payloads.
+
+constexpr const int RDBCheckSumLen = 8;                                        // length of the trailing checksum in bytes
+constexpr const int RestoreRdbVersionLen = 2;                                  // length of the RDB version field in a restore payload
+constexpr const int RestoreFooterLen = RestoreRdbVersionLen + RDBCheckSumLen;  // 10 = version length + checksum length
+constexpr const int MinRdbVersionToVerifyChecksum = 5;                         // LoadRdb verifies the checksum from this file version on
+
+template <typename T>
+T LogWhenError(T &&s) {
+  if (!s) {
+    LOG(WARNING) << "Short read or unsupported type loading DB. Unrecoverable 
error, aborting now.";
+    LOG(ERROR) << "Unexpected EOF reading RDB file";
   }
-  return Status::OK();
+  return std::forward<T>(s);
 }
 
-Status RDB::VerifyPayloadChecksum() {
-  if (input_.size() < 10) {
+Status RDB::VerifyPayloadChecksum(const std::string_view &payload) {
+  if (payload.size() < RestoreFooterLen) {  // at least has rdb version and 
checksum
     return {Status::NotOK, "invalid payload length"};
   }
-  auto footer = input_.substr(input_.size() - 10);
+  auto footer = payload.substr(payload.size() - RestoreFooterLen);
   auto rdb_version = (footer[1] << 8) | footer[0];
   // For now, the max redis rdb version is 11
-  if (rdb_version > 11) {
+  if (rdb_version > MaxRDBVersion) {
     return {Status::NotOK, fmt::format("invalid or unsupported rdb version: 
{}", rdb_version)};
   }
-  uint64_t crc = crc64(0, reinterpret_cast<const unsigned char 
*>(input_.data()), input_.size() - 8);
-  memrev64ifbe(&crc);
-  if (memcmp(&crc, footer.data() + 2, 8)) {
+  auto crc = GET_OR_RET(stream_->GetCheckSum());
+  if (memcmp(&crc, footer.data() + RestoreRdbVersionLen, RDBCheckSumLen)) {
     return {Status::NotOK, "incorrect checksum"};
   }
   return Status::OK();
 }
 
 StatusOr<int> RDB::LoadObjectType() {
-  GET_OR_RET(peekOk(1));
-  auto type = input_[pos_++] & 0xFF;
-  // 0-5 is the basic type of Redis objects and 9-21 is the encoding type of 
Redis objects.
-  // Redis allow basic is 0-7 and 6/7 is for the module type which we don't 
support here.
-  if ((type >= 0 && type <= 5) || (type >= 9 && type <= 21)) {
+  auto type = GET_OR_RET(stream_->ReadByte());
+  if (isObjectType(type)) {
     return type;
   }
   return {Status::NotOK, fmt::format("invalid or unsupported object type: {}", 
type)};
 }
 
 StatusOr<uint64_t> RDB::loadObjectLen(bool *is_encoded) {
-  GET_OR_RET(peekOk(1));
   uint64_t len = 0;
-  auto c = input_[pos_++];
+  auto c = GET_OR_RET(stream_->ReadByte());
   auto type = (c & 0xC0) >> 6;
   switch (type) {
     case RDBEncVal:
@@ -95,20 +117,17 @@ StatusOr<uint64_t> RDB::loadObjectLen(bool *is_encoded) {
       return c & 0x3F;
     case RDB14BitLen:
       len = c & 0x3F;
-      GET_OR_RET(peekOk(1));
-      return (len << 8) | input_[pos_++];
-    case RDB32BitLen:
-      GET_OR_RET(peekOk(4));
-      __builtin_memcpy(&len, input_.data() + pos_, 4);
-      pos_ += 4;
-      return len;
-    case RDB64BitLen:
-      GET_OR_RET(peekOk(8));
-      __builtin_memcpy(&len, input_.data() + pos_, 8);
-      pos_ += 8;
-      return len;
+      return (len << 8) | GET_OR_RET(stream_->ReadByte());
     default:
-      return {Status::NotOK, fmt::format("Unknown length encoding {} in 
loadObjectLen()", type)};
+      if (c == RDB32BitLen) {
+        GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&len), 
sizeof(uint32_t)));
+        return ntohl(len);
+      } else if (c == RDB64BitLen) {
+        GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&len), 
sizeof(uint64_t)));
+        return ntohu64(len);
+      } else {
+        return {Status::NotOK, fmt::format("Unknown RDB string encoding type 
{} byte {}", type, c)};
+      }
   }
 }
 
@@ -118,13 +137,13 @@ StatusOr<std::string> RDB::LoadStringObject() { return 
loadEncodedString(); }
 StatusOr<std::string> RDB::loadLzfString() {
   auto compression_len = GET_OR_RET(loadObjectLen(nullptr));
   auto len = GET_OR_RET(loadObjectLen(nullptr));
-  GET_OR_RET(peekOk(static_cast<size_t>(compression_len)));
-
   std::string out_buf(len, 0);
-  if (lzf_decompress(input_.data() + pos_, compression_len, out_buf.data(), 
len) != len) {
+  std::vector<char> vec(compression_len);
+  GET_OR_RET(stream_->Read(vec.data(), compression_len));
+
+  if (lzf_decompress(vec.data(), compression_len, out_buf.data(), len) != len) 
{
     return {Status::NotOK, "Invalid LZF compressed string"};
   }
-  pos_ += compression_len;
   return out_buf;
 }
 
@@ -133,20 +152,18 @@ StatusOr<std::string> RDB::loadEncodedString() {
   auto len = GET_OR_RET(loadObjectLen(&is_encoded));
   if (is_encoded) {
     // For integer type, needs to convert to uint8_t* to avoid signed extension
-    auto data = reinterpret_cast<const uint8_t *>(input_.data());
+    unsigned char buf[4] = {0};
     if (len == RDBEncInt8) {
-      GET_OR_RET(peekOk(1));
-      return std::to_string(data[pos_++]);
+      auto next = GET_OR_RET(stream_->ReadByte());
+      return std::to_string(static_cast<int>(next));
     } else if (len == RDBEncInt16) {
-      GET_OR_RET(peekOk(2));
-      auto value = static_cast<uint16_t>(data[pos_]) | 
(static_cast<uint16_t>(data[pos_ + 1]) << 8);
-      pos_ += 2;
+      GET_OR_RET(stream_->Read(reinterpret_cast<char *>(buf), 2));
+      auto value = static_cast<uint16_t>(buf[0]) | 
(static_cast<uint16_t>(buf[1]) << 8);
       return std::to_string(static_cast<int16_t>(value));
     } else if (len == RDBEncInt32) {
-      GET_OR_RET(peekOk(4));
-      auto value = static_cast<uint32_t>(data[pos_]) | 
(static_cast<uint32_t>(data[pos_ + 1]) << 8) |
-                   (static_cast<uint32_t>(data[pos_ + 2]) << 16) | 
(static_cast<uint32_t>(data[pos_ + 3]) << 24);
-      pos_ += 4;
+      GET_OR_RET(stream_->Read(reinterpret_cast<char *>(buf), 4));
+      auto value = static_cast<uint32_t>(buf[0]) | 
(static_cast<uint32_t>(buf[1]) << 8) |
+                   (static_cast<uint32_t>(buf[2]) << 16) | 
(static_cast<uint32_t>(buf[3]) << 24);
       return std::to_string(static_cast<int32_t>(value));
     } else if (len == RDBEncLzf) {
       return loadLzfString();
@@ -156,10 +173,9 @@ StatusOr<std::string> RDB::loadEncodedString() {
   }
 
   // Normal string
-  GET_OR_RET(peekOk(static_cast<size_t>(len)));
-  auto value = std::string(input_.data() + pos_, len);
-  pos_ += len;
-  return value;
+  std::vector<char> vec(len);
+  GET_OR_RET(stream_->Read(vec.data(), len));
+  return std::string(vec.data(), len);
 }
 
 StatusOr<std::vector<std::string>> RDB::LoadListWithQuickList(int type) {
@@ -177,10 +193,23 @@ StatusOr<std::vector<std::string>> 
RDB::LoadListWithQuickList(int type) {
         return {Status::NotOK, fmt::format("Unknown quicklist node container 
type {}", container)};
       }
     }
-    auto list_pack_string = GET_OR_RET(loadEncodedString());
-    ListPack lp(list_pack_string);
-    auto elements = GET_OR_RET(lp.Entries());
-    list.insert(list.end(), elements.begin(), elements.end());
+
+    if (container == QuickListNodeContainerPlain) {
+      auto element = GET_OR_RET(loadEncodedString());
+      list.push_back(element);
+      continue;
+    }
+
+    auto encoded_string = GET_OR_RET(loadEncodedString());
+    if (type == RDBTypeListQuickList2) {
+      ListPack lp(encoded_string);
+      auto elements = GET_OR_RET(lp.Entries());
+      list.insert(list.end(), elements.begin(), elements.end());
+    } else {
+      ZipList zip_list(encoded_string);
+      auto elements = GET_OR_RET(zip_list.Entries());
+      list.insert(list.end(), elements.begin(), elements.end());
+    }
   }
   return list;
 }
@@ -279,18 +308,15 @@ StatusOr<std::map<std::string, std::string>> 
RDB::LoadHashWithZipList() {
 }
 
 StatusOr<double> RDB::loadBinaryDouble() {
-  GET_OR_RET(peekOk(8));
   double value = 0;
-  memcpy(&value, input_.data() + pos_, 8);
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&value), 8));
   memrev64ifbe(&value);
-  pos_ += 8;
   return value;
 }
 
 StatusOr<double> RDB::loadDouble() {
   char buf[256];
-  GET_OR_RET(peekOk(1));
-  auto len = static_cast<uint8_t>(input_[pos_++]);
+  auto len = GET_OR_RET(stream_->ReadByte());
   switch (len) {
     case 255:
       return -INFINITY; /* Negative inf */
@@ -299,10 +325,8 @@ StatusOr<double> RDB::loadDouble() {
     case 253:
       return NAN; /* NaN */
   }
-  GET_OR_RET(peekOk(len));
-  memcpy(buf, input_.data() + pos_, len);
+  GET_OR_RET(stream_->Read(buf, len));
   buf[len] = '\0';
-  pos_ += len;
   return ParseFloat<double>(std::string(buf, len));
 }
 
@@ -356,17 +380,28 @@ StatusOr<std::vector<MemberScore>> 
RDB::LoadZSetWithZipList() {
   return zset;
 }
 
-Status RDB::Restore(const std::string &key, uint64_t ttl_ms) {
+Status RDB::Restore(const std::string &key, std::string_view payload, uint64_t 
ttl_ms) {
   rocksdb::Status db_status;
 
   // Check the checksum of the payload
-  GET_OR_RET(VerifyPayloadChecksum());
+  GET_OR_RET(VerifyPayloadChecksum(payload));
 
   auto type = GET_OR_RET(LoadObjectType());
+
+  auto value = GET_OR_RET(loadRdbObject(type, key));
+
+  return saveRdbObject(type, key, value, ttl_ms);  // NOLINT
+}
+
+// Reads the next byte of the stream and returns it as a type/opcode value; no validation is done here.
+StatusOr<int> RDB::loadRdbType() {
+  int opcode = GET_OR_RET(stream_->ReadByte());
+  return opcode;
+}
+
+StatusOr<RedisObjValue> RDB::loadRdbObject(int type, const std::string &key) {
   if (type == RDBTypeString) {
     auto value = GET_OR_RET(LoadStringObject());
-    redis::String string_db(storage_, ns_);
-    db_status = string_db.SetEX(key, value, ttl_ms);
+    return value;
   } else if (type == RDBTypeSet || type == RDBTypeSetIntSet || type == 
RDBTypeSetListPack) {
     std::vector<std::string> members;
     if (type == RDBTypeSet) {
@@ -376,14 +411,7 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
     } else {
       members = GET_OR_RET(LoadSetWithIntSet());
     }
-    redis::Set set_db(storage_, ns_);
-    uint64_t count = 0;
-    std::vector<Slice> insert_members;
-    insert_members.reserve(members.size());
-    for (const auto &member : members) {
-      insert_members.emplace_back(member);
-    }
-    db_status = set_db.Add(key, insert_members, &count);
+    return members;
   } else if (type == RDBTypeZSet || type == RDBTypeZSet2 || type == 
RDBTypeZSetListPack || type == RDBTypeZSetZipList) {
     std::vector<MemberScore> member_scores;
     if (type == RDBTypeZSet || type == RDBTypeZSet2) {
@@ -393,9 +421,7 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
     } else {
       member_scores = GET_OR_RET(LoadZSetWithZipList());
     }
-    redis::ZSet zset_db(storage_, ns_);
-    uint64_t count = 0;
-    db_status = zset_db.Add(key, ZAddFlags(0), (redis::ZSet::MemberScores 
*)&member_scores, &count);
+    return member_scores;
   } else if (type == RDBTypeHash || type == RDBTypeHashListPack || type == 
RDBTypeHashZipList ||
              type == RDBTypeHashZipMap) {
     std::map<std::string, std::string> entries;
@@ -408,14 +434,7 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
     } else {
       entries = GET_OR_RET(LoadHashWithZipMap());
     }
-    std::vector<FieldValue> filed_values;
-    filed_values.reserve(entries.size());
-    for (const auto &entry : entries) {
-      filed_values.emplace_back(entry.first, entry.second);
-    }
-    redis::Hash hash_db(storage_, ns_);
-    uint64_t count = 0;
-    db_status = hash_db.MSet(key, filed_values, false /*nx*/, &count);
+    return entries;
   } else if (type == RDBTypeList || type == RDBTypeListZipList || type == 
RDBTypeListQuickList ||
              type == RDBTypeListQuickList2) {
     std::vector<std::string> elements;
@@ -426,6 +445,47 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
     } else {
       elements = GET_OR_RET(LoadListWithQuickList(type));
     }
+    return elements;
+  }
+
+  return {Status::RedisParseErr, fmt::format("unsupported type: {}", type)};
+}
+
+Status RDB::saveRdbObject(int type, const std::string &key, const 
RedisObjValue &obj, uint64_t ttl_ms) {
+  rocksdb::Status db_status;
+  if (type == RDBTypeString) {
+    const auto &value = std::get<std::string>(obj);
+    redis::String string_db(storage_, ns_);
+    db_status = string_db.SetEX(key, value, ttl_ms);
+  } else if (type == RDBTypeSet || type == RDBTypeSetIntSet || type == 
RDBTypeSetListPack) {
+    const auto &members = std::get<std::vector<std::string>>(obj);
+    redis::Set set_db(storage_, ns_);
+    uint64_t count = 0;
+    std::vector<Slice> insert_members;
+    insert_members.reserve(members.size());
+    for (const auto &member : members) {
+      insert_members.emplace_back(member);
+    }
+    db_status = set_db.Add(key, insert_members, &count);
+  } else if (type == RDBTypeZSet || type == RDBTypeZSet2 || type == 
RDBTypeZSetListPack || type == RDBTypeZSetZipList) {
+    const auto &member_scores = std::get<std::vector<MemberScore>>(obj);
+    redis::ZSet zset_db(storage_, ns_);
+    uint64_t count = 0;
+    db_status = zset_db.Add(key, ZAddFlags(0), (redis::ZSet::MemberScores 
*)&member_scores, &count);
+  } else if (type == RDBTypeHash || type == RDBTypeHashListPack || type == 
RDBTypeHashZipList ||
+             type == RDBTypeHashZipMap) {
+    const auto &entries = std::get<std::map<std::string, std::string>>(obj);
+    std::vector<FieldValue> filed_values;
+    filed_values.reserve(entries.size());
+    for (const auto &entry : entries) {
+      filed_values.emplace_back(entry.first, entry.second);
+    }
+    redis::Hash hash_db(storage_, ns_);
+    uint64_t count = 0;
+    db_status = hash_db.MSet(key, filed_values, false /*nx*/, &count);
+  } else if (type == RDBTypeList || type == RDBTypeListZipList || type == 
RDBTypeListQuickList ||
+             type == RDBTypeListQuickList2) {
+    const auto &elements = std::get<std::vector<std::string>>(obj);
     if (!elements.empty()) {
       std::vector<Slice> insert_elements;
       insert_elements.reserve(elements.size());
@@ -437,7 +497,7 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
       db_status = list_db.Push(key, insert_elements, false, &list_size);
     }
   } else {
-    return {Status::RedisExecErr, fmt::format("unsupported restore type: {}", 
type)};
+    return {Status::RedisExecErr, fmt::format("unsupported save type: {}", 
type)};
   }
   if (!db_status.ok()) {
     return {Status::RedisExecErr, db_status.ToString()};
@@ -449,3 +509,170 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
   }
   return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr, 
db_status.ToString()};
 }
+
+// Reads a 4-byte expire time (in seconds) from the stream; the bytes are returned as stored, without byte swapping.
+StatusOr<uint32_t> RDB::loadExpiredTimeSeconds() {
+  uint32_t seconds = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&seconds), sizeof(seconds)));
+  return seconds;
+}
+
+// Reads an 8-byte expire time (in milliseconds) from the stream.
+StatusOr<uint64_t> RDB::loadExpiredTimeMilliseconds(int rdb_version) {
+  // Before Redis 5 (RDB version 9) the value was written without converting it to little endian,
+  // so files with expires could not be shared between big and little endian systems. Only files
+  // from version 9 on get the byte-order conversion (see rdbLoadMillisecondTime in redis src/rdb.c).
+  constexpr int first_converted_version = 9;
+
+  uint64_t millis = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&millis), sizeof(millis)));
+  if (rdb_version < first_converted_version) {
+    return millis;
+  }
+  memrev64ifbe(&millis);
+  return millis;
+}
+
+// True when the value holds a container alternative (string list, member-score list or
+// field map) with no elements. A plain string value is never reported as empty.
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+  if (const auto *strings = std::get_if<std::vector<std::string>>(&value); strings != nullptr) {
+    return strings->empty();
+  }
+  if (const auto *scores = std::get_if<std::vector<MemberScore>>(&value); scores != nullptr) {
+    return scores->empty();
+  }
+  if (const auto *fields = std::get_if<std::map<std::string, std::string>>(&value); fields != nullptr) {
+    return fields->empty();
+  }
+
+  return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+//
+// Parses the stream as a complete RDB file and stores the keys that belong to database `db_index`.
+//   db_index             only keys read while the current database id equals db_index are stored;
+//                        the id starts at 0 and changes with every SELECTDB opcode
+//   overwrite_exist_key  when false, keys that already exist are left untouched
+// Empty container values and keys whose expire time has already passed are skipped. A key that
+// fails to be saved is logged and does not abort the load. Returns NotOK for a bad signature, an
+// unsupported version, module/function data, a truncated file, or a checksum mismatch (the last
+// one is only detected after all keys were processed).
+Status RDB::LoadRdb(uint32_t db_index, bool overwrite_exist_key) {
+  // Header: the 5-byte magic "REDIS" followed by a 4-digit version.
+  char buf[1024] = {0};
+  GET_OR_RET(LogWhenError(stream_->Read(buf, 9)));
+  buf[9] = '\0';
+
+  if (memcmp(buf, "REDIS", 5) != 0) {
+    LOG(WARNING) << "Wrong signature trying to load DB from file";
+    return {Status::NotOK, "Wrong signature trying to load DB from file"};
+  }
+
+  auto rdb_ver = std::atoi(buf + 5);
+  if (rdb_ver < 1 || rdb_ver > SupportedRDBVersion) {
+    LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+    return {Status::NotOK, fmt::format("Can't handle RDB format version {}", rdb_ver)};
+  }
+
+  uint64_t expire_time = 0;  // set by an expire opcode, applies to the next key only
+  int64_t expire_keys = 0;
+  int64_t load_keys = 0;
+  int64_t empty_keys_skipped = 0;
+  auto now = util::GetTimeStampMS();
+  uint32_t db_id = 0;
+  uint64_t skip_exist_keys = 0;
+  while (true) {
+    auto type = GET_OR_RET(LogWhenError(loadRdbType()));
+    if (type == RDBOpcodeExpireTime) {
+      expire_time = static_cast<uint64_t>(GET_OR_RET(LogWhenError(loadExpiredTimeSeconds())));
+      expire_time *= 1000;
+      continue;
+    } else if (type == RDBOpcodeExpireTimeMs) {
+      expire_time = GET_OR_RET(LogWhenError(loadExpiredTimeMilliseconds(rdb_ver)));
+      continue;
+    } else if (type == RDBOpcodeFreq) {               // LFU frequency: not used in kvrocks
+      GET_OR_RET(LogWhenError(stream_->ReadByte()));  // discard the value
+      continue;
+    } else if (type == RDBOpcodeIdle) {  // LRU idle time: not used in kvrocks
+      uint64_t discard = 0;
+      GET_OR_RET(LogWhenError(stream_->Read(reinterpret_cast<char *>(&discard), sizeof(uint64_t))));
+      continue;
+    } else if (type == RDBOpcodeEof) {
+      break;
+    } else if (type == RDBOpcodeSelectDB) {
+      db_id = GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));
+      continue;
+    } else if (type == RDBOpcodeResizeDB) {              // hash table resize hint for redis: not used in kvrocks
+      GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));  // db_size
+      GET_OR_RET(LogWhenError(loadObjectLen(nullptr)));  // expires_size
+      continue;
+    } else if (type == RDBOpcodeAux) {
+      /* AUX: generic string-string fields. Use to add state to RDB
+       * which is backward compatible. Implementations of RDB loading
+       * are required to skip AUX fields they don't understand.
+       *
+       * An AUX field is composed of two strings: key and value. */
+      GET_OR_RET(LogWhenError(LoadStringObject()));  // aux key, ignored
+      GET_OR_RET(LogWhenError(LoadStringObject()));  // aux value, ignored
+      continue;
+    } else if (type == RDBOpcodeModuleAux) {
+      LOG(WARNING) << "RDB module not supported";
+      return {Status::NotOK, "RDB module not supported"};
+    } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+      LOG(WARNING) << "RDB function not supported";
+      return {Status::NotOK, "RDB function not supported"};
+    } else {
+      if (!isObjectType(type)) {
+        LOG(WARNING) << "Invalid or Not supported object type: " << type;
+        return {Status::NotOK, fmt::format("Invalid or Not supported object type {}", type)};
+      }
+    }
+
+    // The object must always be parsed, even when it is skipped below, to keep the stream in sync.
+    auto key = GET_OR_RET(LogWhenError(LoadStringObject()));
+    auto value = GET_OR_RET(LogWhenError(loadRdbObject(type, key)));
+
+    // An expire opcode only describes the key that directly follows it. Consume it here so that
+    // later keys without an expire do not inherit it, whichever path this key takes below.
+    const uint64_t key_expire_time = expire_time;
+    expire_time = 0;
+
+    if (db_index != db_id) {  // skip db not match
+      continue;
+    }
+
+    if (isEmptyRedisObject(value)) {  // compatible with empty value
+      /* Since we used to have bug that could lead to empty keys
+       * (See #8453), we rather not fail when empty key is encountered
+       * in an RDB file, instead we will silently discard it and
+       * continue loading. */
+      if (empty_keys_skipped++ < 10) {  // only log 10 empty keys, just as redis does.
+        LOG(WARNING) << "skipping empty key: " << key;
+      }
+      continue;
+    } else if (key_expire_time != 0 &&
+               key_expire_time < now) {  // in redis this used to feed this deletion to any connected replicas
+      expire_keys++;
+      continue;
+    }
+
+    if (!overwrite_exist_key) {  // only load not exist key
+      redis::Database redis(storage_, ns_);
+      auto s = redis.KeyExist(key);
+      if (!s.IsNotFound()) {
+        skip_exist_keys++;  // skip it even it's not okay
+        if (!s.ok()) {
+          LOG(ERROR) << "check key " << key << " exist failed: " << s.ToString();
+        }
+        continue;
+      }
+    }
+
+    // NOTE(review): key_expire_time is an absolute timestamp in milliseconds, while the parameter of
+    // saveRdbObject is named ttl_ms -- confirm that saveRdbObject expects an absolute time here.
+    auto ret = saveRdbObject(type, key, value, key_expire_time);
+    if (!ret.IsOK()) {
+      LOG(WARNING) << "save rdb object key " << key << " failed: " << ret.Msg();
+    } else {
+      load_keys++;
+    }
+  }
+
+  // Verify the checksum if RDB version is >= 5
+  if (rdb_ver >= MinRdbVersionToVerifyChecksum) {
+    uint64_t chk_sum = 0;
+    // Take the running checksum before reading the stored one: reading feeds the checksum as well.
+    auto expected = GET_OR_RET(LogWhenError(stream_->GetCheckSum()));
+    GET_OR_RET(LogWhenError(stream_->Read(reinterpret_cast<char *>(&chk_sum), RDBCheckSumLen)));
+    if (chk_sum == 0) {
+      LOG(WARNING) << "RDB file was saved with checksum disabled: no check performed.";
+    } else if (chk_sum != expected) {
+      LOG(WARNING) << "Wrong RDB checksum expected: " << chk_sum << " got: " << expected;
+      return {Status::NotOK, "All objects were processed and loaded but the checksum is unexpected!"};
+    }
+  }
+
+  // Existing keys are only skipped (and counted) when overwriting is disabled.
+  std::string skip_info = (!overwrite_exist_key ? ", exist keys skipped: " + std::to_string(skip_exist_keys) : "");
+
+  LOG(INFO) << "Done loading RDB,  keys loaded: " << load_keys << ", keys expired:" << expire_keys
+            << ", empty keys skipped: " << empty_keys_skipped << skip_info;
+
+  return Status::OK();
+}
diff --git a/src/storage/rdb.h b/src/storage/rdb.h
index 956df034..7b4cce31 100644
--- a/src/storage/rdb.h
+++ b/src/storage/rdb.h
@@ -19,8 +19,13 @@
  */
 #pragma once
 
+#include <map>
+#include <memory>
+#include <string>
 #include <string_view>
 #include <utility>
+#include <variant>
+#include <vector>
 
 #include "status.h"
 #include "types/redis_zset.h"
@@ -47,26 +52,31 @@ constexpr const int RDBTypeZSetListPack = 17;
 constexpr const int RDBTypeListQuickList2 = 18;
 constexpr const int RDBTypeStreamListPack2 = 19;
 constexpr const int RDBTypeSetListPack = 20;
-// NOTE: when adding new Redis object encoding type, update LoadObjectType.
+// NOTE: when adding new Redis object encoding type, update isObjectType.
 
 // Quick list node encoding
 constexpr const int QuickListNodeContainerPlain = 1;
 constexpr const int QuickListNodeContainerPacked = 2;
 
+class RdbStream;
+
+using RedisObjValue =
+    std::variant<std::string, std::vector<std::string>, 
std::vector<MemberScore>, std::map<std::string, std::string>>;
+
 class RDB {
  public:
-  explicit RDB(engine::Storage *storage, std::string ns, std::string_view 
input)
-      : storage_(storage), ns_(std::move(ns)), input_(input){};
+  explicit RDB(engine::Storage *storage, std::string ns, 
std::unique_ptr<RdbStream> stream)
+      : storage_(storage), ns_(std::move(ns)), stream_(std::move(stream)){};
   ~RDB() = default;
 
-  Status VerifyPayloadChecksum();
+  Status VerifyPayloadChecksum(const std::string_view &payload);
   StatusOr<int> LoadObjectType();
-  Status Restore(const std::string &key, uint64_t ttl_ms);
+  Status Restore(const std::string &key, std::string_view payload, uint64_t 
ttl_ms);
 
   // String
   StatusOr<std::string> LoadStringObject();
 
-  // List
+  // Hash
   StatusOr<std::map<std::string, std::string>> LoadHashObject();
   StatusOr<std::map<std::string, std::string>> LoadHashWithZipMap();
   StatusOr<std::map<std::string, std::string>> LoadHashWithListPack();
@@ -87,16 +97,28 @@ class RDB {
   StatusOr<std::vector<std::string>> LoadListWithZipList();
   StatusOr<std::vector<std::string>> LoadListWithQuickList(int type);
 
+  // Load rdb
+  Status LoadRdb(uint32_t db_index, bool overwrite_exist_key = true);
+
  private:
   engine::Storage *storage_;
   std::string ns_;
-  std::string_view input_;
-  size_t pos_ = 0;
+  std::unique_ptr<RdbStream> stream_;
 
   StatusOr<std::string> loadLzfString();
   StatusOr<std::string> loadEncodedString();
   StatusOr<uint64_t> loadObjectLen(bool *is_encoded);
-  Status peekOk(size_t n);
   StatusOr<double> loadBinaryDouble();
   StatusOr<double> loadDouble();
+
+  StatusOr<int> loadRdbType();
+  StatusOr<RedisObjValue> loadRdbObject(int rdbtype, const std::string &key);
+  Status saveRdbObject(int type, const std::string &key, const RedisObjValue 
&obj, uint64_t ttl_ms);
+  StatusOr<uint32_t> loadExpiredTimeSeconds();
+  StatusOr<uint64_t> loadExpiredTimeMilliseconds(int rdb_version);
+
+  /* 0-5 are the basic types of Redis objects and 9-21 are the encoding types of Redis objects.
+   Redis allows basic types 0-7, where 6/7 are the module types, which we don't support here. */
+  static bool isObjectType(int type) { return (type >= 0 && type <= 5) || 
(type >= 9 && type <= 21); };
+  static bool isEmptyRedisObject(const RedisObjValue &value);
 };
diff --git a/src/storage/rdb_listpack.cc b/src/storage/rdb_listpack.cc
index f2933bb9..4eb3597d 100644
--- a/src/storage/rdb_listpack.cc
+++ b/src/storage/rdb_listpack.cc
@@ -108,10 +108,12 @@ StatusOr<uint32_t> ListPack::Length() {
     return {Status::NotOK, "invalid listpack length"};
   }
 
+  auto data = reinterpret_cast<const uint8_t*>(
+      input_.data());  // to avoid the sign extension in 
static_cast<uint32_t>(input_[n]))
   // total bytes and number of elements are encoded in little endian
-  uint32_t total_bytes = (static_cast<uint32_t>(input_[0])) | 
(static_cast<uint32_t>(input_[1]) << 8) |
-                         (static_cast<uint32_t>(input_[2]) << 16) | 
(static_cast<uint32_t>(input_[3]) << 24);
-  uint32_t len = (static_cast<uint32_t>(input_[4])) | 
(static_cast<uint32_t>(input_[5]) << 8);
+  uint32_t total_bytes = (static_cast<uint32_t>(data[0])) | 
(static_cast<uint32_t>(data[1]) << 8) |
+                         (static_cast<uint32_t>(data[2]) << 16) | 
(static_cast<uint32_t>(data[3]) << 24);
+  uint32_t len = (static_cast<uint32_t>(data[4])) | 
(static_cast<uint32_t>(data[5]) << 8);
   pos_ += listPackHeaderSize;
 
   if (total_bytes != input_.size()) {
@@ -166,7 +168,7 @@ StatusOr<std::string> ListPack::Next() {
     pos_ += value_len + encodeBackLen(value_len + 1);
   } else if ((c & ListPack13BitIntMask) == ListPack13BitInt) {  // 13bit int
     GET_OR_RET(peekOK(3));
-    int_value = ((c & 0x1F) << 8) | data[pos_];
+    int_value = ((c & 0x1F) << 8) | data[pos_ + 1];
     value = std::to_string(int_value);
     pos_ += ListPack13BitIntEntrySize;
   } else if ((c & ListPack16BitIntMask) == ListPack16BitInt) {  // 16bit int
@@ -188,7 +190,7 @@ StatusOr<std::string> ListPack::Next() {
     pos_ += ListPack32BitIntEntrySize;
   } else if ((c & ListPack64BitIntMask) == ListPack64BitInt) {  // 64bit int
     GET_OR_RET(peekOK(10));
-    int_value = (static_cast<uint64_t>(data[pos_ + 1])) | 
(static_cast<uint64_t>(input_[pos_ + 2]) << 8) |
+    int_value = (static_cast<uint64_t>(data[pos_ + 1])) | 
(static_cast<uint64_t>(data[pos_ + 2]) << 8) |
                 (static_cast<uint64_t>(data[pos_ + 3]) << 16) | 
(static_cast<uint64_t>(data[pos_ + 4]) << 24) |
                 (static_cast<uint64_t>(data[pos_ + 5]) << 32) | 
(static_cast<uint64_t>(data[pos_ + 6]) << 40) |
                 (static_cast<uint64_t>(data[pos_ + 7]) << 48) | 
(static_cast<uint64_t>(data[pos_ + 8]) << 56);
diff --git a/src/storage/rdb_zipmap.cc b/src/storage/rdb_zipmap.cc
index 724ce852..6d82dad1 100644
--- a/src/storage/rdb_zipmap.cc
+++ b/src/storage/rdb_zipmap.cc
@@ -59,11 +59,12 @@ StatusOr<std::pair<std::string, std::string>> 
ZipMap::Next() {
   auto key_len = GET_OR_RET(decodeLength());
   GET_OR_RET(peekOK(key_len));
   auto key = input_.substr(pos_, key_len);
-  pos_ += key_len + getEncodedLengthSize(key_len);
+  pos_ += key_len;  // decodeLength already process as 
getEncodedLengthSize(key_len);
   auto val_len = GET_OR_RET(decodeLength());
   GET_OR_RET(peekOK(val_len + 1 /* free byte */));
+  pos_ += 1;
   auto value = input_.substr(pos_, val_len);
-  pos_ += val_len + getEncodedLengthSize(val_len) + 1 /* free byte */;
+  pos_ += val_len;  // + getEncodedLengthSize(val_len) + 1 /* free byte */;
   return std::make_pair(key, value);
 }
 
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 057729f2..29dc277f 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -626,6 +626,20 @@ rocksdb::Status Database::GetSlotKeysInfo(int slot, 
std::map<int, uint64_t> *slo
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status Database::KeyExist(const std::string &key) {
+  // Exists() takes a list of keys and a counter, so wrap the single key in a one-element list.
+  std::vector<rocksdb::Slice> single_key;
+  single_key.emplace_back(key);
+
+  int found = 0;
+  if (auto s = Exists(single_key, &found); !s.ok()) {
+    return s;  // hand the lookup failure to the caller unchanged
+  }
+
+  // A successful lookup that left the counter at zero is reported as NotFound.
+  return found == 0 ? rocksdb::Status::NotFound() : rocksdb::Status::OK();
+}
+
 rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, 
const std::string &cursor, uint64_t limit,
                                     const std::string &subkey_prefix, 
std::vector<std::string> *keys,
                                     std::vector<std::string> *values) {
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 3e94b73b..c77135df 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -62,6 +62,7 @@ class Database {
   [[nodiscard]] rocksdb::Status ClearKeysOfSlot(const rocksdb::Slice &ns, int 
slot);
   [[nodiscard]] rocksdb::Status GetSlotKeysInfo(int slot, std::map<int, 
uint64_t> *slotskeys,
                                                 std::vector<std::string> 
*keys, int count);
+  [[nodiscard]] rocksdb::Status KeyExist(const std::string &key);
 
  protected:
   engine::Storage *storage_;
diff --git a/src/vendor/endianconv.h b/src/vendor/endianconv.h
index e2704275..3907578a 100644
--- a/src/vendor/endianconv.h
+++ b/src/vendor/endianconv.h
@@ -59,4 +59,14 @@ uint64_t intrev64(uint64_t v);
 #define intrev64ifbe(v) intrev64(v)
 #endif
 
+/* The functions htonu64() and ntohu64() convert the specified value to
+ * network byte ordering and back. In big endian systems they are no-ops. */
+#if (BYTE_ORDER == BIG_ENDIAN)
+#define htonu64(v) (v)
+#define ntohu64(v) (v)
+#else
+#define htonu64(v) intrev64(v)
+#define ntohu64(v) intrev64(v)
+#endif
+
 // NOLINTEND
diff --git a/tests/cppunit/rdb_stream_test.cc b/tests/cppunit/rdb_stream_test.cc
new file mode 100644
index 00000000..c82a04cf
--- /dev/null
+++ b/tests/cppunit/rdb_stream_test.cc
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "common/rdb_stream.h"
+
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <fstream>
+
+#include "rdb_util.h"
+
+TEST(RdbFileStreamOpenTest, FileNotExist) {
+  // Opening a path that this test never creates must yield a non-OK status.
+  RdbFileStream reader("not_exist.rdb");
+  ASSERT_FALSE(reader.Open().IsOK());
+}
+
+TEST(RdbFileStreamOpenTest, FileExist) {
+  const std::string rdb_path = "hash-zipmap.rdb";
+  ScopedTestRDBFile rdb_file(rdb_path, hash_zipmap_payload, sizeof(hash_zipmap_payload) - 1);
+  RdbFileStream stream(rdb_path);
+  ASSERT_TRUE(stream.Open().IsOK());  // the file was written by the scoped helper just above
+}
+
+TEST(RdbFileStreamReadTest, ReadRdb) {
+  const std::string rdb_path = "encodings.rdb";
+  ScopedTestRDBFile rdb_file(rdb_path, encodings_rdb_payload, sizeof(encodings_rdb_payload) - 1);
+
+  // Opening at the end makes tellg() report the file size.
+  std::ifstream probe(rdb_path, std::ios::binary | std::ios::ate);
+  std::streamsize remaining = probe.tellg();
+  probe.close();
+
+  RdbFileStream stream(rdb_path);
+  ASSERT_TRUE(stream.Open().IsOK());
+
+  char chunk[16] = {0};
+  ASSERT_EQ(stream.Read(chunk, 5).GetValue(), 5);
+  ASSERT_EQ(strncmp(chunk, "REDIS", 5), 0);  // the payload starts with the "REDIS" magic
+  remaining -= 5;
+
+  // Drain the rest in buffer-sized reads, then issue one final short read for the tail.
+  const auto chunk_len = static_cast<std::streamsize>(sizeof(chunk) / sizeof(chunk[0]));
+  for (; remaining >= chunk_len; remaining -= chunk_len) {
+    ASSERT_EQ(stream.Read(chunk, chunk_len).GetValue(), chunk_len);
+  }
+  if (remaining > 0) {
+    ASSERT_EQ(stream.Read(chunk, remaining).GetValue(), remaining);
+  }
+}
+
+TEST(RdbFileStreamReadTest, ReadRdbLittleChunk) {
+  const std::string rdb_path = "encodings.rdb";
+  ScopedTestRDBFile rdb_file(rdb_path, encodings_rdb_payload, sizeof(encodings_rdb_payload) - 1);
+
+  // Opening at the end makes tellg() report the file size.
+  std::ifstream probe(rdb_path, std::ios::binary | std::ios::ate);
+  std::streamsize remaining = probe.tellg();
+  probe.close();
+
+  // The second constructor argument (presumably the chunk size) is smaller than the 32-byte reads below.
+  RdbFileStream stream(rdb_path, 16);
+  ASSERT_TRUE(stream.Open().IsOK());
+
+  char chunk[32] = {0};
+  const auto chunk_len = static_cast<std::streamsize>(sizeof(chunk) / sizeof(chunk[0]));
+  for (; remaining >= chunk_len; remaining -= chunk_len) {
+    ASSERT_EQ(stream.Read(chunk, chunk_len).GetValue(), chunk_len);
+  }
+  if (remaining > 0) {  // one final short read for the tail
+    ASSERT_EQ(stream.Read(chunk, remaining).GetValue(), remaining);
+  }
+}
diff --git a/tests/cppunit/rdb_test.cc b/tests/cppunit/rdb_test.cc
new file mode 100644
index 00000000..d3a18355
--- /dev/null
+++ b/tests/cppunit/rdb_test.cc
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "storage/rdb.h"
+
+#include <cmath>
+#include <filesystem>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/rdb_stream.h"
+#include "config/config.h"
+#include "rdb_util.h"
+#include "storage/storage.h"
+#include "test_base.h"
+#include "types/redis_hash.h"
+#include "types/redis_list.h"
+#include "types/redis_set.h"
+#include "types/redis_string.h"
+#include "types/redis_zset.h"
+#include "vendor/crc64.h"
+
+// Fixture that loads RDB files into the test storage and reads the keys back through the redis:: type wrappers.
+class RDBTest : public TestBase {
+ public:
+  RDBTest(const RDBTest &) = delete;
+  RDBTest &operator=(const RDBTest &) = delete;
+
+ protected:
+  explicit RDBTest() : ns_(kDefaultNamespace) {}
+  ~RDBTest() override = default;
+  void SetUp() override { crc64_init(); }
+
+  void TearDown() override { ASSERT_TRUE(clearDBDir(config_->db_dir)); }
+
+  void loadRdb(const std::string &path) {
+    auto stream_ptr = std::make_unique<RdbFileStream>(path);
+    auto s = stream_ptr->Open();
+    ASSERT_TRUE(s.IsOK());
+
+    RDB rdb(storage_, ns_, std::move(stream_ptr));
+    s = rdb.LoadRdb(0);
+    ASSERT_TRUE(s.IsOK());
+  }
+
+  void stringCheck(const std::string &key, const std::string &expect) {
+    redis::String string_db(storage_, ns_);
+    std::string value;
+    auto s = string_db.Get(key, &value);
+    ASSERT_TRUE(s.ok());
+    ASSERT_EQ(expect, value);
+  }
+
+  void setCheck(const std::string &key, const std::vector<std::string> &expect) {
+    redis::Set set_db(storage_, ns_);
+    std::vector<std::string> members;
+    auto s = set_db.Members(key, &members);
+
+    ASSERT_TRUE(s.ok());
+    ASSERT_EQ(expect, members);
+  }
+
+  void hashCheck(const std::string &key, const std::map<std::string, std::string> &expect) {
+    redis::Hash hash_db(storage_, ns_);
+    std::vector<FieldValue> field_values;
+    auto s = hash_db.GetAll(key, &field_values);
+    ASSERT_TRUE(s.ok());
+
+    // size check, then every stored field must be present in `expect` with the same value
+    ASSERT_EQ(field_values.size(), expect.size());
+    for (const auto &p : field_values) {
+      auto iter = expect.find(p.field);
+      ASSERT_TRUE(iter != expect.end());
+      ASSERT_TRUE(iter->second == p.value);
+    }
+  }
+
+  void listCheck(const std::string &key, const std::vector<std::string> &expect) {
+    redis::List list_db(storage_, ns_);
+    std::vector<std::string> values;
+    auto s = list_db.Range(key, 0, -1, &values);
+    ASSERT_TRUE(s.ok());
+    ASSERT_EQ(expect, values);
+  }
+
+  void zsetCheck(const std::string &key, const std::vector<MemberScore> &expect) {
+    redis::ZSet zset_db(storage_, ns_);
+    std::vector<MemberScore> member_scores;
+
+    RangeRankSpec spec;
+    auto s = zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+    ASSERT_TRUE(s.ok());
+    ASSERT_EQ(expect.size(), member_scores.size());
+    for (size_t i = 0; i < expect.size(); ++i) {
+      ASSERT_TRUE(expect[i].member == member_scores[i].member);
+      ASSERT_TRUE(std::fabs(expect[i].score - member_scores[i].score) < 0.000001);
+    }
+  }
+
+  rocksdb::Status keyExist(const std::string &key) {
+    redis::Database redis(storage_, ns_);
+    return redis.KeyExist(key);
+  }
+
+  void flushDB() {
+    redis::Database redis(storage_, ns_);
+    auto s = redis.FlushDB();
+    ASSERT_TRUE(s.ok());
+  }
+
+  // Checks the keys expected from the encodings*.rdb fixtures (defined out of line).
+  void encodingDataCheck();
+
+  std::string ns_;
+  std::string tmp_rdb_;
+
+ private:
+  static bool clearDBDir(const std::string &path) {
+    try {
+      std::filesystem::remove_all(path);
+    } catch (const std::filesystem::filesystem_error &) {
+      return false;
+    }
+    return true;
+  }
+};
+
+void RDBTest::encodingDataCheck() {
+  // Checks the keys expected after loading an encodings*.rdb fixture. First the string keys.
+  stringCheck("compressible",
+              "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+              "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
+  stringCheck("string", "Hello World");
+  stringCheck("number", "10");
+
+  // list keys: "list" holds the eight "list_zipped" items three times over
+  std::vector<std::string> list_expect = {"1", "2", "3", "a", "b", "c", "100000", "6000000000",
+                                          "1", "2", "3", "a", "b", "c", "100000", "6000000000",
+                                          "1", "2", "3", "a", "b", "c", "100000", "6000000000"};
+  listCheck("list", list_expect);
+
+  std::vector<std::string> list_zipped_expect = {"1", "2", "3", "a", "b", "c", "100000", "6000000000"};
+  listCheck("list_zipped", list_zipped_expect);
+
+  // set keys: setCheck compares the whole vector, so the order written here matters
+  std::vector<std::string> set_expect = {"1", "100000", "2", "3", "6000000000", "a", "b", "c"};
+  setCheck("set", set_expect);
+
+  std::vector<std::string> set_zipped_1_expect = {"1", "2", "3", "4"};
+  setCheck("set_zipped_1", set_zipped_1_expect);
+
+  std::vector<std::string> set_zipped_2_expect = {"100000", "200000", "300000", "400000"};
+  setCheck("set_zipped_2", set_zipped_2_expect);
+
+  std::vector<std::string> set_zipped_3_expect = {"1000000000", "2000000000", "3000000000",
+                                                  "4000000000", "5000000000", "6000000000"};
+  setCheck("set_zipped_3", set_zipped_3_expect);
+
+  // hash keys: hashCheck looks every field up by name, so the order written here is irrelevant
+  std::map<std::string, std::string> hash_expect = {{"a", "1"},     {"aa", "10"},   {"aaa", "100"},       {"b", "2"},
+                                                    {"bb", "20"},   {"bbb", "200"}, {"c", "3"},           {"cc", "30"},
+                                                    {"ccc", "300"}, {"ddd", "400"}, {"eee", "5000000000"}};
+  hashCheck("hash", hash_expect);
+
+  std::map<std::string, std::string> hash_zipped_expect = {
+      {"a", "1"},
+      {"b", "2"},
+      {"c", "3"},
+  };
+  hashCheck("hash_zipped", hash_zipped_expect);
+
+  // zset keys: listed by ascending score; zsetCheck compares them index by index
+  std::vector<MemberScore> zset_expect = {
+      {"a", 1},     {"b", 2},     {"c", 3},     {"aa", 10},     {"bb", 20},          {"cc", 30},
+      {"aaa", 100}, {"bbb", 200}, {"ccc", 300}, {"aaaa", 1000}, {"cccc", 123456789}, {"bbbb", 5000000000}};
+  zsetCheck("zset", zset_expect);
+
+  std::vector<MemberScore> zset_zipped_expect = {
+      {"a", 1},
+      {"b", 2},
+      {"c", 3},
+  };
+  zsetCheck("zset_zipped", zset_zipped_expect);
+}
+
+std::string ConvertToString(const char *data, size_t len) { return std::string(data, len); }  // keeps embedded NULs
+
+TEST_F(RDBTest, LoadEncodings) {
+  const std::map<std::string, std::string> fixtures = {
+      {"encodings.rdb", ConvertToString(encodings_rdb_payload, sizeof(encodings_rdb_payload) - 1)},
+      {"encodings_ver10.rdb", ConvertToString(encodings_ver10_rdb_payload, sizeof(encodings_ver10_rdb_payload) - 1)},
+  };
+  for (const auto &[file_name, payload] : fixtures) {
+    tmp_rdb_ = file_name;
+    ScopedTestRDBFile rdb_file(tmp_rdb_, payload.data(), payload.size());
+    loadRdb(tmp_rdb_);
+    encodingDataCheck();  // both fixtures are checked against the same expectations
+    flushDB();            // so the next fixture starts from a flushed DB
+  }
+}
+
+TEST_F(RDBTest, LoadHashZipMap) {
+  // Expected content of the key "hash" once the fixture has been loaded.
+  const std::map<std::string, std::string> expected_fields = {
+      {"f1", "v1"},
+      {"f2", "v2"},
+  };
+
+  tmp_rdb_ = "hash-zipmap.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, hash_zipmap_payload, sizeof(hash_zipmap_payload) - 1);
+  loadRdb(tmp_rdb_);
+  hashCheck("hash", expected_fields);
+}
+
+TEST_F(RDBTest, LoadHashZipList) {
+  // Expected content of the key "hash" once the fixture has been loaded.
+  const std::map<std::string, std::string> expected_fields = {
+      {"f1", "v1"},
+      {"f2", "v2"},
+  };
+
+  tmp_rdb_ = "hash-ziplist.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, hash_ziplist_payload, sizeof(hash_ziplist_payload) - 1);
+  loadRdb(tmp_rdb_);
+  hashCheck("hash", expected_fields);
+}
+
+TEST_F(RDBTest, LoadListQuickList) {
+  // Expected content of the key "list" once the fixture has been loaded.
+  const std::vector<std::string> expected_items = {"7"};
+
+  tmp_rdb_ = "list-quicklist.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, list_quicklist_payload, sizeof(list_quicklist_payload) - 1);
+  loadRdb(tmp_rdb_);
+  listCheck("list", expected_items);
+}
+
+TEST_F(RDBTest, LoadZSetZipList) {
+  // Expected members of the key "zset", in rank order, once the fixture has been loaded.
+  const std::vector<MemberScore> expected_members = {
+      {"one", 1},
+      {"two", 2},
+  };
+
+  tmp_rdb_ = "zset-ziplist.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, zset_ziplist_payload, sizeof(zset_ziplist_payload) - 1);
+  loadRdb(tmp_rdb_);
+  zsetCheck("zset", expected_members);
+}
+
+TEST_F(RDBTest, LoadEmptyKeys) {
+  tmp_rdb_ = "corrupt_empty_keys.rdb";
+  ScopedTestRDBFile temp(tmp_rdb_, corrupt_empty_keys_payload, sizeof(corrupt_empty_keys_payload) - 1);
+  loadRdb(tmp_rdb_);
+
+  /* corrupt_empty_keys.rdb contains 9 keys with empty values, none of which may exist after loading:
+   "set" "hash" "list_ziplist" "zset" "zset_listpack" "hash_ziplist" "list_quicklist" "zset_ziplist"
+   "list_quicklist_empty_ziplist" */
+
+  // string
+  rocksdb::Status s = keyExist("empty_string");  // empty_string does not exist in the rdb file
+  ASSERT_TRUE(s.IsNotFound());
+
+  // list
+  s = keyExist("list_ziplist");
+  ASSERT_TRUE(s.IsNotFound());
+
+  s = keyExist("list_quicklist");
+  ASSERT_TRUE(s.IsNotFound());
+
+  s = keyExist("list_quicklist_empty_ziplist");
+  ASSERT_TRUE(s.IsNotFound());
+
+  // set
+  s = keyExist("set");
+  ASSERT_TRUE(s.IsNotFound());
+
+  // hash
+  s = keyExist("hash");
+  ASSERT_TRUE(s.IsNotFound());
+
+  s = keyExist("hash_ziplist");
+  ASSERT_TRUE(s.IsNotFound());
+
+  // zset
+  s = keyExist("zset");
+  ASSERT_TRUE(s.IsNotFound());
+
+  s = keyExist("zset_ziplist");
+  ASSERT_TRUE(s.IsNotFound());
+
+  s = keyExist("zset_listpack");
+  ASSERT_TRUE(s.IsNotFound());
+}
\ No newline at end of file
diff --git a/tests/cppunit/rdb_util.h b/tests/cppunit/rdb_util.h
new file mode 100644
index 00000000..8ce743d5
--- /dev/null
+++ b/tests/cppunit/rdb_util.h
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#pragma once
+
+#include <gtest/gtest.h>
+
+#include <fstream>
+#include <string>
+
+// RDB test data copied from Redis's tests/assets/*.rdb files; it is not shellcode.
+
+// zset-ziplist.rdb
+inline constexpr const char zset_ziplist_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x31\x30\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x0b\x32\x35\x35\x2e"
+    
"\x32\x35\x35\x2e\x32\x35\x35\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74"
+    
"\x69\x6d\x65\xc2\x62\xb7\x13\x61\xfa\x08\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\x50\xf4\x0c\x00\xfa\x0c"
+    
"\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0\x00\xfe\x00\xfb\x01\x00\x0c\x04\x7a\x73\x65\x74"
+    
"\x19\x19\x00\x00\x00\x16\x00\x00\x00\x04\x00\x00\x03\x6f\x6e\x65\x05\xf2\x02\x03\x74\x77\x6f\x05\xf3"
+    "\xff\xff\x1f\xb2\xfd\xf0\x99\x7f\x9e\x19";
+
+// corrupt_empty_keys.rdb
+inline constexpr const char corrupt_empty_keys_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x31\x30\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x0b\x32\x35\x35\x2e"
+    
"\x32\x35\x35\x2e\x32\x35\x35\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74"
+    
"\x69\x6d\x65\xc2\x7a\x18\x15\x61\xfa\x08\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\x80\x31\x10\x00\xfa\x0c"
+    
"\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0\x00\xfe\x00\xfb\x09\x00\x02\x03\x73\x65\x74\x00"
+    
"\x04\x04\x68\x61\x73\x68\x00\x0a\x0c\x6c\x69\x73\x74\x5f\x7a\x69\x70\x6c\x69\x73\x74\x0b\x0b\x00\x00"
+    
"\x00\x0a\x00\x00\x00\x00\x00\xff\x05\x04\x7a\x73\x65\x74\x00\x11\x0d\x7a\x73\x65\x74\x5f\x6c\x69\x73"
+    
"\x74\x70\x61\x63\x6b\x07\x07\x00\x00\x00\x00\x00\xff\x10\x0c\x68\x61\x73\x68\x5f\x7a\x69\x70\x6c\x69"
+    
"\x73\x74\x07\x07\x00\x00\x00\x00\x00\xff\x0e\x0e\x6c\x69\x73\x74\x5f\x71\x75\x69\x63\x6b\x6c\x69\x73"
+    
"\x74\x00\x0c\x0c\x7a\x73\x65\x74\x5f\x7a\x69\x70\x6c\x69\x73\x74\x0b\x0b\x00\x00\x00\x0a\x00\x00\x00"
+    
"\x00\x00\xff\x0e\x1c\x6c\x69\x73\x74\x5f\x71\x75\x69\x63\x6b\x6c\x69\x73\x74\x5f\x65\x6d\x70\x74\x79"
+    
"\x5f\x7a\x69\x70\x6c\x69\x73\x74\x01\x0b\x0b\x00\x00\x00\x0a\x00\x00\x00\x00\x00\xff\xff\xf0\xf5\x06"
+    "\xdd\xc6\x6e\x61\x83";
+
+// encodings.rdb
+inline constexpr const char encodings_rdb_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x34\xfe\x00\x03\x04\x7a\x73\x65\x74\x0c\x02\x62\x62\x02\x32\x30\x02\x63\x63\x02"
+    "\x33\x30"
+    
"\x03\x62\x62\x62\x03\x32\x30\x30\x04\x62\x62\x62\x62\x0a\x35\x30\x30\x30\x30\x30\x30\x30\x30\x30\x03\x63\x63\x63"
+    "\x03\x33"
+    
"\x30\x30\x04\x63\x63\x63\x63\x09\x31\x32\x33\x34\x35\x36\x37\x38\x39\x01\x61\x01\x31\x02\x61\x61\x02\x31\x30\x01"
+    "\x62\x01"
+    
"\x32\x03\x61\x61\x61\x03\x31\x30\x30\x01\x63\x01\x33\x04\x61\x61\x61\x61\x04\x31\x30\x30\x30\x0b\x0c\x73\x65\x74"
+    "\x5f\x7a"
+    
"\x69\x70\x70\x65\x64\x5f\x31\x10\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x0a\x0b\x6c\x69"
+    "\x73\x74"
+    
"\x5f\x7a\x69\x70\x70\x65\x64\x30\x30\x00\x00\x00\x25\x00\x00\x00\x08\x00\x00\xc0\x01\x00\x04\xc0\x02\x00\x04\xc0"
+    "\x03\x00"
+    
"\x04\x01\x61\x03\x01\x62\x03\x01\x63\x03\xd0\xa0\x86\x01\x00\x06\xe0\x00\xbc\xa0\x65\x01\x00\x00\x00\xff\x00\x06"
+    "\x73\x74"
+    
"\x72\x69\x6e\x67\x0b\x48\x65\x6c\x6c\x6f\x20\x57\x6f\x72\x6c\x64\x00\x0c\x63\x6f\x6d\x70\x72\x65\x73\x73\x69\x62"
+    "\x6c\x65"
+    
"\xc3\x09\x40\x89\x01\x61\x61\xe0\x7c\x00\x01\x61\x61\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x32\x18"
+    "\x04\x00"
+    
"\x00\x00\x04\x00\x00\x00\xa0\x86\x01\x00\x40\x0d\x03\x00\xe0\x93\x04\x00\x80\x1a\x06\x00\x00\x06\x6e\x75\x6d\x62"
+    "\x65\x72"
+    
"\xc0\x0a\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x33\x38\x08\x00\x00\x00\x06\x00\x00\x00\x00\xca\x9a"
+    "\x3b\x00"
+    
"\x00\x00\x00\x00\x94\x35\x77\x00\x00\x00\x00\x00\x5e\xd0\xb2\x00\x00\x00\x00\x00\x28\x6b\xee\x00\x00\x00\x00\x00"
+    "\xf2\x05"
+    
"\x2a\x01\x00\x00\x00\x00\xbc\xa0\x65\x01\x00\x00\x00\x02\x03\x73\x65\x74\x08\x0a\x36\x30\x30\x30\x30\x30\x30\x30"
+    "\x30\x30"
+    
"\xc2\xa0\x86\x01\x00\x01\x61\xc0\x01\x01\x62\xc0\x02\x01\x63\xc0\x03\x01\x04\x6c\x69\x73\x74\x18\xc0\x01\xc0\x02"
+    "\xc0\x03"
+    
"\x01\x61\x01\x62\x01\x63\xc2\xa0\x86\x01\x00\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\xc0\x01\xc0\x02\xc0\x03"
+    "\x01\x61"
+    
"\x01\x62\x01\x63\xc2\xa0\x86\x01\x00\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\xc0\x01\xc0\x02\xc0\x03\x01\x61"
+    "\x01\x62"
+    
"\x01\x63\xc2\xa0\x86\x01\x00\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\x0d\x0b\x68\x61\x73\x68\x5f\x7a\x69\x70"
+    "\x70\x65"
+    
"\x64\x20\x20\x00\x00\x00\x1b\x00\x00\x00\x06\x00\x00\x01\x61\x03\xc0\x01\x00\x04\x01\x62\x03\xc0\x02\x00\x04\x01"
+    "\x63\x03"
+    
"\xc0\x03\x00\xff\x0c\x0b\x7a\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x20\x20\x00\x00\x00\x1b\x00\x00\x00\x06\x00"
+    "\x00\x01"
+    
"\x61\x03\xc0\x01\x00\x04\x01\x62\x03\xc0\x02\x00\x04\x01\x63\x03\xc0\x03\x00\xff\x04\x04\x68\x61\x73\x68\x0b\x01"
+    "\x62\xc0"
+    
"\x02\x02\x61\x61\xc0\x0a\x01\x63\xc0\x03\x03\x61\x61\x61\xc0\x64\x02\x62\x62\xc0\x14\x02\x63\x63\xc0\x1e\x03\x62"
+    "\x62\x62"
+    
"\xc1\xc8\x00\x03\x63\x63\x63\xc1\x2c\x01\x03\x64\x64\x64\xc1\x90\x01\x03\x65\x65\x65\x0a\x35\x30\x30\x30\x30\x30"
+    "\x30\x30"
+    "\x30\x30\x01\x61\xc0\x01\xff";
+
+// hash-ziplist.rdb
+inline constexpr const char hash_ziplist_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x39\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x0b\x32\x35\x35\x2e\x32\x35\x35"
+    "\x2e\x32"
+    
"\x35\x35\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\xc8\x5c\x96\x60"
+    "\xfa\x08"
+    
"\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\x90\xad\x0c\x00\xfa\x0c\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0"
+    "\x00\xfe"
+    
"\x00\xfb\x01\x00\x0d\x04\x68\x61\x73\x68\x1b\x1b\x00\x00\x00\x16\x00\x00\x00\x04\x00\x00\x02\x66\x31\x04\x02\x76"
+    "\x31\x04"
+    "\x02\x66\x32\x04\x02\x76\x32\xff\xff\x4f\x9c\xd1\xfd\x16\x69\x98\x83";
+
+// hash-zipmap.rdb
+inline constexpr const char hash_zipmap_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x33\xfe\x00\x09\x04\x68\x61\x73\x68\x10\x02\x02\x66\x31\x02\x00\x76\x31\x02\x66"
+    "\x32\x02"
+    "\x00\x76\x32\xff\xff";
+
+// list-quicklist.rdb
+inline constexpr const char list_quicklist_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x38\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x05\x34\x2e\x30\x2e\x39\xfa\x0a"
+    "\x72\x65"
+    
"\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\x9f\x06\x26\x61\xfa\x08\x75\x73\x65\x64"
+    "\x2d\x6d"
+    
"\x65\x6d\xc2\x80\x92\x07\x00\xfa\x0c\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0\x00\xfe\x00\xfb\x02\x00"
+    "\x0e\x04"
+    
"\x6c\x69\x73\x74\x01\x0d\x0d\x00\x00\x00\x0a\x00\x00\x00\x01\x00\x00\xf8\xff\x00\x01\x78\xc0\x07\xff\x35\x72\xf8"
+    "\x54\x1a"
+    "\xc4\xd7\x40";
+
+// dumped from redis-server 7.0, sourced from the 'encodings.rdb' file.
+inline constexpr const char encodings_ver10_rdb_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x31\x30\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x05\x37\x2e\x30\x2e\x33\xfa\x0a"
+    "\x72\x65"
+    
"\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\x62\x65\x23\x65\xfa\x08\x75\x73\x65\x64"
+    "\x2d\x6d"
+    
"\x65\x6d\xc2\x28\x4f\x0e\x00\xfa\x08\x61\x6f\x66\x2d\x62\x61\x73\x65\xc0\x00\xfe\x00\xfb\x0d\x00\x11\x04\x7a\x73"
+    "\x65\x74"
+    
"\x40\x64\x64\x00\x00\x00\x18\x00\x81\x61\x02\x01\x01\x81\x62\x02\x02\x01\x81\x63\x02\x03\x01\x82\x61\x61\x03\x0a"
+    "\x01\x82"
+    
"\x62\x62\x03\x14\x01\x82\x63\x63\x03\x1e\x01\x83\x61\x61\x61\x04\x64\x01\x83\x62\x62\x62\x04\xc0\xc8\x02\x83\x63"
+    "\x63\x63"
+    
"\x04\xc1\x2c\x02\x84\x61\x61\x61\x61\x05\xc3\xe8\x02\x84\x63\x63\x63\x63\x05\xf3\x15\xcd\x5b\x07\x05\x84\x62\x62"
+    "\x62\x62"
+    
"\x05\xf4\x00\xf2\x05\x2a\x01\x00\x00\x00\x09\xff\x11\x0b\x7a\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x16\x16\x00"
+    "\x00\x00"
+    
"\x06\x00\x81\x61\x02\x01\x01\x81\x62\x02\x02\x01\x81\x63\x02\x03\x01\xff\x10\x04\x68\x61\x73\x68\x40\x56\x56\x00"
+    "\x00\x00"
+    
"\x16\x00\x81\x62\x02\x02\x01\x82\x61\x61\x03\x0a\x01\x81\x63\x02\x03\x01\x83\x61\x61\x61\x04\x64\x01\x82\x62\x62"
+    "\x03\x14"
+    
"\x01\x82\x63\x63\x03\x1e\x01\x83\x62\x62\x62\x04\xc0\xc8\x02\x83\x63\x63\x63\x04\xc1\x2c\x02\x83\x64\x64\x64\x04"
+    "\xc1\x90"
+    
"\x02\x83\x65\x65\x65\x04\xf4\x00\xf2\x05\x2a\x01\x00\x00\x00\x09\x81\x61\x02\x01\x01\xff\x0b\x0c\x73\x65\x74\x5f"
+    "\x7a\x69"
+    
"\x70\x70\x65\x64\x5f\x32\x18\x04\x00\x00\x00\x04\x00\x00\x00\xa0\x86\x01\x00\x40\x0d\x03\x00\xe0\x93\x04\x00\x80"
+    "\x1a\x06"
+    
"\x00\x12\x04\x6c\x69\x73\x74\x01\x02\xc3\x2b\x40\x61\x1f\x61\x00\x00\x00\x18\x00\x01\x01\x02\x01\x03\x01\x81\x61"
+    "\x02\x81"
+    
"\x62\x02\x81\x63\x02\xf2\xa0\x86\x01\x04\xf4\x00\xbc\xa0\x65\x01\x20\x1e\x00\x09\xe0\x32\x1d\x01\x09\xff\x02\x03"
+    "\x73\x65"
+    
"\x74\x08\xc0\x02\xc0\x01\xc2\xa0\x86\x01\x00\x01\x62\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\xc0\x03\x01\x61"
+    "\x01\x63"
+    
"\x00\x06\x6e\x75\x6d\x62\x65\x72\xc0\x0a\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x31\x10\x02\x00\x00"
+    "\x00\x04"
+    
"\x00\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x10\x0b\x68\x61\x73\x68\x5f\x7a\x69\x70\x70\x65\x64\x16\x16\x00\x00"
+    "\x00\x06"
+    
"\x00\x81\x61\x02\x01\x01\x81\x62\x02\x02\x01\x81\x63\x02\x03\x01\xff\x00\x0c\x63\x6f\x6d\x70\x72\x65\x73\x73\x69"
+    "\x62\x6c"
+    
"\x65\xc3\x09\x40\x89\x01\x61\x61\xe0\x7c\x00\x01\x61\x61\x00\x06\x73\x74\x72\x69\x6e\x67\x0b\x48\x65\x6c\x6c\x6f"
+    "\x20\x57"
+    
"\x6f\x72\x6c\x64\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x33\x38\x08\x00\x00\x00\x06\x00\x00\x00\x00"
+    "\xca\x9a"
+    
"\x3b\x00\x00\x00\x00\x00\x94\x35\x77\x00\x00\x00\x00\x00\x5e\xd0\xb2\x00\x00\x00\x00\x00\x28\x6b\xee\x00\x00\x00"
+    "\x00\x00"
+    
"\xf2\x05\x2a\x01\x00\x00\x00\x00\xbc\xa0\x65\x01\x00\x00\x00\x12\x0b\x6c\x69\x73\x74\x5f\x7a\x69\x70\x70\x65\x64"
+    "\x01\x02"
+    
"\x25\x25\x00\x00\x00\x08\x00\x01\x01\x02\x01\x03\x01\x81\x61\x02\x81\x62\x02\x81\x63\x02\xf2\xa0\x86\x01\x04\xf4"
+    "\x00\xbc"
+    "\xa0\x65\x01\x00\x00\x00\x09\xff\xff\x58\xe7\x62\x56\x52\x9b\xdf\x6c";
+
+// Writes `len` bytes of `data` to the file `name` on construction and removes that
+// file again on destruction.
+class ScopedTestRDBFile {
+ public:
+  ScopedTestRDBFile(const std::string &name, const char *data, size_t len) : name_(name) {
+    std::ofstream out_file(name_, std::ios::out | std::ios::binary);
+    EXPECT_TRUE(out_file.is_open()) << "failed to create test file " << name_;
+
+    out_file.write(data, static_cast<std::streamsize>(len));
+    // Check the stream only after close(): buffered bytes may not reach the file before
+    // then, and close() sets failbit when that final flush fails.
+    out_file.close();
+    EXPECT_FALSE(out_file.fail()) << "failed to write test file " << name_;
+  }
+
+  ScopedTestRDBFile(const ScopedTestRDBFile &) = delete;
+  ScopedTestRDBFile &operator=(const ScopedTestRDBFile &) = delete;
+
+  ~ScopedTestRDBFile() { std::remove(name_.c_str()); }  // the result of std::remove is ignored
+
+ private:
+  std::string name_;
+};
diff --git a/tests/cppunit/test_base.h b/tests/cppunit/test_base.h
index 043b9329..5d7dbe76 100644
--- a/tests/cppunit/test_base.h
+++ b/tests/cppunit/test_base.h
@@ -59,6 +59,8 @@ class TestBase : public testing::Test {  // NOLINT
     if (ec) {
       std::cout << "Encounter filesystem error: " << ec << std::endl;
     }
+    const char *path = "test.conf";
+    unlink(path);
   }
 
   engine::Storage *storage_;

Reply via email to