git-hulk commented on code in PR #1798:
URL: https://github.com/apache/kvrocks/pull/1798#discussion_r1349893242


##########
src/storage/rdb.h:
##########
@@ -47,26 +53,31 @@ constexpr const int RDBTypeZSetListPack = 17;
 constexpr const int RDBTypeListQuickList2 = 18;
 constexpr const int RDBTypeStreamListPack2 = 19;
 constexpr const int RDBTypeSetListPack = 20;
-// NOTE: when adding new Redis object encoding type, update LoadObjectType.
+// NOTE: when adding new Redis object encoding type, update isObjectType.
 
 // Quick list node encoding
 constexpr const int QuickListNodeContainerPlain = 1;
 constexpr const int QuickListNodeContainerPacked = 2;
 
+class RdbStream;
+
+using RedisObjValue =
+    std::variant<std::string, std::vector<std::string>, 
std::vector<MemberScore>, std::map<std::string, std::string>>;
+
 class RDB {
  public:
-  explicit RDB(engine::Storage *storage, std::string ns, std::string_view 
input)
-      : storage_(storage), ns_(std::move(ns)), input_(input){};
+  explicit RDB(engine::Storage *storage, Config *config, std::string ns, 
std::shared_ptr<RdbStream> stream)

Review Comment:
   The config can be obtained via the storage's `GetConfig()`, so there is no need to pass a separate `Config *` parameter here.



##########
src/storage/rdb.cc:
##########
@@ -356,17 +378,28 @@ StatusOr<std::vector<MemberScore>> 
RDB::LoadZSetWithZipList() {
   return zset;
 }
 
-Status RDB::Restore(const std::string &key, uint64_t ttl_ms) {
+Status RDB::Restore(const std::string &key, std::string_view payload, uint64_t 
ttl_ms) {
   rocksdb::Status db_status;
 
   // Check the checksum of the payload
-  GET_OR_RET(VerifyPayloadChecksum());
+  GET_OR_RET(VerifyPayloadChecksum(payload));
 
   auto type = GET_OR_RET(LoadObjectType());
+
+  auto value = GET_OR_RET(loadRdbObject(type, key));
+
+  return saveRdbObject(type, key, value, ttl_ms);  // NOLINT

Review Comment:
   Why do we need to add NOLINT for this line?



##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
   }
   return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr, 
db_status.ToString()};
 }
+
+StatusOr<uint32_t> RDB::loadTime() {
+  uint32_t t32 = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+  return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+  uint64_t t64 = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+  /* before Redis 5 (RDB version 9), the function
+   * failed to convert data to/from little endian, so RDB files with keys 
having
+   * expires could not be shared between big endian and little endian systems
+   * (because the expire time will be totally wrong). comment from src/rdb.c: 
rdbLoadMillisecondTime*/
+  if (rdb_version >= 9) {
+    memrev64ifbe(&t64);
+  }
+  return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+  if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+    return vec_str_ptr->size() == 0;
+  }
+  if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+    return vec_mem_ptr->size() == 0;
+  }
+  if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+    return map_ptr->size() == 0;
+  }
+
+  return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+  char buf[1024] = {0};
+  GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+  buf[9] = '\0';
+
+  if (memcmp(buf, "REDIS", 5) != 0) {
+    LOG(WARNING) << "Wrong signature trying to load DB from file";
+    return {Status::NotOK};

Review Comment:
   Can we include the error message in the returned Status instead of only logging it?



##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
   }
   return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr, 
db_status.ToString()};
 }
+
+StatusOr<uint32_t> RDB::loadTime() {
+  uint32_t t32 = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+  return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+  uint64_t t64 = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+  /* before Redis 5 (RDB version 9), the function
+   * failed to convert data to/from little endian, so RDB files with keys 
having
+   * expires could not be shared between big endian and little endian systems
+   * (because the expire time will be totally wrong). comment from src/rdb.c: 
rdbLoadMillisecondTime*/
+  if (rdb_version >= 9) {
+    memrev64ifbe(&t64);
+  }
+  return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+  if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+    return vec_str_ptr->size() == 0;
+  }
+  if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+    return vec_mem_ptr->size() == 0;
+  }
+  if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+    return map_ptr->size() == 0;
+  }
+
+  return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+  char buf[1024] = {0};
+  GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+  buf[9] = '\0';
+
+  if (memcmp(buf, "REDIS", 5) != 0) {
+    LOG(WARNING) << "Wrong signature trying to load DB from file";
+    return {Status::NotOK};
+  }
+
+  auto rdb_ver = std::atoi(buf + 5);
+  if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+    LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+    return {Status::NotOK};
+  }
+
+  uint64_t expire_time = 0;
+  int64_t expire_keys = 0;
+  int64_t load_keys = 0;
+  int64_t empty_keys_skipped = 0;
+  auto now = util::GetTimeStampMS();
+  uint32_t db_id = 0;
+  uint64_t skip_exist_keys = 0;
+  while (true) {
+    auto type = GET_OR_RETWITHLOG(loadRdbType());
+    if (type == RDBOpcodeExpireTime) {
+      expire_time = static_cast<uint64_t>(GET_OR_RETWITHLOG(loadTime()));
+      expire_time *= 1000;
+      continue;
+    } else if (type == RDBOpcodeExpireTimeMs) {
+      expire_time = GET_OR_RETWITHLOG(loadMillisecondTime(rdb_ver));
+      continue;
+    } else if (type == RDBOpcodeFreq) {        // LFU frequency: not use in 
kvrocks
+      GET_OR_RETWITHLOG(stream_->ReadByte());  // discard the value
+      continue;
+    } else if (type == RDBOpcodeIdle) {  // LRU idle time: not use in kvrocks
+      uint64_t discard = 0;
+      GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&discard), 8));
+      continue;
+    } else if (type == RDBOpcodeEof) {
+      break;
+    } else if (type == RDBOpcodeSelectDB) {
+      db_id = GET_OR_RETWITHLOG(loadObjectLen(nullptr));
+      continue;
+    } else if (type == RDBOpcodeResizeDB) {       // not use in kvrocks, hint 
redis for hash table resize
+      GET_OR_RETWITHLOG(loadObjectLen(nullptr));  // db_size
+      GET_OR_RETWITHLOG(loadObjectLen(nullptr));  // expires_size
+      continue;
+    } else if (type == RDBOpcodeAux) {
+      /* AUX: generic string-string fields. Use to add state to RDB
+       * which is backward compatible. Implementations of RDB loading
+       * are required to skip AUX fields they don't understand.
+       *
+       * An AUX field is composed of two strings: key and value. */
+      auto key = GET_OR_RETWITHLOG(LoadStringObject());
+      auto value = GET_OR_RETWITHLOG(LoadStringObject());
+      continue;
+    } else if (type == RDBOpcodeModuleAux) {
+      LOG(WARNING) << "RDB module not supported";
+      return {Status::NotOK, "RDB module not supported"};
+    } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+      LOG(WARNING) << "RDB function not supported";
+      return {Status::NotOK, "RDB function not supported"};
+    } else {
+      if (!isObjectType(type)) {
+        LOG(WARNING) << "Invalid or Not supported object type: " << type;
+        return {Status::NotOK, "Invalid or Not supported object type"};
+      }
+    }
+
+    auto key = GET_OR_RETWITHLOG(LoadStringObject());
+    auto value = GET_OR_RETWITHLOG(loadRdbObject(type, key));
+
+    if (db_index != db_id) {  // skip db not match
+      continue;
+    }
+
+    if (isEmptyRedisObject(value)) {  // compatible with empty value
+      /* Since we used to have bug that could lead to empty keys
+       * (See #8453), we rather not fail when empty key is encountered
+       * in an RDB file, instead we will silently discard it and
+       * continue loading. */
+      if (empty_keys_skipped++ < 10) {
+        LOG(WARNING) << "skipping empty key: " << key;
+      }
+      continue;
+    } else if (expire_time != 0 &&
+               expire_time < now) {  // in redis this used to feed this 
deletion to any connected replicas
+      expire_keys++;
+      continue;
+    }
+
+    if (is_nx) {  // only load not exist key
+      auto s = exist(key);
+      if (!s.IsNotFound()) {
+        skip_exist_keys++;  // skip it even it's not okay
+        if (!s.ok()) {
+          LOG(ERROR) << "check key " << key << " exist failed: " << 
s.ToString();
+        }
+        continue;
+      }
+    }
+
+    auto ret = saveRdbObject(type, key, value, expire_time);
+    if (!ret.IsOK()) {
+      LOG(WARNING) << "save rdb object key " << key << " failed: " << 
ret.Msg();
+    } else {
+      load_keys++;
+    }
+  }
+
+  // Verify the checksum if RDB version is >= 5
+  if (rdb_ver >= 5) {
+    uint64_t chk_sum = 0;
+    auto expected = GET_OR_RETWITHLOG(stream_->GetCheckSum());
+    GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&chk_sum), 8));
+    if (chk_sum == 0) {
+      LOG(WARNING) << "RDB file was saved with checksum disabled: no check 
performed.";
+    } else if (chk_sum != expected) {
+      LOG(WARNING) << "Wrong RDB checksum expected: " << chk_sum << " got: " 
<< expected;
+      return {Status::NotOK, "Wrong RDB checksum"};
+    }
+  }
+
+  std::string skip_info = (is_nx ? ", exist keys skipped: " + 
std::to_string(skip_exist_keys) : "");
+
+  if (empty_keys_skipped > 0) {
+    LOG(INFO) << "Done loading RDB,  keys loaded: " << load_keys << ", keys 
expired:" << expire_keys
+              << ", empty keys skipped: " << empty_keys_skipped << skip_info;
+  } else {
+    LOG(INFO) << "Done loading RDB,  keys loaded: " << load_keys << ", keys 
expired:" << expire_keys << skip_info;
+  }
+
+  return Status::OK();
+}
+
+rocksdb::Status RDB::exist(const std::string &key) {
+  int cnt = 0;
+  std::vector<rocksdb::Slice> keys;
+  keys.emplace_back(key);
+  redis::Database redis(storage_, ns_);
+  auto s = redis.Exists(keys, &cnt);
+  if (!s.ok()) {
+    return s;
+  }
+  if (cnt == 0) {
+    return rocksdb::Status::NotFound();
+  }
+  return rocksdb::Status::OK();
+}

