xq2010 commented on code in PR #1798:
URL: https://github.com/apache/kvrocks/pull/1798#discussion_r1358114636
##########
src/storage/rdb.cc:
##########
@@ -449,3 +510,188 @@ Status RDB::Restore(const std::string &key, uint64_t
ttl_ms) {
}
return db_status.ok() ? Status::OK() : Status{Status::RedisExecErr,
db_status.ToString()};
}
+
+// Reads a 4-byte expire timestamp from the RDB stream and returns it as-is.
+// LoadRdb calls this for RDBOpcodeExpireTime and multiplies the result by 1000.
+// Any stream read error is propagated to the caller through GET_OR_RET.
+StatusOr<uint32_t> RDB::loadTime() {
+  uint32_t seconds = 0;
+  auto *raw = reinterpret_cast<char *>(&seconds);
+  GET_OR_RET(stream_->Read(raw, sizeof(seconds)));
+  return seconds;
+}
+
+// Reads an 8-byte expire timestamp from the RDB stream.
+// LoadRdb calls this for RDBOpcodeExpireTimeMs, passing the file's RDB version.
+// Any stream read error is propagated to the caller through GET_OR_RET.
+StatusOr<uint64_t> RDB::loadMillisecondTime(int rdb_version) {
+  uint64_t millis = 0;
+  GET_OR_RET(stream_->Read(reinterpret_cast<char *>(&millis), sizeof(millis)));
+
+  // Before Redis 5 (RDB version 9) the value was written without converting
+  // it to/from little endian, so RDB files containing keys with expires could
+  // not be shared between big endian and little endian systems (the expire
+  // time would be totally wrong). Only files of version 9 or later are passed
+  // through memrev64ifbe; older files keep the raw bytes.
+  // See rdbLoadMillisecondTime in redis src/rdb.c.
+  const bool needs_endian_fixup = rdb_version >= 9;
+  if (needs_endian_fixup) {
+    memrev64ifbe(&millis);
+  }
+  return millis;
+}
+
+// Reports whether `value` currently holds a container alternative with no
+// elements. Only the string vector, MemberScore vector and string->string map
+// alternatives are inspected; any other alternative yields false.
+bool RDB::isEmptyRedisObject(const RedisObjValue &value) {
+  const auto *strings = std::get_if<std::vector<std::string>>(&value);
+  if (strings != nullptr) return strings->empty();
+
+  const auto *scored_members = std::get_if<std::vector<MemberScore>>(&value);
+  if (scored_members != nullptr) return scored_members->empty();
+
+  const auto *fields = std::get_if<std::map<std::string, std::string>>(&value);
+  return fields != nullptr && fields->empty();
+}
+
+// Load RDB file: copy from redis/src/rdb.c:branch 7.0, 76b9c13d.
+Status RDB::LoadRdb(uint32_t db_index, bool is_nx) {
+ char buf[1024] = {0};
+ GET_OR_RETWITHLOG(stream_->Read(buf, 9));
+ buf[9] = '\0';
+
+ if (memcmp(buf, "REDIS", 5) != 0) {
+ LOG(WARNING) << "Wrong signature trying to load DB from file";
+ return {Status::NotOK};
+ }
+
+ auto rdb_ver = std::atoi(buf + 5);
+ if (rdb_ver < 1 || rdb_ver > RDBVersion) {
+ LOG(WARNING) << "Can't handle RDB format version " << rdb_ver;
+ return {Status::NotOK};
+ }
+
+ uint64_t expire_time = 0;
+ int64_t expire_keys = 0;
+ int64_t load_keys = 0;
+ int64_t empty_keys_skipped = 0;
+ auto now = util::GetTimeStampMS();
+ uint32_t db_id = 0;
+ uint64_t skip_exist_keys = 0;
+ while (true) {
+ auto type = GET_OR_RETWITHLOG(loadRdbType());
+ if (type == RDBOpcodeExpireTime) {
+ expire_time = static_cast<uint64_t>(GET_OR_RETWITHLOG(loadTime()));
+ expire_time *= 1000;
+ continue;
+ } else if (type == RDBOpcodeExpireTimeMs) {
+ expire_time = GET_OR_RETWITHLOG(loadMillisecondTime(rdb_ver));
+ continue;
+ } else if (type == RDBOpcodeFreq) { // LFU frequency: not use in
kvrocks
+ GET_OR_RETWITHLOG(stream_->ReadByte()); // discard the value
+ continue;
+ } else if (type == RDBOpcodeIdle) { // LRU idle time: not use in kvrocks
+ uint64_t discard = 0;
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&discard), 8));
+ continue;
+ } else if (type == RDBOpcodeEof) {
+ break;
+ } else if (type == RDBOpcodeSelectDB) {
+ db_id = GET_OR_RETWITHLOG(loadObjectLen(nullptr));
+ continue;
+ } else if (type == RDBOpcodeResizeDB) { // not use in kvrocks, hint
redis for hash table resize
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // db_size
+ GET_OR_RETWITHLOG(loadObjectLen(nullptr)); // expires_size
+ continue;
+ } else if (type == RDBOpcodeAux) {
+ /* AUX: generic string-string fields. Use to add state to RDB
+ * which is backward compatible. Implementations of RDB loading
+ * are required to skip AUX fields they don't understand.
+ *
+ * An AUX field is composed of two strings: key and value. */
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(LoadStringObject());
+ continue;
+ } else if (type == RDBOpcodeModuleAux) {
+ LOG(WARNING) << "RDB module not supported";
+ return {Status::NotOK, "RDB module not supported"};
+ } else if (type == RDBOpcodeFunction || type == RDBOpcodeFunction2) {
+ LOG(WARNING) << "RDB function not supported";
+ return {Status::NotOK, "RDB function not supported"};
+ } else {
+ if (!isObjectType(type)) {
+ LOG(WARNING) << "Invalid or Not supported object type: " << type;
+ return {Status::NotOK, "Invalid or Not supported object type"};
+ }
+ }
+
+ auto key = GET_OR_RETWITHLOG(LoadStringObject());
+ auto value = GET_OR_RETWITHLOG(loadRdbObject(type, key));
+
+ if (db_index != db_id) { // skip db not match
+ continue;
+ }
+
+ if (isEmptyRedisObject(value)) { // compatible with empty value
+ /* Since we used to have bug that could lead to empty keys
+ * (See #8453), we rather not fail when empty key is encountered
+ * in an RDB file, instead we will silently discard it and
+ * continue loading. */
+ if (empty_keys_skipped++ < 10) {
+ LOG(WARNING) << "skipping empty key: " << key;
+ }
+ continue;
+ } else if (expire_time != 0 &&
+ expire_time < now) { // in redis this used to feed this
deletion to any connected replicas
+ expire_keys++;
+ continue;
+ }
+
+ if (is_nx) { // only load not exist key
+ auto s = exist(key);
+ if (!s.IsNotFound()) {
+ skip_exist_keys++; // skip it even it's not okay
+ if (!s.ok()) {
+ LOG(ERROR) << "check key " << key << " exist failed: " <<
s.ToString();
+ }
+ continue;
+ }
+ }
+
+ auto ret = saveRdbObject(type, key, value, expire_time);
+ if (!ret.IsOK()) {
+ LOG(WARNING) << "save rdb object key " << key << " failed: " <<
ret.Msg();
+ } else {
+ load_keys++;
+ }
+ }
+
+ // Verify the checksum if RDB version is >= 5
+ if (rdb_ver >= 5) {
+ uint64_t chk_sum = 0;
+ auto expected = GET_OR_RETWITHLOG(stream_->GetCheckSum());
+ GET_OR_RETWITHLOG(stream_->Read(reinterpret_cast<char *>(&chk_sum), 8));
+ if (chk_sum == 0) {
+ LOG(WARNING) << "RDB file was saved with checksum disabled: no check
performed.";
+ } else if (chk_sum != expected) {
+ LOG(WARNING) << "Wrong RDB checksum expected: " << chk_sum << " got: "
<< expected;
+ return {Status::NotOK, "Wrong RDB checksum"};
Review Comment:
Yes, if the checksum does not match, we will not perform a rollback. In
Redis, if a checksum error occurs, the server will exit during start-up.
However, because this code runs as a command that loads the RDB, exiting the
server is not appropriate. Instead, I will add an error message to be sent to
the client.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]