torwig commented on code in PR #1798:
URL: https://github.com/apache/kvrocks/pull/1798#discussion_r1355011362
##########
src/storage/rdb.cc:
##########
@@ -39,53 +43,69 @@
constexpr const int RDB6BitLen = 0;
constexpr const int RDB14BitLen = 1;
constexpr const int RDBEncVal = 3;
-constexpr const int RDB32BitLen = 0x08;
+constexpr const int RDB32BitLen = 0x80;
constexpr const int RDB64BitLen = 0x81;
constexpr const int RDBEncInt8 = 0;
constexpr const int RDBEncInt16 = 1;
constexpr const int RDBEncInt32 = 2;
constexpr const int RDBEncLzf = 3;
-Status RDB::peekOk(size_t n) {
- if (pos_ + n > input_.size()) {
- return {Status::NotOK, "unexpected EOF"};
- }
- return Status::OK();
-}
-
-Status RDB::VerifyPayloadChecksum() {
- if (input_.size() < 10) {
+/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
+constexpr const int RDBOpcodeFunction2 = 245; /* function library data */
+constexpr const int RDBOpcodeFunction = 246; /* old function library data
for 7.0 rc1 and rc2 */
+constexpr const int RDBOpcodeModuleAux = 247; /* Module auxiliary data. */
+constexpr const int RDBOpcodeIdle = 248; /* LRU idle time. */
+constexpr const int RDBOpcodeFreq = 249; /* LFU frequency. */
+constexpr const int RDBOpcodeAux = 250; /* RDB aux field. */
+constexpr const int RDBOpcodeResizeDB = 251; /* Hash table resize hint. */
+constexpr const int RDBOpcodeExpireTimeMs = 252; /* Expire time in
milliseconds. */
+constexpr const int RDBOpcodeExpireTime = 253; /* Old expire time in
seconds. */
+constexpr const int RDBOpcodeSelectDB = 254; /* DB number of the following
keys. */
+constexpr const int RDBOpcodeEof = 255; /* End of the RDB file. */
+
+// The current support RDB version
+constexpr const int RDBVersion = 10;
+
+// NOLINTNEXTLINE
+#define GET_OR_RETWITHLOG(...)
\
+ ({
\
+ auto &&status = (__VA_ARGS__);
\
+ if (!status) {
\
+ LOG(WARNING) << "Short read or unsupported type loading DB.
Unrecoverable error, aborting now."; \
+ LOG(ERROR) << "Unexpected EOF reading RDB file";
\
+ return std::forward<decltype(status)>(status);
\
+ }
\
+ std::forward<decltype(status)>(status);
\
+ }).GetValue()
+
+Status RDB::VerifyPayloadChecksum(const std::string_view &payload) {
+ if (payload.size() < 10) {
return {Status::NotOK, "invalid payload length"};
}
- auto footer = input_.substr(input_.size() - 10);
+ auto footer = payload.substr(payload.size() - 10);
auto rdb_version = (footer[1] << 8) | footer[0];
// For now, the max redis rdb version is 11
if (rdb_version > 11) {
Review Comment:
I think we can make the value `11` a constant.
##########
src/common/rdb_stream.cc:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "rdb_stream.h"
+
+#include "fmt/format.h"
+#include "vendor/crc64.h"
+#include "vendor/endianconv.h"
+
+StatusOr<size_t> RdbStringStream::Read(char *buf, size_t n) {
+ if (pos_ + n > input_.size()) {
+ return {Status::NotOK, "unexpected EOF"};
+ }
+ memcpy(buf, input_.data() + pos_, n);
+ pos_ += n;
+ return n;
+}
+
+StatusOr<uint64_t> RdbStringStream::GetCheckSum() const {
+ if (input_.size() < 8) {
+ return {Status::NotOK, "invalid payload length"};
+ }
+ uint64_t crc = crc64(0, reinterpret_cast<const unsigned char
*>(input_.data()), input_.size() - 8);
+ memrev64ifbe(&crc);
+ return crc;
+}
+
+Status RdbFileStream::Open() {
+ ifs_.open(file_name_, std::ifstream::in | std::ifstream::binary);
+ if (!ifs_.is_open()) {
+ return {Status::NotOK, fmt::format("failed to open rdb file: '{}': {}",
file_name_, strerror(errno))};
+ }
+
+ return Status::OK();
+}
+
+StatusOr<size_t> RdbFileStream::Read(char *buf, size_t len) {
+ size_t n = 0;
+ while (len) {
+ size_t read_bytes = max_read_chunk_size_ < len ? max_read_chunk_size_ :
len;
Review Comment:
Perhaps, we can shorten this line by using `std::min(max_read_chunk_size_,
len)`.
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
Review Comment:
You could give the function a more descriptive name, such as
`loadExpirationTimeSeconds`.
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
+ uint32_t t32 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+ return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
Review Comment:
You could give the function a more descriptive name, such as
`loadExpirationTimeMilliseconds`.
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
+ uint32_t t32 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+ return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+ uint64_t t64 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+ /* before Redis 5 (RDB version 9), the function
+ * failed to convert data to/from little endian, so RDB files with keys
having
+ * expires could not be shared between big endian and little endian systems
+ * (because the expire time will be totally wrong). comment from src/rdb.c:
rdbLoadMillisecondTime*/
+ if (rdb_version >= 9) {
+ memrev64ifbe(&t64);
+ }
+ return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+ if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+ return vec_str_ptr->size() == 0;
+ }
+ if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+ return vec_mem_ptr->size() == 0;
+ }
+ if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+ return map_ptr->size() == 0;
+ }
+
+ return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+ char buf[1024] = {0};
+ GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+ buf[9] = '\0';
+
+ if (memcmp(buf, "REDIS", 5) != 0) {
+ LOG(WARNING) << "Wrong signature trying to load DB from file";
+ return {Status::NotOK};
+ }
+
+ auto rdb_ver = std::atoi(buf + 5);
+ if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+ LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+ return {Status::NotOK};
+ }
+
+ uint64_t expire_time = 0;
+ int64_t expire_keys = 0;
+ int64_t load_keys = 0;
+ int64_t empty_keys_skipped = 0;
+ auto now = util::GetTimeStampMS();
+ uint32_t db_id = 0;
+ uint64_t skip_exist_keys = 0;
+ while (true) {
+ auto type = GET_OR_RETWITHLOG(loadRdbType());
+ if (type == RDBOpcodeExpireTime) {
+ expire_time = static_cast<uint64_t>(GET_OR_RETWITHLOG(loadTime()));
+ expire_time *= 1000;
+ continue;
+ } else if (type == RDBOpcodeExpireTimeMs) {
+ expire_time = GET_OR_RETWITHLOG(loadMillisecondTime(rdb_ver));
+ continue;
+ } else if (type == RDBOpcodeFreq) { // LFU frequency: not use in
kvrocks
+ GET_OR_RETWITHLOG(stream_->ReadByte()); // discard the value
+ continue;
+ } else if (type == RDBOpcodeIdle) { // LRU idle time: not use in kvrocks
+ uint64_t discard = 0;
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&discard), 8));
+ continue;
+ } else if (type == RDBOpcodeEof) {
+ break;
+ } else if (type == RDBOpcodeSelectDB) {
+ db_id = GET_OR_RETWITHLOG(loadObjectLen(nullptr));
+ continue;
+ } else if (type == RDBOpcodeResizeDB) { // not use in kvrocks, hint
redis for hash table resize
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // db_size
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // expires_size
+ continue;
+ } else if (type == RDBOpcodeAux) {
+ /* AUX: generic string-string fields. Use to add state to RDB
+ * which is backward compatible. Implementations of RDB loading
+ * are required to skip AUX fields they don't understand.
+ *
+ * An AUX field is composed of two strings: key and value. */
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(LoadStringObject());
+ continue;
+ } else if (type == RDBOpcodeModuleAux) {
+ LOG(WARNING) << "RDB module not supported";
+ return {Status::NotOK, "RDB module not supported"};
+ } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+ LOG(WARNING) << "RDB function not supported";
+ return {Status::NotOK, "RDB function not supported"};
+ } else {
+ if (!isObjectType(type)) {
+ LOG(WARNING) << "Invalid or Not supported object type: " << type;
+ return {Status::NotOK, "Invalid or Not supported object type"};
+ }
+ }
+
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(loadRdbObject(type, key));
+
+ if (db_index != db_id) { // skip db not match
+ continue;
+ }
+
+ if (isEmptyRedisObject(value)) { // compatible with empty value
+ /* Since we used to have bug that could lead to empty keys
+ * (See #8453), we rather not fail when empty key is encountered
+ * in an RDB file, instead we will silently discard it and
+ * continue loading. */
+ if (empty_keys_skipped++ < 10) {
+ LOG(WARNING) << "skipping empty key: " << key;
+ }
+ continue;
+ } else if (expire_time != 0 &&
+ expire_time < now) { // in redis this used to feed this
deletion to any connected replicas
+ expire_keys++;
+ continue;
+ }
+
+ if (is_nx) { // only load not exist key
+ auto s = exist(key);
+ if (!s.IsNotFound()) {
+ skip_exist_keys++; // skip it even it's not okay
+ if (!s.ok()) {
+ LOG(ERROR) << "check key " << key << " exist failed: " <<
s.ToString();
+ }
+ continue;
+ }
+ }
+
+ auto ret = saveRdbObject(type, key, value, expire_time);
+ if (!ret.IsOK()) {
+ LOG(WARNING) << "save rdb object key " << key << " failed: " <<
ret.Msg();
+ } else {
+ load_keys++;
+ }
+ }
+
+ // Verify the checksum if RDB version is >= 5
+ if (rdb_ver >= 5) {
+ uint64_t chk_sum = 0;
+ auto expected = GET_OR_RETWITHLOG(stream_->GetCheckSum());
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&chk_sum), 8));
+ if (chk_sum == 0) {
+ LOG(WARNING) << "RDB file was saved with checksum disabled: no check
performed.";
+ } else if (chk_sum != expected) {
+ LOG(WARNING) << "Wrong RDB checksum expected: " << chk_sum << " got: "
<< expected;
+ return {Status::NotOK, "Wrong RDB checksum"};
Review Comment:
So we verify the checksum and return an error on a mismatch? What would the
result be? As I understand from the code, all objects will already have been
loaded, and the function `RDB::LoadRdb` will return an error without any
rollback. Can we write something like "All objects were processed and loaded
but the checksum is unexpected" in the response to the client?
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
+ uint32_t t32 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+ return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+ uint64_t t64 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+ /* before Redis 5 (RDB version 9), the function
+ * failed to convert data to/from little endian, so RDB files with keys
having
+ * expires could not be shared between big endian and little endian systems
+ * (because the expire time will be totally wrong). comment from src/rdb.c:
rdbLoadMillisecondTime*/
+ if (rdb_version >= 9) {
+ memrev64ifbe(&t64);
+ }
+ return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+ if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+ return vec_str_ptr->size() == 0;
+ }
+ if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+ return vec_mem_ptr->size() == 0;
+ }
+ if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+ return map_ptr->size() == 0;
+ }
+
+ return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+ char buf[1024] = {0};
+ GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+ buf[9] = '\0';
+
+ if (memcmp(buf, "REDIS", 5) != 0) {
+ LOG(WARNING) << "Wrong signature trying to load DB from file";
+ return {Status::NotOK};
+ }
+
+ auto rdb_ver = std::atoi(buf + 5);
+ if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+ LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+ return {Status::NotOK};
+ }
+
+ uint64_t expire_time = 0;
+ int64_t expire_keys = 0;
+ int64_t load_keys = 0;
+ int64_t empty_keys_skipped = 0;
+ auto now = util::GetTimeStampMS();
+ uint32_t db_id = 0;
+ uint64_t skip_exist_keys = 0;
+ while (true) {
+ auto type = GET_OR_RETWITHLOG(loadRdbType());
+ if (type == RDBOpcodeExpireTime) {
+ expire_time = static_cast<uint64_t>(GET_OR_RETWITHLOG(loadTime()));
+ expire_time *= 1000;
+ continue;
+ } else if (type == RDBOpcodeExpireTimeMs) {
+ expire_time = GET_OR_RETWITHLOG(loadMillisecondTime(rdb_ver));
+ continue;
+ } else if (type == RDBOpcodeFreq) { // LFU frequency: not use in
kvrocks
+ GET_OR_RETWITHLOG(stream_->ReadByte()); // discard the value
+ continue;
+ } else if (type == RDBOpcodeIdle) { // LRU idle time: not use in kvrocks
+ uint64_t discard = 0;
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&discard), 8));
+ continue;
+ } else if (type == RDBOpcodeEof) {
+ break;
+ } else if (type == RDBOpcodeSelectDB) {
+ db_id = GET_OR_RETWITHLOG(loadObjectLen(nullptr));
+ continue;
+ } else if (type == RDBOpcodeResizeDB) { // not use in kvrocks, hint
redis for hash table resize
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // db_size
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // expires_size
+ continue;
+ } else if (type == RDBOpcodeAux) {
+ /* AUX: generic string-string fields. Use to add state to RDB
+ * which is backward compatible. Implementations of RDB loading
+ * are required to skip AUX fields they don't understand.
+ *
+ * An AUX field is composed of two strings: key and value. */
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(LoadStringObject());
+ continue;
+ } else if (type == RDBOpcodeModuleAux) {
+ LOG(WARNING) << "RDB module not supported";
+ return {Status::NotOK, "RDB module not supported"};
+ } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+ LOG(WARNING) << "RDB function not supported";
+ return {Status::NotOK, "RDB function not supported"};
+ } else {
+ if (!isObjectType(type)) {
+ LOG(WARNING) << "Invalid or Not supported object type: " << type;
+ return {Status::NotOK, "Invalid or Not supported object type"};
+ }
+ }
+
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(loadRdbObject(type, key));
+
+ if (db_index != db_id) { // skip db not match
+ continue;
+ }
+
+ if (isEmptyRedisObject(value)) { // compatible with empty value
+ /* Since we used to have bug that could lead to empty keys
+ * (See #8453), we rather not fail when empty key is encountered
+ * in an RDB file, instead we will silently discard it and
+ * continue loading. */
+ if (empty_keys_skipped++ < 10) {
+ LOG(WARNING) << "skipping empty key: " << key;
+ }
+ continue;
+ } else if (expire_time != 0 &&
+ expire_time < now) { // in redis this used to feed this
deletion to any connected replicas
+ expire_keys++;
+ continue;
+ }
+
+ if (is_nx) { // only load not exist key
+ auto s = exist(key);
+ if (!s.IsNotFound()) {
+ skip_exist_keys++; // skip it even it's not okay
+ if (!s.ok()) {
+ LOG(ERROR) << "check key " << key << " exist failed: " <<
s.ToString();
+ }
+ continue;
+ }
+ }
+
+ auto ret = saveRdbObject(type, key, value, expire_time);
+ if (!ret.IsOK()) {
+ LOG(WARNING) << "save rdb object key " << key << " failed: " <<
ret.Msg();
+ } else {
+ load_keys++;
+ }
+ }
+
+ // Verify the checksum if RDB version is >= 5
+ if (rdb_ver >= 5) {
+ uint64_t chk_sum = 0;
+ auto expected = GET_OR_RETWITHLOG(stream_->GetCheckSum());
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&chk_sum), 8));
+ if (chk_sum == 0) {
+ LOG(WARNING) << "RDB file was saved with checksum disabled: no check
performed.";
+ } else if (chk_sum != expected) {
+ LOG(WARNING) << "Wrong RDB checksum expected: " << chk_sum << " got: "
<< expected;
+ return {Status::NotOK, "Wrong RDB checksum"};
+ }
+ }
+
+ std::string skip_info = (is_nx ? ", exist keys skipped: " +
std::to_string(skip_exist_keys) : "");
+
+ if (empty_keys_skipped > 0) {
+ LOG(INFO) << "Done loading RDB, keys loaded: " << load_keys << ", keys
expired:" << expire_keys
+ << ", empty keys skipped: " << empty_keys_skipped << skip_info;
+ } else {
+ LOG(INFO) << "Done loading RDB, keys loaded: " << load_keys << ", keys
expired:" << expire_keys << skip_info;
+ }
+
+ return Status::OK();
+}
+
+rocksdb::Status RDB::exist(const std::string &key) {
+ int cnt = 0;
+ std::vector<rocksdb::Slice> keys;
+ keys.emplace_back(key);
+ redis::Database redis(storage_, ns_);
+ auto s = redis.Exists(keys, &cnt);
Review Comment:
I was thinking of two changes: introducing a function `Database::KeyExist`, so
that this check becomes simpler and does not need to create a `std::vector`,
and renaming `Database::Exists` to `Database::MultipleKeysExist`. However, I'm
not sure about it. @git-hulk @PragmaTwice What do you think?
##########
src/storage/rdb.cc:
##########
@@ -95,20 +115,17 @@ StatusOr<uint64_t> RDB::loadObjectLen(bool *is_encoded) {
return c & 0x3F;
case RDB14BitLen:
len = c & 0x3F;
- GET_OR_RET(peekOk(1));
- return (len << 8) | input_[pos_++];
- case RDB32BitLen:
- GET_OR_RET(peekOk(4));
- __builtin_memcpy(&len, input_.data() + pos_, 4);
- pos_ += 4;
- return len;
- case RDB64BitLen:
- GET_OR_RET(peekOk(8));
- __builtin_memcpy(&len, input_.data() + pos_, 8);
- pos_ += 8;
- return len;
+ return (len << 8) | GET_OR_RET(stream_->ReadByte());
default:
- return {Status::NotOK, fmt::format("Unknown length encoding {} in
loadObjectLen()", type)};
+ if (c == RDB32BitLen) {
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&len), 4));
Review Comment:
The same applies to `4` here and `8` further down in the code.
##########
src/storage/rdb.cc:
##########
@@ -39,53 +43,69 @@
constexpr const int RDB6BitLen = 0;
constexpr const int RDB14BitLen = 1;
constexpr const int RDBEncVal = 3;
-constexpr const int RDB32BitLen = 0x08;
+constexpr const int RDB32BitLen = 0x80;
constexpr const int RDB64BitLen = 0x81;
constexpr const int RDBEncInt8 = 0;
constexpr const int RDBEncInt16 = 1;
constexpr const int RDBEncInt32 = 2;
constexpr const int RDBEncLzf = 3;
-Status RDB::peekOk(size_t n) {
- if (pos_ + n > input_.size()) {
- return {Status::NotOK, "unexpected EOF"};
- }
- return Status::OK();
-}
-
-Status RDB::VerifyPayloadChecksum() {
- if (input_.size() < 10) {
+/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
+constexpr const int RDBOpcodeFunction2 = 245; /* function library data */
+constexpr const int RDBOpcodeFunction = 246; /* old function library data
for 7.0 rc1 and rc2 */
+constexpr const int RDBOpcodeModuleAux = 247; /* Module auxiliary data. */
+constexpr const int RDBOpcodeIdle = 248; /* LRU idle time. */
+constexpr const int RDBOpcodeFreq = 249; /* LFU frequency. */
+constexpr const int RDBOpcodeAux = 250; /* RDB aux field. */
+constexpr const int RDBOpcodeResizeDB = 251; /* Hash table resize hint. */
+constexpr const int RDBOpcodeExpireTimeMs = 252; /* Expire time in
milliseconds. */
+constexpr const int RDBOpcodeExpireTime = 253; /* Old expire time in
seconds. */
+constexpr const int RDBOpcodeSelectDB = 254; /* DB number of the following
keys. */
+constexpr const int RDBOpcodeEof = 255; /* End of the RDB file. */
+
+// The current support RDB version
+constexpr const int RDBVersion = 10;
+
+// NOLINTNEXTLINE
+#define GET_OR_RETWITHLOG(...)
\
+ ({
\
+ auto &&status = (__VA_ARGS__);
\
+ if (!status) {
\
+ LOG(WARNING) << "Short read or unsupported type loading DB.
Unrecoverable error, aborting now."; \
+ LOG(ERROR) << "Unexpected EOF reading RDB file";
\
+ return std::forward<decltype(status)>(status);
\
+ }
\
+ std::forward<decltype(status)>(status);
\
+ }).GetValue()
+
+Status RDB::VerifyPayloadChecksum(const std::string_view &payload) {
+ if (payload.size() < 10) {
return {Status::NotOK, "invalid payload length"};
}
- auto footer = input_.substr(input_.size() - 10);
+ auto footer = payload.substr(payload.size() - 10);
auto rdb_version = (footer[1] << 8) | footer[0];
// For now, the max redis rdb version is 11
if (rdb_version > 11) {
return {Status::NotOK, fmt::format("invalid or unsupported rdb version:
{}", rdb_version)};
}
- uint64_t crc = crc64(0, reinterpret_cast<const unsigned char
*>(input_.data()), input_.size() - 8);
- memrev64ifbe(&crc);
+ auto crc = GET_OR_RET(stream_->GetCheckSum());
if (memcmp(&crc, footer.data() + 2, 8)) {
Review Comment:
Perhaps we can also turn `2` and `8` into named constants here (to make clear
that one is the checksum padding/offset and the other is the checksum length).
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
+ uint32_t t32 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+ return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+ uint64_t t64 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+ /* before Redis 5 (RDB version 9), the function
+ * failed to convert data to/from little endian, so RDB files with keys
having
+ * expires could not be shared between big endian and little endian systems
+ * (because the expire time will be totally wrong). comment from src/rdb.c:
rdbLoadMillisecondTime*/
+ if (rdb_version >= 9) {
+ memrev64ifbe(&t64);
+ }
+ return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+ if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+ return vec_str_ptr->size() == 0;
+ }
+ if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+ return vec_mem_ptr->size() == 0;
+ }
+ if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+ return map_ptr->size() == 0;
+ }
+
+ return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+ char buf[1024] = {0};
+ GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+ buf[9] = '\0';
+
+ if (memcmp(buf, "REDIS", 5) != 0) {
+ LOG(WARNING) << "Wrong signature trying to load DB from file";
+ return {Status::NotOK};
+ }
+
+ auto rdb_ver = std::atoi(buf + 5);
+ if (rdb_ver < 1 || rdb_ver > RDBVersion) {
Review Comment:
Transform `1` into the `kMinRdbVersion` constant.
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
+ uint32_t t32 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+ return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+ uint64_t t64 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+ /* before Redis 5 (RDB version 9), the function
+ * failed to convert data to/from little endian, so RDB files with keys
having
+ * expires could not be shared between big endian and little endian systems
+ * (because the expire time will be totally wrong). comment from src/rdb.c:
rdbLoadMillisecondTime*/
+ if (rdb_version >= 9) {
+ memrev64ifbe(&t64);
+ }
+ return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+ if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+ return vec_str_ptr->size() == 0;
+ }
+ if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+ return vec_mem_ptr->size() == 0;
+ }
+ if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+ return map_ptr->size() == 0;
+ }
+
+ return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+ char buf[1024] = {0};
+ GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+ buf[9] = '\0';
+
+ if (memcmp(buf, "REDIS", 5) != 0) {
+ LOG(WARNING) << "Wrong signature trying to load DB from file";
+ return {Status::NotOK};
+ }
+
+ auto rdb_ver = std::atoi(buf + 5);
+ if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+ LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+ return {Status::NotOK};
+ }
+
+ uint64_t expire_time = 0;
+ int64_t expire_keys = 0;
+ int64_t load_keys = 0;
+ int64_t empty_keys_skipped = 0;
+ auto now = util::GetTimeStampMS();
+ uint32_t db_id = 0;
+ uint64_t skip_exist_keys = 0;
+ while (true) {
+ auto type = GET_OR_RETWITHLOG(loadRdbType());
+ if (type == RDBOpcodeExpireTime) {
+ expire_time = static_cast<uint64_t>(GET_OR_RETWITHLOG(loadTime()));
+ expire_time *= 1000;
+ continue;
+ } else if (type == RDBOpcodeExpireTimeMs) {
+ expire_time = GET_OR_RETWITHLOG(loadMillisecondTime(rdb_ver));
+ continue;
+ } else if (type == RDBOpcodeFreq) { // LFU frequency: not use in
kvrocks
+ GET_OR_RETWITHLOG(stream_->ReadByte()); // discard the value
+ continue;
+ } else if (type == RDBOpcodeIdle) { // LRU idle time: not use in kvrocks
+ uint64_t discard = 0;
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&discard), 8));
+ continue;
+ } else if (type == RDBOpcodeEof) {
+ break;
+ } else if (type == RDBOpcodeSelectDB) {
+ db_id = GET_OR_RETWITHLOG(loadObjectLen(nullptr));
+ continue;
+ } else if (type == RDBOpcodeResizeDB) { // not use in kvrocks, hint
redis for hash table resize
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // db_size
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // expires_size
+ continue;
+ } else if (type == RDBOpcodeAux) {
+ /* AUX: generic string-string fields. Use to add state to RDB
+ * which is backward compatible. Implementations of RDB loading
+ * are required to skip AUX fields they don't understand.
+ *
+ * An AUX field is composed of two strings: key and value. */
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(LoadStringObject());
+ continue;
+ } else if (type == RDBOpcodeModuleAux) {
+ LOG(WARNING) << "RDB module not supported";
+ return {Status::NotOK, "RDB module not supported"};
+ } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+ LOG(WARNING) << "RDB function not supported";
+ return {Status::NotOK, "RDB function not supported"};
+ } else {
+ if (!isObjectType(type)) {
+ LOG(WARNING) << "Invalid or Not supported object type: " << type;
+ return {Status::NotOK, "Invalid or Not supported object type"};
+ }
+ }
+
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(loadRdbObject(type, key));
+
+ if (db_index != db_id) { // skip db not match
+ continue;
+ }
+
+ if (isEmptyRedisObject(value)) { // compatible with empty value
+ /* Since we used to have bug that could lead to empty keys
+ * (See #8453), we rather not fail when empty key is encountered
+ * in an RDB file, instead we will silently discard it and
+ * continue loading. */
+ if (empty_keys_skipped++ < 10) {
+ LOG(WARNING) << "skipping empty key: " << key;
+ }
+ continue;
+ } else if (expire_time != 0 &&
+ expire_time < now) { // in redis this used to feed this
deletion to any connected replicas
+ expire_keys++;
+ continue;
+ }
+
+ if (is_nx) { // only load not exist key
+ auto s = exist(key);
+ if (!s.IsNotFound()) {
+ skip_exist_keys++; // skip it even it's not okay
+ if (!s.ok()) {
+ LOG(ERROR) << "check key " << key << " exist failed: " <<
s.ToString();
+ }
+ continue;
+ }
+ }
+
+ auto ret = saveRdbObject(type, key, value, expire_time);
+ if (!ret.IsOK()) {
+ LOG(WARNING) << "save rdb object key " << key << " failed: " <<
ret.Msg();
+ } else {
+ load_keys++;
+ }
+ }
+
+ // Verify the checksum if RDB version is >= 5
+ if (rdb_ver >= 5) {
Review Comment:
Make `5` a constant (something like `kMinRdbVersionToVerifyChecksum`, or
something shorter).
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
+ uint32_t t32 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+ return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+ uint64_t t64 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+ /* before Redis 5 (RDB version 9), the function
+ * failed to convert data to/from little endian, so RDB files with keys
having
+ * expires could not be shared between big endian and little endian systems
+ * (because the expire time will be totally wrong). comment from src/rdb.c:
rdbLoadMillisecondTime*/
+ if (rdb_version >= 9) {
+ memrev64ifbe(&t64);
+ }
+ return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+ if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+ return vec_str_ptr->size() == 0;
+ }
+ if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+ return vec_mem_ptr->size() == 0;
+ }
+ if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+ return map_ptr->size() == 0;
+ }
+
+ return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+ char buf[1024] = {0};
+ GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+ buf[9] = '\0';
+
+ if (memcmp(buf, "REDIS", 5) != 0) {
+ LOG(WARNING) << "Wrong signature trying to load DB from file";
+ return {Status::NotOK};
+ }
+
+ auto rdb_ver = std::atoi(buf + 5);
+ if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+ LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+ return {Status::NotOK};
+ }
+
+ uint64_t expire_time = 0;
Review Comment:
```
uint64_t expire_time = 0;
int64_t expire_keys = 0;
int64_t load_keys = 0;
int64_t empty_keys_skipped = 0;
uint64_t skip_exist_keys = 0;
```
Minor renaming to the following:
```
uint64_t expiration_time = 0;
int64_t expired_keys = 0;
int64_t loaded_keys = 0;
int64_t skipped_empty_keys = 0;
uint64_t skipped_existing_keys = 0;
```
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+StatusOr<uint32_t> RDB::loadTime() {
+ uint32_t t32 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t32), 4));
+ return t32;
+}
+
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+ uint64_t t64 = 0;
+ GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&t64), 8));
+ /* before Redis 5 (RDB version 9), the function
+ * failed to convert data to/from little endian, so RDB files with keys
having
+ * expires could not be shared between big endian and little endian systems
+ * (because the expire time will be totally wrong). comment from src/rdb.c:
rdbLoadMillisecondTime*/
+ if (rdb_version >= 9) {
+ memrev64ifbe(&t64);
+ }
+ return t64;
+}
+
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+ if (auto vec_str_ptr = std::get_if<std::vector<std::string>>(&value)) {
+ return vec_str_ptr->size() == 0;
+ }
+ if (auto vec_mem_ptr = std::get_if<std::vector<MemberScore>>(&value)) {
+ return vec_mem_ptr->size() == 0;
+ }
+ if (auto map_ptr = std::get_if<std::map<std::string, std::string>>(&value)) {
+ return map_ptr->size() == 0;
+ }
+
+ return false;
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+ char buf[1024] = {0};
+ GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+ buf[9] = '\0';
+
+ if (memcmp(buf, "REDIS", 5) != 0) {
+ LOG(WARNING) << "Wrong signature trying to load DB from file";
+ return {Status::NotOK};
+ }
+
+ auto rdb_ver = std::atoi(buf + 5);
+ if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+ LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+ return {Status::NotOK};
+ }
+
+ uint64_t expire_time = 0;
+ int64_t expire_keys = 0;
+ int64_t load_keys = 0;
+ int64_t empty_keys_skipped = 0;
+ auto now = util::GetTimeStampMS();
+ uint32_t db_id = 0;
+ uint64_t skip_exist_keys = 0;
+ while (true) {
+ auto type = GET_OR_RETWITHLOG(loadRdbType());
+ if (type == RDBOpcodeExpireTime) {
+ expire_time = static_cast<uint64_t>(GET_OR_RETWITHLOG(loadTime()));
+ expire_time *= 1000;
+ continue;
+ } else if (type == RDBOpcodeExpireTimeMs) {
+ expire_time = GET_OR_RETWITHLOG(loadMillisecondTime(rdb_ver));
+ continue;
+ } else if (type == RDBOpcodeFreq) { // LFU frequency: not use in
kvrocks
+ GET_OR_RETWITHLOG(stream_->ReadByte()); // discard the value
+ continue;
+ } else if (type == RDBOpcodeIdle) { // LRU idle time: not use in kvrocks
+ uint64_t discard = 0;
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&discard), 8));
+ continue;
+ } else if (type == RDBOpcodeEof) {
+ break;
+ } else if (type == RDBOpcodeSelectDB) {
+ db_id = GET_OR_RETWITHLOG(loadObjectLen(nullptr));
+ continue;
+ } else if (type == RDBOpcodeResizeDB) { // not use in kvrocks, hint
redis for hash table resize
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // db_size
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // expires_size
+ continue;
+ } else if (type == RDBOpcodeAux) {
+ /* AUX: generic string-string fields. Use to add state to RDB
+ * which is backward compatible. Implementations of RDB loading
+ * are required to skip AUX fields they don't understand.
+ *
+ * An AUX field is composed of two strings: key and value. */
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(LoadStringObject());
+ continue;
+ } else if (type == RDBOpcodeModuleAux) {
+ LOG(WARNING) << "RDB module not supported";
+ return {Status::NotOK, "RDB module not supported"};
+ } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+ LOG(WARNING) << "RDB function not supported";
+ return {Status::NotOK, "RDB function not supported"};
+ } else {
+ if (!isObjectType(type)) {
+ LOG(WARNING) << "Invalid or Not supported object type: " << type;
+ return {Status::NotOK, "Invalid or Not supported object type"};
+ }
+ }
+
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(loadRdbObject(type, key));
+
+ if (db_index != db_id) { // skip db not match
+ continue;
+ }
+
+ if (isEmptyRedisObject(value)) { // compatible with empty value
+ /* Since we used to have bug that could lead to empty keys
+ * (See #8453), we rather not fail when empty key is encountered
+ * in an RDB file, instead we will silently discard it and
+ * continue loading. */
+ if (empty_keys_skipped++ < 10) {
+ LOG(WARNING) << "skipping empty key: " << key;
+ }
+ continue;
+ } else if (expire_time != 0 &&
+ expire_time < now) { // in redis this used to feed this
deletion to any connected replicas
+ expire_keys++;
+ continue;
+ }
+
+ if (is_nx) { // only load not exist key
+ auto s = exist(key);
+ if (!s.IsNotFound()) {
+ skip_exist_keys++; // skip it even it's not okay
+ if (!s.ok()) {
+ LOG(ERROR) << "check key " << key << " exist failed: " <<
s.ToString();
+ }
+ continue;
+ }
+ }
+
+ auto ret = saveRdbObject(type, key, value, expire_time);
+ if (!ret.IsOK()) {
+ LOG(WARNING) << "save rdb object key " << key << " failed: " <<
ret.Msg();
+ } else {
+ load_keys++;
+ }
+ }
+
+ // Verify the checksum if RDB version is >= 5
+ if (rdb_ver >= 5) {
+ uint64_t chk_sum = 0;
+ auto expected = GET_OR_RETWITHLOG(stream_->GetCheckSum());
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&chk_sum), 8));
+ if (chk_sum == 0) {
+ LOG(WARNING) << "RDB file was saved with checksum disabled: no check
performed.";
+ } else if (chk_sum != expected) {
+ LOG(WARNING) << "Wrong RDB checksum expected: " << chk_sum << " got: "
<< expected;
+ return {Status::NotOK, "Wrong RDB checksum"};
+ }
+ }
+
+ std::string skip_info = (is_nx ? ", exist keys skipped: " +
std::to_string(skip_exist_keys) : "");
+
+ if (empty_keys_skipped > 0) {
+ LOG(INFO) << "Done loading RDB, keys loaded: " << load_keys << ", keys
expired:" << expire_keys
+ << ", empty keys skipped: " << empty_keys_skipped << skip_info;
+ } else {
+ LOG(INFO) << "Done loading RDB, keys loaded: " << load_keys << ", keys
expired:" << expire_keys << skip_info;
+ }
+
+ return Status::OK();
+}
+
+rocksdb::Status RDB::exist(const std::string &key) {
Review Comment:
I would rename this function to `keyExist` or, as @git-hulk suggested,
`isKeyExist`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]