Review Comment:
   ```suggestion
   }
   
   ```



##########
tests/cppunit/rdb_util.h:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#pragma once
+
+#include <gtest/gtest.h>
+
+#include <fstream>
+#include <string>
+
+// RDB test data, copy from Redis's tests/asset/*rdb, not shellcode.
+
+// zset-ziplist.rdb
+inline constexpr const char zset_ziplist_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x31\x30\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x0b\x32\x35\x35\x2e"
+    
"\x32\x35\x35\x2e\x32\x35\x35\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74"
+    
"\x69\x6d\x65\xc2\x62\xb7\x13\x61\xfa\x08\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\x50\xf4\x0c\x00\xfa\x0c"
+    
"\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0\x00\xfe\x00\xfb\x01\x00\x0c\x04\x7a\x73\x65\x74"
+    
"\x19\x19\x00\x00\x00\x16\x00\x00\x00\x04\x00\x00\x03\x6f\x6e\x65\x05\xf2\x02\x03\x74\x77\x6f\x05\xf3"
+    "\xff\xff\x1f\xb2\xfd\xf0\x99\x7f\x9e\x19";
+
+// corrupt_empty_keys.rdb
+inline constexpr const char corrupt_empty_keys_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x31\x30\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x0b\x32\x35\x35\x2e"
+    
"\x32\x35\x35\x2e\x32\x35\x35\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74"
+    
"\x69\x6d\x65\xc2\x7a\x18\x15\x61\xfa\x08\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\x80\x31\x10\x00\xfa\x0c"
+    
"\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0\x00\xfe\x00\xfb\x09\x00\x02\x03\x73\x65\x74\x00"
+    
"\x04\x04\x68\x61\x73\x68\x00\x0a\x0c\x6c\x69\x73\x74\x5f\x7a\x69\x70\x6c\x69\x73\x74\x0b\x0b\x00\x00"
+    
"\x00\x0a\x00\x00\x00\x00\x00\xff\x05\x04\x7a\x73\x65\x74\x00\x11\x0d\x7a\x73\x65\x74\x5f\x6c\x69\x73"
+    
"\x74\x70\x61\x63\x6b\x07\x07\x00\x00\x00\x00\x00\xff\x10\x0c\x68\x61\x73\x68\x5f\x7a\x69\x70\x6c\x69"
+    
"\x73\x74\x07\x07\x00\x00\x00\x00\x00\xff\x0e\x0e\x6c\x69\x73\x74\x5f\x71\x75\x69\x63\x6b\x6c\x69\x73"
+    
"\x74\x00\x0c\x0c\x7a\x73\x65\x74\x5f\x7a\x69\x70\x6c\x69\x73\x74\x0b\x0b\x00\x00\x00\x0a\x00\x00\x00"
+    
"\x00\x00\xff\x0e\x1c\x6c\x69\x73\x74\x5f\x71\x75\x69\x63\x6b\x6c\x69\x73\x74\x5f\x65\x6d\x70\x74\x79"
+    
"\x5f\x7a\x69\x70\x6c\x69\x73\x74\x01\x0b\x0b\x00\x00\x00\x0a\x00\x00\x00\x00\x00\xff\xff\xf0\xf5\x06"
+    "\xdd\xc6\x6e\x61\x83";
+
+// encodings.rdb
+inline constexpr const char encodings_rdb_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x34\xfe\x00\x03\x04\x7a\x73\x65\x74\x0c\x02\x62\x62\x02\x32\x30\x02\x63\x63\x02"
+    "\x33\x30"
+    
"\x03\x62\x62\x62\x03\x32\x30\x30\x04\x62\x62\x62\x62\x0a\x35\x30\x30\x30\x30\x30\x30\x30\x30\x30\x03\x63\x63\x63"
+    "\x03\x33"
+    
"\x30\x30\x04\x63\x63\x63\x63\x09\x31\x32\x33\x34\x35\x36\x37\x38\x39\x01\x61\x01\x31\x02\x61\x61\x02\x31\x30\x01"
+    "\x62\x01"
+    
"\x32\x03\x61\x61\x61\x03\x31\x30\x30\x01\x63\x01\x33\x04\x61\x61\x61\x61\x04\x31\x30\x30\x30\x0b\x0c\x73\x65\x74"
+    "\x5f\x7a"
+    
"\x69\x70\x70\x65\x64\x5f\x31\x10\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x0a\x0b\x6c\x69"
+    "\x73\x74"
+    
"\x5f\x7a\x69\x70\x70\x65\x64\x30\x30\x00\x00\x00\x25\x00\x00\x00\x08\x00\x00\xc0\x01\x00\x04\xc0\x02\x00\x04\xc0"
+    "\x03\x00"
+    
"\x04\x01\x61\x03\x01\x62\x03\x01\x63\x03\xd0\xa0\x86\x01\x00\x06\xe0\x00\xbc\xa0\x65\x01\x00\x00\x00\xff\x00\x06"
+    "\x73\x74"
+    
"\x72\x69\x6e\x67\x0b\x48\x65\x6c\x6c\x6f\x20\x57\x6f\x72\x6c\x64\x00\x0c\x63\x6f\x6d\x70\x72\x65\x73\x73\x69\x62"
+    "\x6c\x65"
+    
"\xc3\x09\x40\x89\x01\x61\x61\xe0\x7c\x00\x01\x61\x61\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x32\x18"
+    "\x04\x00"
+    
"\x00\x00\x04\x00\x00\x00\xa0\x86\x01\x00\x40\x0d\x03\x00\xe0\x93\x04\x00\x80\x1a\x06\x00\x00\x06\x6e\x75\x6d\x62"
+    "\x65\x72"
+    
"\xc0\x0a\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x33\x38\x08\x00\x00\x00\x06\x00\x00\x00\x00\xca\x9a"
+    "\x3b\x00"
+    
"\x00\x00\x00\x00\x94\x35\x77\x00\x00\x00\x00\x00\x5e\xd0\xb2\x00\x00\x00\x00\x00\x28\x6b\xee\x00\x00\x00\x00\x00"
+    "\xf2\x05"
+    
"\x2a\x01\x00\x00\x00\x00\xbc\xa0\x65\x01\x00\x00\x00\x02\x03\x73\x65\x74\x08\x0a\x36\x30\x30\x30\x30\x30\x30\x30"
+    "\x30\x30"
+    
"\xc2\xa0\x86\x01\x00\x01\x61\xc0\x01\x01\x62\xc0\x02\x01\x63\xc0\x03\x01\x04\x6c\x69\x73\x74\x18\xc0\x01\xc0\x02"
+    "\xc0\x03"
+    
"\x01\x61\x01\x62\x01\x63\xc2\xa0\x86\x01\x00\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\xc0\x01\xc0\x02\xc0\x03"
+    "\x01\x61"
+    
"\x01\x62\x01\x63\xc2\xa0\x86\x01\x00\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\xc0\x01\xc0\x02\xc0\x03\x01\x61"
+    "\x01\x62"
+    
"\x01\x63\xc2\xa0\x86\x01\x00\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\x0d\x0b\x68\x61\x73\x68\x5f\x7a\x69\x70"
+    "\x70\x65"
+    
"\x64\x20\x20\x00\x00\x00\x1b\x00\x00\x00\x06\x00\x00\x01\x61\x03\xc0\x01\x00\x04\x01\x62\x03\xc0\x02\x00\x04\x01"
+    "\x63\x03"
+    
"\xc0\x03\x00\xff\x0c\x0b\x7a\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x20\x20\x00\x00\x00\x1b\x00\x00\x00\x06\x00"
+    "\x00\x01"
+    
"\x61\x03\xc0\x01\x00\x04\x01\x62\x03\xc0\x02\x00\x04\x01\x63\x03\xc0\x03\x00\xff\x04\x04\x68\x61\x73\x68\x0b\x01"
+    "\x62\xc0"
+    
"\x02\x02\x61\x61\xc0\x0a\x01\x63\xc0\x03\x03\x61\x61\x61\xc0\x64\x02\x62\x62\xc0\x14\x02\x63\x63\xc0\x1e\x03\x62"
+    "\x62\x62"
+    
"\xc1\xc8\x00\x03\x63\x63\x63\xc1\x2c\x01\x03\x64\x64\x64\xc1\x90\x01\x03\x65\x65\x65\x0a\x35\x30\x30\x30\x30\x30"
+    "\x30\x30"
+    "\x30\x30\x01\x61\xc0\x01\xff";
+
+// hash-ziplist.rdb
+inline constexpr const char hash_ziplist_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x39\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x0b\x32\x35\x35\x2e\x32\x35\x35"
+    "\x2e\x32"
+    
"\x35\x35\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\xc8\x5c\x96\x60"
+    "\xfa\x08"
+    
"\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\x90\xad\x0c\x00\xfa\x0c\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0"
+    "\x00\xfe"
+    
"\x00\xfb\x01\x00\x0d\x04\x68\x61\x73\x68\x1b\x1b\x00\x00\x00\x16\x00\x00\x00\x04\x00\x00\x02\x66\x31\x04\x02\x76"
+    "\x31\x04"
+    "\x02\x66\x32\x04\x02\x76\x32\xff\xff\x4f\x9c\xd1\xfd\x16\x69\x98\x83";
+
+// hash-zipmap.rdb
+inline constexpr const char hash_zipmap_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x33\xfe\x00\x09\x04\x68\x61\x73\x68\x10\x02\x02\x66\x31\x02\x00\x76\x31\x02\x66"
+    "\x32\x02"
+    "\x00\x76\x32\xff\xff";
+
+// list-quicklist.rdb
+inline constexpr const char list_quicklist_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x30\x38\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x05\x34\x2e\x30\x2e\x39\xfa\x0a"
+    "\x72\x65"
+    
"\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\x9f\x06\x26\x61\xfa\x08\x75\x73\x65\x64"
+    "\x2d\x6d"
+    
"\x65\x6d\xc2\x80\x92\x07\x00\xfa\x0c\x61\x6f\x66\x2d\x70\x72\x65\x61\x6d\x62\x6c\x65\xc0\x00\xfe\x00\xfb\x02\x00"
+    "\x0e\x04"
+    
"\x6c\x69\x73\x74\x01\x0d\x0d\x00\x00\x00\x0a\x00\x00\x00\x01\x00\x00\xf8\xff\x00\x01\x78\xc0\x07\xff\x35\x72\xf8"
+    "\x54\x1a"
+    "\xc4\xd7\x40";
+
+// dumped from redis-server 7.0, sourced from the 'encodings.rdb' file.
+inline constexpr const char encodings_ver10_rdb_payload[] =
+    
"\x52\x45\x44\x49\x53\x30\x30\x31\x30\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x05\x37\x2e\x30\x2e\x33\xfa\x0a"
+    "\x72\x65"
+    
"\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\x62\x65\x23\x65\xfa\x08\x75\x73\x65\x64"
+    "\x2d\x6d"
+    
"\x65\x6d\xc2\x28\x4f\x0e\x00\xfa\x08\x61\x6f\x66\x2d\x62\x61\x73\x65\xc0\x00\xfe\x00\xfb\x0d\x00\x11\x04\x7a\x73"
+    "\x65\x74"
+    
"\x40\x64\x64\x00\x00\x00\x18\x00\x81\x61\x02\x01\x01\x81\x62\x02\x02\x01\x81\x63\x02\x03\x01\x82\x61\x61\x03\x0a"
+    "\x01\x82"
+    
"\x62\x62\x03\x14\x01\x82\x63\x63\x03\x1e\x01\x83\x61\x61\x61\x04\x64\x01\x83\x62\x62\x62\x04\xc0\xc8\x02\x83\x63"
+    "\x63\x63"
+    
"\x04\xc1\x2c\x02\x84\x61\x61\x61\x61\x05\xc3\xe8\x02\x84\x63\x63\x63\x63\x05\xf3\x15\xcd\x5b\x07\x05\x84\x62\x62"
+    "\x62\x62"
+    
"\x05\xf4\x00\xf2\x05\x2a\x01\x00\x00\x00\x09\xff\x11\x0b\x7a\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x16\x16\x00"
+    "\x00\x00"
+    
"\x06\x00\x81\x61\x02\x01\x01\x81\x62\x02\x02\x01\x81\x63\x02\x03\x01\xff\x10\x04\x68\x61\x73\x68\x40\x56\x56\x00"
+    "\x00\x00"
+    
"\x16\x00\x81\x62\x02\x02\x01\x82\x61\x61\x03\x0a\x01\x81\x63\x02\x03\x01\x83\x61\x61\x61\x04\x64\x01\x82\x62\x62"
+    "\x03\x14"
+    
"\x01\x82\x63\x63\x03\x1e\x01\x83\x62\x62\x62\x04\xc0\xc8\x02\x83\x63\x63\x63\x04\xc1\x2c\x02\x83\x64\x64\x64\x04"
+    "\xc1\x90"
+    
"\x02\x83\x65\x65\x65\x04\xf4\x00\xf2\x05\x2a\x01\x00\x00\x00\x09\x81\x61\x02\x01\x01\xff\x0b\x0c\x73\x65\x74\x5f"
+    "\x7a\x69"
+    
"\x70\x70\x65\x64\x5f\x32\x18\x04\x00\x00\x00\x04\x00\x00\x00\xa0\x86\x01\x00\x40\x0d\x03\x00\xe0\x93\x04\x00\x80"
+    "\x1a\x06"
+    
"\x00\x12\x04\x6c\x69\x73\x74\x01\x02\xc3\x2b\x40\x61\x1f\x61\x00\x00\x00\x18\x00\x01\x01\x02\x01\x03\x01\x81\x61"
+    "\x02\x81"
+    
"\x62\x02\x81\x63\x02\xf2\xa0\x86\x01\x04\xf4\x00\xbc\xa0\x65\x01\x20\x1e\x00\x09\xe0\x32\x1d\x01\x09\xff\x02\x03"
+    "\x73\x65"
+    
"\x74\x08\xc0\x02\xc0\x01\xc2\xa0\x86\x01\x00\x01\x62\x0a\x36\x30\x30\x30\x30\x30\x30\x30\x30\x30\xc0\x03\x01\x61"
+    "\x01\x63"
+    
"\x00\x06\x6e\x75\x6d\x62\x65\x72\xc0\x0a\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x31\x10\x02\x00\x00"
+    "\x00\x04"
+    
"\x00\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x10\x0b\x68\x61\x73\x68\x5f\x7a\x69\x70\x70\x65\x64\x16\x16\x00\x00"
+    "\x00\x06"
+    
"\x00\x81\x61\x02\x01\x01\x81\x62\x02\x02\x01\x81\x63\x02\x03\x01\xff\x00\x0c\x63\x6f\x6d\x70\x72\x65\x73\x73\x69"
+    "\x62\x6c"
+    
"\x65\xc3\x09\x40\x89\x01\x61\x61\xe0\x7c\x00\x01\x61\x61\x00\x06\x73\x74\x72\x69\x6e\x67\x0b\x48\x65\x6c\x6c\x6f"
+    "\x20\x57"
+    
"\x6f\x72\x6c\x64\x0b\x0c\x73\x65\x74\x5f\x7a\x69\x70\x70\x65\x64\x5f\x33\x38\x08\x00\x00\x00\x06\x00\x00\x00\x00"
+    "\xca\x9a"
+    
"\x3b\x00\x00\x00\x00\x00\x94\x35\x77\x00\x00\x00\x00\x00\x5e\xd0\xb2\x00\x00\x00\x00\x00\x28\x6b\xee\x00\x00\x00"
+    "\x00\x00"
+    
"\xf2\x05\x2a\x01\x00\x00\x00\x00\xbc\xa0\x65\x01\x00\x00\x00\x12\x0b\x6c\x69\x73\x74\x5f\x7a\x69\x70\x70\x65\x64"
+    "\x01\x02"
+    
"\x25\x25\x00\x00\x00\x08\x00\x01\x01\x02\x01\x03\x01\x81\x61\x02\x81\x62\x02\x81\x63\x02\xf2\xa0\x86\x01\x04\xf4"
+    "\x00\xbc"
+    "\xa0\x65\x01\x00\x00\x00\x09\xff\xff\x58\xe7\x62\x56\x52\x9b\xdf\x6c";
+
+class ScopedTestRDBFile {
+ public:
+  ScopedTestRDBFile(const std::string &name, const char *data, size_t len) : 
name_(name) {
+    std::ofstream out_file(name, std::ios::out | std::ios::binary);
+    if (!out_file) {
+      EXPECT_TRUE(false);
+    }
+
+    out_file.write(data, static_cast<std::streamsize>(len));
+    if (!out_file) {
+      EXPECT_TRUE(false);
+    }
+    out_file.close();
+  }
+
+  ScopedTestRDBFile(const ScopedTestRDBFile &) = delete;
+  ScopedTestRDBFile &operator=(const ScopedTestRDBFile &) = delete;
+
+  ~ScopedTestRDBFile() { std::remove(name_.c_str()); }
+
+ private:
+  std::string name_;
+};

