git-hulk commented on code in PR #2227:
URL: https://github.com/apache/kvrocks/pull/2227#discussion_r1554948251
##########
src/storage/rdb.cc:
##########
@@ -680,3 +679,323 @@ Status RDB::LoadRdb(uint32_t db_index, bool overwrite_exist_key) {
return Status::OK();
}
+
+// Serializes the value stored under `key` into stream_ in an RDB-like format:
+// an object type byte, the serialized object, then a footer holding the RDB
+// version and a CRC64. Any failing step is reported as RedisExecErr carrying
+// the underlying message.
+Status RDB::Dump(const std::string &key, const RedisType type) {
+  /* The payload consists of an object type byte followed by the serialized
+   * object. This is understood by RESTORE. */
+  auto type_status = SaveObjectType(type);
+  if (!type_status.IsOK()) return {Status::RedisExecErr, type_status.Msg()};
+
+  auto object_status = SaveObject(key, type);
+  if (!object_status.IsOK()) return {Status::RedisExecErr, object_status.Msg()};
+
+  /* The payload is followed by a footer, which looks like this:
+   * ----------------+---------------------+---------------+
+   * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+   * ----------------+---------------------+---------------+
+   * RDB version and CRC are both in little endian.
+   */
+
+  /* RDB version, low byte first */
+  const unsigned char version[2] = {static_cast<unsigned char>(MaxRDBVersion & 0xff),
+                                    static_cast<unsigned char>((MaxRDBVersion >> 8) & 0xff)};
+  auto version_status = stream_->Write(reinterpret_cast<const char *>(version), 2);
+  if (!version_status.IsOK()) {
+    return {Status::RedisExecErr, version_status.Msg()};
+  }
+
+  /* CRC64 over the buffer returned by GetInput() (presumably everything written so far) */
+  // NOTE(review): the downcast is unchecked, so this presumably relies on stream_ always
+  // being an RdbStringStream when Dump is called — confirm against the callers.
+  std::string &written = static_cast<RdbStringStream *>(stream_.get())->GetInput();
+  uint64_t checksum = crc64(0, (unsigned char *)(written.c_str()), written.length());
+  memrev64ifbe(&checksum);
+  auto crc_status = stream_->Write(reinterpret_cast<const char *>(&checksum), 8);
+  if (!crc_status.IsOK()) {
+    return {Status::RedisExecErr, crc_status.Msg()};
+  }
+
+  return Status::OK();
+}
+
+// Writes the one-byte RDB object type tag that corresponds to `type`.
+// Returns NotOK (after logging a warning) when the type has no mapping here,
+// otherwise the status of the stream write.
+Status RDB::SaveObjectType(const RedisType type) {
+  int robj_type = -1;
+  if (type == kRedisString) {
+    robj_type = RDBTypeString;
+  } else if (type == kRedisHash) {
+    robj_type = RDBTypeHash;
+  } else if (type == kRedisList) {
+    robj_type = RDBTypeListQuickList2;
+  } else if (type == kRedisSet) {
+    robj_type = RDBTypeSet;
+  } else if (type == kRedisZSet) {
+    robj_type = RDBTypeZSet2;
+  } else {
+    LOG(WARNING) << "Invalid or Not supported object type: " << type;
+    return {Status::NotOK, "Invalid or Not supported object type"};
+  }
+
+  // Write the tag from a real one-byte object. Passing the address of the int
+  // and a length of 1 emits its first byte in memory, which is the tag only on
+  // little-endian hosts; on a big-endian host it would be 0.
+  const char type_byte = static_cast<char>(robj_type);
+  return stream_->Write(&type_byte, 1);
+}
+
+// Reads the value stored under `key` through the storage wrapper matching
+// `type` and passes it to the corresponding Save*Object serializer.
+// Lookup failures are reported as RedisExecErr, except that a NotFound result
+// is tolerated for strings and lists (the serializer then receives an empty
+// value). Any other type is logged and rejected with NotOK.
+Status RDB::SaveObject(const std::string &key, const RedisType type) {
+  switch (type) {
+    case kRedisString: {
+      std::string value;
+      redis::String string_db(storage_, ns_);
+      auto db_status = string_db.Get(key, &value);
+      if (!db_status.ok() && !db_status.IsNotFound()) {
+        return {Status::RedisExecErr, db_status.ToString()};
+      }
+      return SaveStringObject(value);
+    }
+    case kRedisList: {
+      std::vector<std::string> elems;
+      redis::List list_db(storage_, ns_);
+      // Range 0..-1 requests the whole list.
+      auto db_status = list_db.Range(key, 0, -1, &elems);
+      if (!db_status.ok() && !db_status.IsNotFound()) {
+        return {Status::RedisExecErr, db_status.ToString()};
+      }
+      return SaveListObject(elems);
+    }
+    case kRedisSet: {
+      redis::Set set_db(storage_, ns_);
+      std::vector<std::string> members;
+      auto db_status = set_db.Members(key, &members);
+      if (!db_status.ok()) {
+        return {Status::RedisExecErr, db_status.ToString()};
+      }
+      return SaveSetObject(members);
+    }
+    case kRedisZSet: {
+      redis::ZSet zset_db(storage_, ns_);
+      std::vector<MemberScore> member_scores;
+      RangeScoreSpec spec;
+      auto db_status = zset_db.RangeByScore(key, spec, &member_scores, nullptr);
+      if (!db_status.ok()) {
+        return {Status::RedisExecErr, db_status.ToString()};
+      }
+      // Members are serialized in descending score order.
+      std::sort(member_scores.begin(), member_scores.end(),
+                [](const MemberScore &lhs, const MemberScore &rhs) { return lhs.score > rhs.score; });
+      return SaveZSetObject(member_scores);
+    }
+    case kRedisHash: {
+      redis::Hash hash_db(storage_, ns_);
+      std::vector<FieldValue> field_values;
+      auto db_status = hash_db.GetAll(key, &field_values);
+      if (!db_status.ok()) {
+        return {Status::RedisExecErr, db_status.ToString()};
+      }
+      return SaveHashObject(field_values);
+    }
+    default: {
+      LOG(WARNING) << "Invalid or Not supported object type: " << type;
+      return {Status::NotOK, "Invalid or Not supported object type"};
+    }
+  }
+}
+
+// Writes `len` using the smallest of four length encodings: 6 bit (1 byte),
+// 14 bit (2 bytes), a marker byte plus 32 bits, or a marker byte plus 64 bits.
+// The 32 and 64 bit values are converted with htonl / htonu64 before writing.
+// Any stream failure is reported as RedisExecErr.
+Status RDB::RdbSaveLen(uint64_t len) {
+  // Forwards raw bytes to the stream and maps a failure to RedisExecErr.
+  auto emit = [this](const void *data, size_t size) -> Status {
+    auto written = stream_->Write(static_cast<const char *>(data), size);
+    if (!written.IsOK()) {
+      return {Status::RedisExecErr, written.Msg()};
+    }
+    return Status::OK();
+  };
+
+  unsigned char header[2];
+
+  if (len < (1 << 6)) {
+    /* 6 bit len: encoding tag in the top two bits, value in the rest */
+    header[0] = (len & 0xFF) | (RDB6BitLen << 6);
+    return emit(header, 1);
+  }
+
+  if (len < (1 << 14)) {
+    /* 14 bit len: high bits share the first byte with the tag */
+    header[0] = ((len >> 8) & 0xFF) | (RDB14BitLen << 6);
+    header[1] = len & 0xFF;
+    return emit(header, 2);
+  }
+
+  if (len <= UINT32_MAX) {
+    /* 32 bit len: marker byte, then the converted value */
+    header[0] = RDB32BitLen;
+    auto marker_status = emit(header, 1);
+    if (!marker_status.IsOK()) return marker_status;
+
+    uint32_t len32 = htonl(len);
+    return emit(&len32, 4);
+  }
+
+  /* 64 bit len: marker byte, then the converted value */
+  header[0] = RDB64BitLen;
+  auto marker_status = emit(header, 1);
+  if (!marker_status.IsOK()) return marker_status;
+
+  len = htonu64(len);
+  return emit(&len, 8);
+}
+
+// Writes `value` as an RDB string object.
+// A short string that is the canonical decimal form of an integer accepted by
+// rdbEncodeInteger is written in the compact integer encoding; everything else
+// is written verbatim as a length prefix followed by the raw bytes.
+// Any stream failure is reported as RedisExecErr.
+Status RDB::SaveStringObject(const std::string &value) {
+  // When the length is at most 11, value may be an integer,
+  // so special encoding is attempted.
+  if (value.length() <= 11) {
+    std::istringstream iss(value);
+    long long integer_value = 0;
+    iss >> integer_value;
+    // The stream state is only meaningful after the extraction. Besides
+    // requiring that the whole input was consumed, require that formatting the
+    // number reproduces the input byte for byte: strings such as "007", "+5"
+    // or " 5" parse as integers but must be stored verbatim, otherwise the
+    // restored value would differ from the original one.
+    if (!iss.fail() && iss.eof() && std::to_string(integer_value) == value) {
+      unsigned char buf[5];
+      const int enclen = rdbEncodeInteger(integer_value, buf);
+      // enclen == 0 means the value is outside the encodable range.
+      if (enclen > 0) {
+        auto status = stream_->Write(reinterpret_cast<const char *>(buf), enclen);
+        if (!status.IsOK()) {
+          return {Status::RedisExecErr, status.Msg()};
+        }
+        return Status::OK();
+      }
+    }
+  }
+
+  // Since we do not support rdb compression,
+  // the lzf encoding method has not been implemented yet.
+
+  /* Store verbatim */
+  auto status = RdbSaveLen(value.length());
+  if (!status.IsOK()) {
+    return {Status::RedisExecErr, status.Msg()};
+  }
+  if (value.length() > 0) {
+    status = stream_->Write(value.c_str(), value.length());
+    if (!status.IsOK()) {
+      return {Status::RedisExecErr, status.Msg()};
+    }
+  }
+  return Status::OK();
+}
+
+// Writes a list as: element count, then for every element a container marker
+// of 1 followed by the element as a string object.
+// An empty list is logged and rejected with NotOK; write failures are
+// reported as RedisExecErr.
+Status RDB::SaveListObject(const std::vector<std::string> &elems) {
+  if (elems.size() == 0) {
+    LOG(WARNING) << "the size of elems is zero";
+    return {Status::NotOK, "the size of elems is zero"};
+  }
+
+  auto count_status = RdbSaveLen(elems.size());
+  if (!count_status.IsOK()) {
+    return {Status::RedisExecErr, count_status.Msg()};
+  }
+
+  for (const auto &entry : elems) {
+    auto container_status = RdbSaveLen(1 /*plain container mode */);
+    if (!container_status.IsOK()) {
+      return {Status::RedisExecErr, container_status.Msg()};
+    }
+
+    auto entry_status = SaveStringObject(entry);
+    if (!entry_status.IsOK()) {
+      return {Status::RedisExecErr, entry_status.Msg()};
+    }
+  }
+  return Status::OK();
+}
+
+// Writes a set as: member count, then every member as a string object.
+// An empty set is logged and rejected with NotOK; write failures are
+// reported as RedisExecErr.
+Status RDB::SaveSetObject(const std::vector<std::string> &members) {
+  if (members.size() == 0) {
+    LOG(WARNING) << "the size of elems is zero";
+    return {Status::NotOK, "the size of elems is zero"};
+  }
+
+  auto count_status = RdbSaveLen(members.size());
+  if (!count_status.IsOK()) {
+    return {Status::RedisExecErr, count_status.Msg()};
+  }
+
+  for (const auto &member : members) {
+    auto member_status = SaveStringObject(member);
+    if (!member_status.IsOK()) {
+      return {Status::RedisExecErr, member_status.Msg()};
+    }
+  }
+  return Status::OK();
+}
+
+// Writes a sorted set as: entry count, then for every entry the member as a
+// string object followed by its score as a binary double.
+// An empty input is logged and rejected with NotOK; write failures are
+// reported as RedisExecErr.
+Status RDB::SaveZSetObject(const std::vector<MemberScore> &member_scores) {
+  if (member_scores.size() == 0) {
+    LOG(WARNING) << "the size of member_scores is zero";
+    return {Status::NotOK, "the size of ZSet is 0"};
+  }
+
+  auto count_status = RdbSaveLen(member_scores.size());
+  if (!count_status.IsOK()) {
+    return {Status::RedisExecErr, count_status.Msg()};
+  }
+
+  for (const auto &entry : member_scores) {
+    auto member_status = SaveStringObject(entry.member);
+    if (!member_status.IsOK()) {
+      return {Status::RedisExecErr, member_status.Msg()};
+    }
+
+    auto score_status = rdbSaveBinaryDoubleValue(entry.score);
+    if (!score_status.IsOK()) {
+      return {Status::RedisExecErr, score_status.Msg()};
+    }
+  }
+  return Status::OK();
+}
+
+// Writes a hash as: pair count, then for every pair the field and the value,
+// each as a string object.
+// An empty input is logged and rejected with NotOK; write failures are
+// reported as RedisExecErr.
+Status RDB::SaveHashObject(const std::vector<FieldValue> &field_values) {
+  if (field_values.size() == 0) {
+    LOG(WARNING) << "the size of field_values is zero";
+    return {Status::NotOK, "the size of Hash is 0"};
+  }
+
+  auto count_status = RdbSaveLen(field_values.size());
+  if (!count_status.IsOK()) {
+    return {Status::RedisExecErr, count_status.Msg()};
+  }
+
+  for (const auto &pair : field_values) {
+    auto field_status = SaveStringObject(pair.field);
+    if (!field_status.IsOK()) {
+      return {Status::RedisExecErr, field_status.Msg()};
+    }
+
+    auto value_status = SaveStringObject(pair.value);
+    if (!value_status.IsOK()) {
+      return {Status::RedisExecErr, value_status.Msg()};
+    }
+  }
+  return Status::OK();
+}
+
+// Encodes `value` into `enc` using the smallest of the 8, 16 or 32 bit integer
+// encodings: one header byte followed by the value's bytes, least significant
+// byte first. `enc` must have room for 5 bytes.
+// Returns the number of bytes written (2, 3 or 5), or 0 when the value does
+// not fit in a signed 32 bit integer (nothing is written in that case).
+int RDB::rdbEncodeInteger(const long long value, unsigned char *enc) {
+  const bool fits_int8 = value >= -(1 << 7) && value <= (1 << 7) - 1;
+  const bool fits_int16 = value >= -(1 << 15) && value <= (1 << 15) - 1;
+  const bool fits_int32 = value >= -((long long)1 << 31) && value <= ((long long)1 << 31) - 1;
+
+  // The 8 and 16 bit ranges are contained in the 32 bit range, so failing the
+  // widest check means no encoding applies.
+  if (!fits_int32) {
+    return 0;
+  }
+
+  int payload_bytes = 0;
+  if (fits_int8) {
+    enc[0] = (RDBEncVal << 6) | RDBEncInt8;
+    payload_bytes = 1;
+  } else if (fits_int16) {
+    enc[0] = (RDBEncVal << 6) | RDBEncInt16;
+    payload_bytes = 2;
+  } else {
+    enc[0] = (RDBEncVal << 6) | RDBEncInt32;
+    payload_bytes = 4;
+  }
+
+  for (int i = 0; i < payload_bytes; i++) {
+    enc[1 + i] = (value >> (8 * i)) & 0xFF;
+  }
+  return 1 + payload_bytes;
+}
+
+// Writes the in-memory bytes of `val` to the stream after passing it through
+// memrev64ifbe (presumably a byte swap that only happens on big-endian hosts —
+// confirm against its definition). Returns the status of the stream write.
+Status RDB::rdbSaveBinaryDoubleValue(double val) {
+  memrev64ifbe(&val);
+  const char *raw_bytes = reinterpret_cast<const char *>(&val);
+  return stream_->Write(raw_bytes, sizeof(double));
+}
Review Comment:
```suggestion
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]