Review Comment:
   ```suggestion
   };
   
   ```



##########
src/storage/rdb.cc:
##########
@@ -426,6 +443,50 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
     } else {
       elements = GET_OR_RET(LoadListWithQuickList(type));
     }
+    return elements;
+  } else {
+    return {Status::RedisParseErr, fmt::format("unsupported type: {}", type)};
+  }
+
+  // can't be here
+  return Status::OK();
+}

Review Comment:
   ```suggestion
     }
   
     return {Status::RedisParseErr, fmt::format("unsupported type: {}", type)};
   }
   ```



##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
   }
   return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr, 
db_status.ToString()};
 }
+
+StatusOr<uint32_t> RDB::loadTime() {
+  uint32_t t32 = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+  return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+  uint64_t t64 = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+  /* before Redis 5 (RDB version 9), the function
+   * failed to convert data to/from little endian, so RDB files with keys 
having
+   * expires could not be shared between big endian and little endian systems
+   * (because the expire time will be totally wrong). comment from src/rdb.c: 
rdbLoadMillisecondTime*/
+  if (rdb_version >= 9) {
+    memrev64ifbe(&t64);
+  }
+  return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+  if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+    return vec_str_ptr->size() == 0;
+  }
+  if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+    return vec_mem_ptr->size() == 0;
+  }
+  if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+    return map_ptr->size() == 0;
+  }
+
+  return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+  char buf[1024] = {0};
+  GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+  buf[9] = '\0';
+
+  if (memcmp(buf, "REDIS", 5) != 0) {
+    LOG(WARNING) << "Wrong signature trying to load DB from file";
+    return {Status::NotOK};
+  }
+
+  auto rdb_ver = std::atoi(buf + 5);
+  if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+    LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+    return {Status::NotOK};

Review Comment:
   Same as above: please include the error message in the returned Status.



##########
src/storage/rdb.h:
##########
@@ -87,16 +97,29 @@ class RDB {
   StatusOr<std::vector<std::string>> LoadListWithZipList();
   StatusOr<std::vector<std::string>> LoadListWithQuickList(int type);
 
+  // Load rdb
+  Status LoadRdb(uint32_t db_index, bool is_nx = false);
+
  private:
   engine::Storage *storage_;
   std::string ns_;
-  std::string_view input_;
-  size_t pos_ = 0;
+  std::shared_ptr<RdbStream> stream_;
 
   StatusOr<std::string> loadLzfString();
   StatusOr<std::string> loadEncodedString();
   StatusOr<uint64_t> loadObjectLen(bool *is_encoded);
-  Status peekOk(size_t n);
   StatusOr<double> loadBinaryDouble();
   StatusOr<double> loadDouble();
+
+  StatusOr<int> loadRdbType();
+  StatusOr<RedisObjValue> loadRdbObject(int rdbtype, const std::string &key);
+  Status saveRdbObject(int type, const std::string &key, const RedisObjValue 
&obj, uint64_t ttl_ms);
+  StatusOr<uint32_t> loadTime();
+  StatusOr<uint64_t> loadMillisecondTime(int rdb_version);
+  rocksdb::Status exist(const std::string &key);

Review Comment:
   Suggest renaming `exist` to `isKeyExisted`.



##########
tests/cppunit/rdb_test.cc:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "storage/rdb.h"
+
+#include <cmath>
+#include <filesystem>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/rdb_stream.h"
+#include "config/config.h"
+#include "rdb_util.h"
+#include "storage/storage.h"
+#include "test_base.h"
+#include "types/redis_hash.h"
+#include "types/redis_list.h"
+#include "types/redis_set.h"
+#include "types/redis_string.h"
+#include "types/redis_zset.h"
+#include "vendor/crc64.h"
+
+class RDBTest : public TestBase {
+ public:
+  RDBTest(const RDBTest &) = delete;
+  RDBTest &operator=(const RDBTest &) = delete;
+
+ protected:
+  explicit RDBTest() : ns_(kDefaultNamespace) {}
+  ~RDBTest() override = default;
+  void SetUp() override { crc64_init(); }
+
+  void TearDown() override { ASSERT_TRUE(clearDBDir(config_->db_dir)); }
+
+  void loadRdb(const std::string &path) {
+    auto stream_ptr = std::make_shared<RdbFileStream>(path);
+    auto s = stream_ptr->Open();
+    ASSERT_TRUE(s.IsOK());
+
+    RDB rdb(storage_, ns_, stream_ptr);
+    s = rdb.LoadRdb(0);
+    ASSERT_TRUE(s.IsOK());
+  }
+
+  void stringCheck(const std::string &key, const std::string &expect) {
+    redis::String string_db(storage_, ns_);
+    std::string value;
+    auto s = string_db.Get(key, &value);
+    ASSERT_TRUE(s.ok());
+    ASSERT_TRUE(expect == value);
+  }
+
+  void setCheck(const std::string &key, const std::vector<std::string> 
&expect) {
+    redis::Set set_db(storage_, ns_);
+    std::vector<std::string> members;
+    auto s = set_db.Members(key, &members);
+
+    ASSERT_TRUE(s.ok());
+    ASSERT_TRUE(expect == members);
+  }
+
+  void hashCheck(const std::string &key, const std::map<std::string, 
std::string> &expect) {
+    redis::Hash hash_db(storage_, ns_);
+    std::vector<FieldValue> field_values;
+    auto s = hash_db.GetAll(key, &field_values);
+    ASSERT_TRUE(s.ok());
+
+    // size check
+    ASSERT_TRUE(field_values.size() == expect.size());
+    for (const auto &p : field_values) {
+      auto iter = expect.find(p.field);
+      if (iter == expect.end()) {
+        ASSERT_TRUE(false);
+      }
+      ASSERT_TRUE(iter->second == p.value);
+    }
+  }
+
+  void listCheck(const std::string &key, const std::vector<std::string> 
&expect) {
+    redis::List list_db(storage_, ns_);
+    std::vector<std::string> values;
+    auto s = list_db.Range(key, 0, -1, &values);
+    ASSERT_TRUE(s.ok());
+    ASSERT_TRUE(expect == values);
+  }
+
+  void zsetCheck(const std::string &key, const std::vector<MemberScore> 
&expect) {
+    redis::ZSet zset_db(storage_, ns_);
+    std::vector<MemberScore> member_scores;
+
+    RangeRankSpec spec;
+    auto s = zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+    ASSERT_TRUE(s.ok());
+    ASSERT_TRUE(expect.size() == member_scores.size());
+    for (size_t i = 0; i < expect.size(); ++i) {
+      ASSERT_TRUE(expect[i].member == member_scores[i].member);
+      ASSERT_TRUE(std::fabs(expect[i].score - member_scores[i].score) < 
0.000001);
+    }
+  }
+
+  rocksdb::Status exists(const std::string &key) {
+    int cnt = 0;
+    std::vector<rocksdb::Slice> keys;
+    keys.emplace_back(key);
+    redis::Database redis(storage_, ns_);
+    auto s = redis.Exists(keys, &cnt);
+    if (!s.ok()) {
+      return s;
+    }
+    if (cnt == 0) {
+      return rocksdb::Status::NotFound();
+    }
+    return rocksdb::Status::OK();
+  }
+
+  void flushDB() {
+    redis::Database redis(storage_, ns_);
+    auto s = redis.FlushDB();
+    ASSERT_TRUE(s.ok());
+  }
+
+  void encodingDataCheck();
+
+  std::string ns_;
+  std::string tmp_rdb_;
+
+ private:
+  static bool clearDBDir(const std::string &path) {
+    try {
+      std::filesystem::remove_all(path);
+    } catch (std::filesystem::filesystem_error &e) {
+      return false;
+    }
+    return true;
+  }
+};
+
+void RDBTest::encodingDataCheck() {
+  // Verifies every key of the encodings dataset, one check per key.
+
+  // Strings: a long run of 'a' characters, plain text, and a numeric-looking value.
+  stringCheck("compressible",
+              "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+              "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
+  stringCheck("string", "Hello World");
+  stringCheck("number", "10");
+
+  // Lists: "list" holds three back-to-back copies of the elements of "list_zipped".
+  std::vector<std::string> list_block = {"1", "2", "3", "a", "b", "c", "100000", "6000000000"};
+  std::vector<std::string> repeated_list;
+  for (int copy = 0; copy < 3; ++copy) {
+    repeated_list.insert(repeated_list.end(), list_block.begin(), list_block.end());
+  }
+  listCheck("list", repeated_list);
+  listCheck("list_zipped", list_block);
+
+  // Sets.
+  std::vector<std::string> set_members = {"1", "100000", "2", "3", "6000000000", "a", "b", "c"};
+  setCheck("set", set_members);
+
+  std::vector<std::string> small_int_members = {"1", "2", "3", "4"};
+  setCheck("set_zipped_1", small_int_members);
+
+  std::vector<std::string> medium_int_members = {"100000", "200000", "300000", "400000"};
+  setCheck("set_zipped_2", medium_int_members);
+
+  std::vector<std::string> large_int_members = {"1000000000", "2000000000", "3000000000",
+                                                "4000000000", "5000000000", "6000000000"};
+  setCheck("set_zipped_3", large_int_members);
+
+  // Hashes.
+  std::map<std::string, std::string> hash_fields = {
+      {"a", "1"},   {"aa", "10"}, {"aaa", "100"}, {"b", "2"},     {"bb", "20"},         {"bbb", "200"},
+      {"c", "3"},   {"cc", "30"}, {"ccc", "300"}, {"ddd", "400"}, {"eee", "5000000000"}};
+  hashCheck("hash", hash_fields);
+
+  std::map<std::string, std::string> small_hash_fields = {{"a", "1"}, {"b", "2"}, {"c", "3"}};
+  hashCheck("hash_zipped", small_hash_fields);
+
+  // Sorted sets.
+  std::vector<MemberScore> zset_members = {{"a", 1},     {"b", 2},       {"c", 3},           {"aa", 10},
+                                           {"bb", 20},   {"cc", 30},     {"aaa", 100},       {"bbb", 200},
+                                           {"ccc", 300}, {"aaaa", 1000}, {"cccc", 123456789}, {"bbbb", 5000000000}};
+  zsetCheck("zset", zset_members);
+
+  std::vector<MemberScore> small_zset_members = {{"a", 1}, {"b", 2}, {"c", 3}};
+  zsetCheck("zset_zipped", small_zset_members);
+}
+
+// Copies exactly `len` bytes starting at `data` (embedded NULs included) into a std::string.
+std::string ConvertToString(const char *data, size_t len) { return std::string(data, len); }
+
+TEST_F(RDBTest, LoadEncodings) {
+  // Both payloads are checked against the same expected dataset; the DB is
+  // flushed after each one so the next file starts from an empty namespace.
+  const std::vector<std::pair<std::string, std::string>> rdb_files = {
+      {"encodings.rdb", ConvertToString(encodings_rdb_payload, sizeof(encodings_rdb_payload) - 1)},
+      {"encodings_ver10.rdb", ConvertToString(encodings_ver10_rdb_payload, sizeof(encodings_ver10_rdb_payload) - 1)},
+  };
+  for (const auto &[file_name, payload] : rdb_files) {
+    tmp_rdb_ = file_name;
+    ScopedTestRDBFile rdb_file(tmp_rdb_, payload.data(), payload.size());
+    loadRdb(tmp_rdb_);
+    encodingDataCheck();
+    flushDB();
+  }
+}
+
+TEST_F(RDBTest, LoadHashZipMap) {
+  tmp_rdb_ = "hash-zipmap.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, hash_zipmap_payload, sizeof(hash_zipmap_payload) - 1);
+  loadRdb(tmp_rdb_);
+
+  // The zipmap-encoded key "hash" must come back with both field/value pairs.
+  std::map<std::string, std::string> expected_fields = {{"f1", "v1"}, {"f2", "v2"}};
+  hashCheck("hash", expected_fields);
+}
+
+TEST_F(RDBTest, LoadHashZipList) {
+  tmp_rdb_ = "hash-ziplist.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, hash_ziplist_payload, sizeof(hash_ziplist_payload) - 1);
+  loadRdb(tmp_rdb_);
+
+  // The ziplist-encoded key "hash" must come back with both field/value pairs.
+  std::map<std::string, std::string> expected_fields = {{"f1", "v1"}, {"f2", "v2"}};
+  hashCheck("hash", expected_fields);
+}
+
+TEST_F(RDBTest, LoadListQuickList) {
+  tmp_rdb_ = "list-quicklist.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, list_quicklist_payload, sizeof(list_quicklist_payload) - 1);
+  loadRdb(tmp_rdb_);
+
+  // The quicklist-encoded key "list" holds a single element, "7".
+  std::vector<std::string> expected_elements = {"7"};
+  listCheck("list", expected_elements);
+}
+
+TEST_F(RDBTest, LoadZSetZipList) {
+  tmp_rdb_ = "zset-ziplist.rdb";
+  ScopedTestRDBFile rdb_file(tmp_rdb_, zset_ziplist_payload, sizeof(zset_ziplist_payload) - 1);
+  loadRdb(tmp_rdb_);
+
+  // The ziplist-encoded key "zset" holds two scored members.
+  std::vector<MemberScore> expected_members = {{"one", 1}, {"two", 2}};
+  zsetCheck("zset", expected_members);
+}
+
+TEST_F(RDBTest, LoadEmptyKeys) {
+  tmp_rdb_ = "corrupt_empty_keys.rdb";
+  ScopedTestRDBFile temp(tmp_rdb_, corrupt_empty_keys_payload, sizeof(corrupt_empty_keys_payload) - 1);
+  loadRdb(tmp_rdb_);
+
+  /* corrupt_empty_keys.rdb contains 9 keys with empty value:
+   "set"  "hash" "list_ziplist" "zset" "zset_listpack" "hash_ziplist" "list_quicklist" "zset_ziplist"
+   "list_quicklist_empty_ziplist"
+   None of them should exist after loading. "empty_string" is not in the rdb
+   file at all and must not exist either.
+  */
+  const std::vector<std::string> absent_keys = {
+      "empty_string",                                                    // string
+      "list_ziplist",   "list_quicklist", "list_quicklist_empty_ziplist",  // list
+      "set",                                                             // set
+      "hash",           "hash_ziplist",                                    // hash
+      "zset",           "zset_ziplist",   "zset_listpack",                 // zset
+  };
+  // Every key is asserted (previously "list_quicklist_empty_ziplist" was looked
+  // up but its result was never checked).
+  for (const auto &key : absent_keys) {
+    rocksdb::Status s = exists(key);
+    ASSERT_TRUE(s.IsNotFound()) << "key should not exist: " << key << ", status: " << s.ToString();
+  }
+}

Review Comment:
   ```suggestion
   }
   
   ```



##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t 
ttl_ms) {
   }
   return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr, 
db_status.ToString()};
 }
+
+// Reads the 32-bit expire time (in seconds) that follows RDBOpcodeExpireTime.
+// The raw 4 bytes are used as-is, with no byte-order conversion.
+StatusOr<uint32_t> RDB::loadTime() {
+  uint32_t seconds = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&seconds), sizeof(seconds)));
+  return seconds;
+}
+
+// Reads the 64-bit expire time in milliseconds that follows RDBOpcodeExpireTimeMs.
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+  uint64_t millis = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&millis), sizeof(millis)));
+  // Before Redis 5 (RDB version 9) this value was not converted to/from little
+  // endian when written, so files with expiring keys could not be shared between
+  // big- and little-endian systems. Only convert for version >= 9, as Redis does
+  // in rdbLoadMillisecondTime (src/rdb.c).
+  if (rdb_version < 9) {
+    return millis;
+  }
+  memrev64ifbe(&millis);
+  return millis;
+}
+
+// Collection values (vector of strings, vector of MemberScore, string map) count
+// as empty when they hold no elements. A std::string value never counts as
+// empty, whatever its length.
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+  if (const auto *members = std::get_if<std::vector<MemberScore>>(&value)) {
+    return members->empty();
+  }
+  if (const auto *fields = std::get_if<std::map<std::string, std::string>>(&value)) {
+    return fields->empty();
+  }
+  if (const auto *elements = std::get_if<std::vector<std::string>>(&value)) {
+    return elements->empty();
+  }
+  return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+  char buf[1024] = {0};
+  GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+  buf[9] = '\0';
+
+  if (memcmp(buf, "REDIS", 5) != 0) {
+    LOG(WARNING) << "Wrong signature trying to load DB from file";
+    return {Status::NotOK};
+  }
+
+  auto rdb_ver = std::atoi(buf + 5);
+  if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+    LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+    return {Status::NotOK};
+  }
+
+  uint64_t expire_time = 0;
+  int64_t expire_keys = 0;
+  int64_t load_keys = 0;
+  int64_t empty_keys_skipped = 0;
+  auto now = util::GetTimeStampMS();
+  uint32_t db_id = 0;
+  uint64_t skip_exist_keys = 0;
+  while (true) {
+    auto type = GET_OR_RETWITHLOG(loadRdbType());
+    if (type == RDBOpcodeExpireTime) {
+      expire_time = static_cast<uint64_t>(GET_OR_RETWITHLOG(loadTime()));
+      expire_time *= 1000;
+      continue;
+    } else if (type == RDBOpcodeExpireTimeMs) {
+      expire_time = GET_OR_RETWITHLOG(loadMillisecondTime(rdb_ver));
+      continue;
+    } else if (type == RDBOpcodeFreq) {        // LFU frequency: not use in 
kvrocks
+      GET_OR_RETWITHLOG(stream_->ReadByte());  // discard the value
+      continue;
+    } else if (type == RDBOpcodeIdle) {  // LRU idle time: not use in kvrocks
+      uint64_t discard = 0;
+      GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&discard), 8));
+      continue;
+    } else if (type == RDBOpcodeEof) {
+      break;
+    } else if (type == RDBOpcodeSelectDB) {
+      db_id = GET_OR_RETWITHLOG(loadObjectLen(nullptr));
+      continue;
+    } else if (type == RDBOpcodeResizeDB) {       // not use in kvrocks, hint 
redis for hash table resize
+      GET_OR_RETWITHLOG(loadObjectLen(nullptr));  // db_size
+      GET_OR_RETWITHLOG(loadObjectLen(nullptr));  // expires_size
+      continue;
+    } else if (type == RDBOpcodeAux) {
+      /* AUX: generic string-string fields. Use to add state to RDB
+       * which is backward compatible. Implementations of RDB loading
+       * are required to skip AUX fields they don't understand.
+       *
+       * An AUX field is composed of two strings: key and value. */
+      auto key = GET_OR_RETWITHLOG(LoadStringObject());
+      auto value = GET_OR_RETWITHLOG(LoadStringObject());
+      continue;
+    } else if (type == RDBOpcodeModuleAux) {
+      LOG(WARNING) << "RDB module not supported";
+      return {Status::NotOK, "RDB module not supported"};
+    } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+      LOG(WARNING) << "RDB function not supported";
+      return {Status::NotOK, "RDB function not supported"};
+    } else {
+      if (!isObjectType(type)) {
+        LOG(WARNING) << "Invalid or Not supported object type: " << type;
+        return {Status::NotOK, "Invalid or Not supported object type"};
+      }
+    }
+
+    auto key = GET_OR_RETWITHLOG(LoadStringObject());
+    auto value = GET_OR_RETWITHLOG(loadRdbObject(type, key));
+
+    if (db_index != db_id) {  // skip db not match
+      continue;
+    }
+
+    if (isEmptyRedisObject(value)) {  // compatible with empty value
+      /* Since we used to have bug that could lead to empty keys
+       * (See #8453), we rather not fail when empty key is encountered
+       * in an RDB file, instead we will silently discard it and
+       * continue loading. */
+      if (empty_keys_skipped++ < 10) {
+        LOG(WARNING) << "skipping empty key: " << key;
+      }
+      continue;
+    } else if (expire_time != 0 &&
+               expire_time < now) {  // in redis this used to feed this 
deletion to any connected replicas
+      expire_keys++;
+      continue;
+    }
+
+    if (is_nx) {  // only load not exist key
+      auto s = exist(key);
+      if (!s.IsNotFound()) {
+        skip_exist_keys++;  // skip it even it's not okay
+        if (!s.ok()) {
+          LOG(ERROR) << "check key " << key << " exist failed: " << 
s.ToString();
+        }
+        continue;
+      }
+    }
+
+    auto ret = saveRdbObject(type, key, value, expire_time);
+    if (!ret.IsOK()) {
+      LOG(WARNING) << "save rdb object key " << key << " failed: " << 
ret.Msg();
+    } else {
+      load_keys++;
+    }
+  }
+
+  // Verify the checksum if RDB version is >= 5
+  if (rdb_ver >= 5) {
+    uint64_t chk_sum = 0;
+    auto expected = GET_OR_RETWITHLOG(stream_->GetCheckSum());
+    GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&chk_sum), 8));
+    if (chk_sum == 0) {
+      LOG(WARNING) << "RDB file was saved with checksum disabled: no check 
performed.";
+    } else if (chk_sum != expected) {
+      LOG(WARNING) << "Wrong RDB checksum expected: " << chk_sum << " got: " 
<< expected;
+      return {Status::NotOK, "Wrong RDB checksum"};
+    }
+  }
+
+  std::string skip_info = (is_nx ? ", exist keys skipped: " + 
std::to_string(skip_exist_keys) : "");
+
+  if (empty_keys_skipped > 0) {

Review Comment:
   I think we can print the empty keys info regardless of whether the count is zero.



##########
src/storage/rdb.cc:
##########
@@ -39,53 +43,69 @@
 constexpr const int RDB6BitLen = 0;
 constexpr const int RDB14BitLen = 1;
 constexpr const int RDBEncVal = 3;
-constexpr const int RDB32BitLen = 0x08;
+constexpr const int RDB32BitLen = 0x80;
 constexpr const int RDB64BitLen = 0x81;
 constexpr const int RDBEncInt8 = 0;
 constexpr const int RDBEncInt16 = 1;
 constexpr const int RDBEncInt32 = 2;
 constexpr const int RDBEncLzf = 3;
 
-Status RDB::peekOk(size_t n) {
-  if (pos_ + n > input_.size()) {
-    return {Status::NotOK, "unexpected EOF"};
-  }
-  return Status::OK();
-}
-
-Status RDB::VerifyPayloadChecksum() {
-  if (input_.size() < 10) {
+// Highest RDB format version this loader accepts.
+constexpr const int RDBVersion = 10;
+
+// Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType in Redis),
+// listed from the highest value down.
+constexpr const int RDBOpcodeEof = 255;           // End of the RDB file.
+constexpr const int RDBOpcodeSelectDB = 254;      // DB number of the following keys.
+constexpr const int RDBOpcodeExpireTime = 253;    // Old expire time in seconds.
+constexpr const int RDBOpcodeExpireTimeMs = 252;  // Expire time in milliseconds.
+constexpr const int RDBOpcodeResizeDB = 251;      // Hash table resize hint.
+constexpr const int RDBOpcodeAux = 250;           // RDB aux field.
+constexpr const int RDBOpcodeFreq = 249;          // LFU frequency.
+constexpr const int RDBOpcodeIdle = 248;          // LRU idle time.
+constexpr const int RDBOpcodeModuleAux = 247;     // Module auxiliary data.
+constexpr const int RDBOpcodeFunction = 246;      // Old function library data for 7.0 rc1 and rc2.
+constexpr const int RDBOpcodeFunction2 = 245;     // Function library data.
+
+// Evaluates a status-like expression (anything supporting `!`, Msg() and GetValue()).
+// On failure, logs the error's own message (not a fixed "unexpected EOF" text,
+// since failures also come from unsupported types, checksums, etc.) and returns
+// the status from the enclosing function; otherwise evaluates to its GetValue().
+// Relies on the GNU statement-expression extension `({ ... })`.
+// NOLINTNEXTLINE
+#define GET_OR_RETWITHLOG(...)                                                                  \
+  ({                                                                                            \
+    auto &&status = (__VA_ARGS__);                                                              \
+    if (!status) {                                                                              \
+      LOG(WARNING) << "Unrecoverable error loading DB, aborting now: " << status.Msg();         \
+      return std::forward<decltype(status)>(status);                                            \
+    }                                                                                           \
+    std::forward<decltype(status)>(status);                                                     \
+  }).GetValue()

Review Comment:
   @PragmaTwice What do you think about this implementation?



##########
tests/cppunit/rdb_stream_test.cc:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "common/rdb_stream.h"
+
+#include <gtest/gtest.h>
+
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+
+#include "rdb_util.h"
+
+TEST(RdbFileStreamOpenTest, FileNotExist) {
+  // Opening a file that does not exist must report an error.
+  RdbFileStream reader("not_exist.rdb");
+  ASSERT_FALSE(reader.Open().IsOK());
+}
+
+TEST(RdbFileStreamOpenTest, FileExist) {
+  // Opening an RDB file that is present on disk must succeed.
+  const std::string rdb_path = "hash-zipmap.rdb";
+  ScopedTestRDBFile scoped_file(rdb_path, hash_zipmap_payload, sizeof(hash_zipmap_payload) - 1);
+  RdbFileStream stream(rdb_path);
+  auto open_status = stream.Open();
+  ASSERT_TRUE(open_status.IsOK());
+}
+
+TEST(RdbFileStreamReadTest, ReadRdb) {
+  const std::string test_file = "encodings.rdb";
+  const size_t payload_size = sizeof(encodings_rdb_payload) - 1;
+  ScopedTestRDBFile temp(test_file, encodings_rdb_payload, payload_size);
+
+  // The file on disk must hold exactly the payload, since the bytes read below
+  // are compared against it.
+  const auto file_size = static_cast<size_t>(std::filesystem::file_size(test_file));
+  ASSERT_EQ(file_size, payload_size);
+
+  RdbFileStream reader(test_file);
+  ASSERT_TRUE(reader.Open().IsOK());
+
+  // The file starts with the "REDIS" magic string.
+  char buf[16] = {0};
+  ASSERT_EQ(reader.Read(buf, 5).GetValue(), 5);
+  ASSERT_EQ(strncmp(buf, "REDIS", 5), 0);
+
+  // Read the remainder in buffer-sized pieces (the last one may be shorter) and
+  // check the returned bytes, not only the returned byte counts.
+  size_t offset = 5;
+  while (offset < file_size) {
+    size_t n = file_size - offset;
+    if (n > sizeof(buf)) n = sizeof(buf);
+    ASSERT_EQ(reader.Read(buf, n).GetValue(), n);
+    ASSERT_EQ(memcmp(buf, encodings_rdb_payload + offset, n), 0) << "content mismatch at offset " << offset;
+    offset += n;
+  }
+}
+
+TEST(RdbFileStreamReadTest, ReadRdbLittleChunk) {
+  const std::string test_file = "encodings.rdb";
+  const size_t payload_size = sizeof(encodings_rdb_payload) - 1;
+  ScopedTestRDBFile temp(test_file, encodings_rdb_payload, payload_size);
+
+  // The file on disk must hold exactly the payload, since the bytes read below
+  // are compared against it.
+  const auto file_size = static_cast<size_t>(std::filesystem::file_size(test_file));
+  ASSERT_EQ(file_size, payload_size);
+
+  // The stream is constructed with 16 (presumably its internal read-chunk size;
+  // TODO confirm), smaller than the 32-byte buffer requested per Read below.
+  RdbFileStream reader(test_file, 16);
+  ASSERT_TRUE(reader.Open().IsOK());
+
+  // Read the whole file in buffer-sized pieces (the last one may be shorter) and
+  // check the returned bytes, not only the returned byte counts.
+  char buf[32] = {0};
+  size_t offset = 0;
+  while (offset < file_size) {
+    size_t n = file_size - offset;
+    if (n > sizeof(buf)) n = sizeof(buf);
+    ASSERT_EQ(reader.Read(buf, n).GetValue(), n);
+    ASSERT_EQ(memcmp(buf, encodings_rdb_payload + offset, n), 0) << "content mismatch at offset " << offset;
+    offset += n;
+  }
+}

Review Comment:
   ```suggestion
   }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to