This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new a618574f RDB: style enhancement for rdb load (#1839)
a618574f is described below
commit a618574f5ff2eb03255da66e4c9910969b4e0cce
Author: mwish <[email protected]>
AuthorDate: Fri Oct 20 17:59:56 2023 +0800
RDB: style enhancement for rdb load (#1839)
---
src/common/rdb_stream.cc | 22 +++++++++++++---------
src/common/rdb_stream.h | 6 +++---
src/storage/rdb.cc | 24 ++++++++++++++----------
tests/cppunit/rdb_stream_test.cc | 11 +++++------
4 files changed, 35 insertions(+), 28 deletions(-)
diff --git a/src/common/rdb_stream.cc b/src/common/rdb_stream.cc
index 83855ea6..15896eeb 100644
--- a/src/common/rdb_stream.cc
+++ b/src/common/rdb_stream.cc
@@ -23,13 +23,13 @@
#include "fmt/format.h"
#include "vendor/crc64.h"
-StatusOr<size_t> RdbStringStream::Read(char *buf, size_t n) {
+Status RdbStringStream::Read(char *buf, size_t n) {
if (pos_ + n > input_.size()) {
return {Status::NotOK, "unexpected EOF"};
}
memcpy(buf, input_.data() + pos_, n);
pos_ += n;
- return n;
+ return Status::OK();
}
StatusOr<uint64_t> RdbStringStream::GetCheckSum() const {
@@ -50,20 +50,24 @@ Status RdbFileStream::Open() {
return Status::OK();
}
-StatusOr<size_t> RdbFileStream::Read(char *buf, size_t len) {
- size_t n = 0;
+Status RdbFileStream::Read(char *buf, size_t len) {
while (len) {
size_t read_bytes = std::min(max_read_chunk_size_, len);
ifs_.read(buf, static_cast<std::streamsize>(read_bytes));
if (!ifs_.good()) {
- return Status(Status::NotOK, fmt::format("read failed: {}:",
strerror(errno)));
+ if (!ifs_.eof()) {
+ return {Status::NotOK, fmt::format("read failed: {}:",
strerror(errno))};
+ }
+ auto eof_read_bytes = static_cast<size_t>(ifs_.gcount());
+ if (read_bytes != eof_read_bytes) {
+ return {Status::NotOK, fmt::format("read failed: {}:",
strerror(errno))};
+ }
}
check_sum_ = crc64(check_sum_, reinterpret_cast<const unsigned char
*>(buf), read_bytes);
- buf = (char *)buf + read_bytes;
+ buf = buf + read_bytes;
+ DCHECK(len >= read_bytes);
len -= read_bytes;
total_read_bytes_ += read_bytes;
- n += read_bytes;
}
-
- return n;
+ return Status::OK();
}
diff --git a/src/common/rdb_stream.h b/src/common/rdb_stream.h
index 842d24a7..c67a1b46 100644
--- a/src/common/rdb_stream.h
+++ b/src/common/rdb_stream.h
@@ -32,7 +32,7 @@ class RdbStream {
RdbStream() = default;
virtual ~RdbStream() = default;
- virtual StatusOr<size_t> Read(char *buf, size_t len) = 0;
+ virtual Status Read(char *buf, size_t len) = 0;
virtual StatusOr<uint64_t> GetCheckSum() const = 0;
StatusOr<uint8_t> ReadByte() {
uint8_t value = 0;
@@ -51,7 +51,7 @@ class RdbStringStream : public RdbStream {
RdbStringStream &operator=(const RdbStringStream &) = delete;
~RdbStringStream() override = default;
- StatusOr<size_t> Read(char *buf, size_t len) override;
+ Status Read(char *buf, size_t len) override;
StatusOr<uint64_t> GetCheckSum() const override;
private:
@@ -68,7 +68,7 @@ class RdbFileStream : public RdbStream {
~RdbFileStream() override = default;
Status Open();
- StatusOr<size_t> Read(char *buf, size_t len) override;
+ Status Read(char *buf, size_t len) override;
StatusOr<uint64_t> GetCheckSum() const override {
uint64_t crc = check_sum_;
memrev64ifbe(&crc);
diff --git a/src/storage/rdb.cc b/src/storage/rdb.cc
index c73c172e..bd433a4d 100644
--- a/src/storage/rdb.cc
+++ b/src/storage/rdb.cc
@@ -173,9 +173,13 @@ StatusOr<std::string> RDB::loadEncodedString() {
}
// Normal string
- std::vector<char> vec(len);
- GET_OR_RET(stream_->Read(vec.data(), len));
- return std::string(vec.data(), len);
+ if (len == 0) {
+ return "";
+ }
+ std::string read_string;
+ read_string.resize(len);
+ GET_OR_RET(stream_->Read(read_string.data(), len));
+ return read_string;
}
StatusOr<std::vector<std::string>> RDB::LoadListWithQuickList(int type) {
@@ -196,7 +200,7 @@ StatusOr<std::vector<std::string>>
RDB::LoadListWithQuickList(int type) {
if (container == QuickListNodeContainerPlain) {
auto element = GET_OR_RET(loadEncodedString());
- list.push_back(element);
+ list.push_back(std::move(element));
continue;
}
@@ -204,11 +208,11 @@ StatusOr<std::vector<std::string>>
RDB::LoadListWithQuickList(int type) {
if (type == RDBTypeListQuickList2) {
ListPack lp(encoded_string);
auto elements = GET_OR_RET(lp.Entries());
- list.insert(list.end(), elements.begin(), elements.end());
+ list.insert(list.end(), std::make_move_iterator(elements.begin()),
std::make_move_iterator(elements.end()));
} else {
ZipList zip_list(encoded_string);
auto elements = GET_OR_RET(zip_list.Entries());
- list.insert(list.end(), elements.begin(), elements.end());
+ list.insert(list.end(), std::make_move_iterator(elements.begin()),
std::make_move_iterator(elements.end()));
}
}
return list;
@@ -222,7 +226,7 @@ StatusOr<std::vector<std::string>> RDB::LoadListObject() {
}
for (size_t i = 0; i < len; i++) {
auto element = GET_OR_RET(loadEncodedString());
- list.push_back(element);
+ list.push_back(std::move(element));
}
return list;
}
@@ -241,7 +245,7 @@ StatusOr<std::vector<std::string>> RDB::LoadSetObject() {
}
for (size_t i = 0; i < len; i++) {
auto element = GET_OR_RET(LoadStringObject());
- set.push_back(element);
+ set.push_back(std::move(element));
}
return set;
}
@@ -268,7 +272,7 @@ StatusOr<std::map<std::string, std::string>>
RDB::LoadHashObject() {
for (size_t i = 0; i < len; i++) {
auto field = GET_OR_RET(LoadStringObject());
auto value = GET_OR_RET(LoadStringObject());
- hash[field] = value;
+ hash[field] = std::move(value);
}
return hash;
}
@@ -471,7 +475,7 @@ Status RDB::saveRdbObject(int type, const std::string &key,
const RedisObjValue
const auto &member_scores = std::get<std::vector<MemberScore>>(obj);
redis::ZSet zset_db(storage_, ns_);
uint64_t count = 0;
- db_status = zset_db.Add(key, ZAddFlags(0), (redis::ZSet::MemberScores
*)&member_scores, &count);
+ db_status = zset_db.Add(key, ZAddFlags(0),
const_cast<std::vector<MemberScore> *>(&member_scores), &count);
} else if (type == RDBTypeHash || type == RDBTypeHashListPack || type ==
RDBTypeHashZipList ||
type == RDBTypeHashZipMap) {
const auto &entries = std::get<std::map<std::string, std::string>>(obj);
diff --git a/tests/cppunit/rdb_stream_test.cc b/tests/cppunit/rdb_stream_test.cc
index c82a04cf..267703b8 100644
--- a/tests/cppunit/rdb_stream_test.cc
+++ b/tests/cppunit/rdb_stream_test.cc
@@ -29,7 +29,6 @@
TEST(RdbFileStreamOpenTest, FileNotExist) {
RdbFileStream reader("not_exist.rdb");
ASSERT_FALSE(reader.Open().IsOK());
- ;
}
TEST(RdbFileStreamOpenTest, FileExist) {
@@ -51,18 +50,18 @@ TEST(RdbFileStreamReadTest, ReadRdb) {
ASSERT_TRUE(reader.Open().IsOK());
char buf[16] = {0};
- ASSERT_EQ(reader.Read(buf, 5).GetValue(), 5);
+ ASSERT_TRUE(reader.Read(buf, 5).IsOK());
ASSERT_EQ(strncmp(buf, "REDIS", 5), 0);
size -= 5;
auto len = static_cast<std::streamsize>(sizeof(buf) / sizeof(buf[0]));
while (size >= len) {
- ASSERT_EQ(reader.Read(buf, len).GetValue(), len);
+ ASSERT_TRUE(reader.Read(buf, len).IsOK());
size -= len;
}
if (size > 0) {
- ASSERT_EQ(reader.Read(buf, size).GetValue(), size);
+ ASSERT_TRUE(reader.Read(buf, size).IsOK());
}
}
@@ -80,11 +79,11 @@ TEST(RdbFileStreamReadTest, ReadRdbLittleChunk) {
auto len = static_cast<std::streamsize>(sizeof(buf) / sizeof(buf[0]));
while (size >= len) {
- ASSERT_EQ(reader.Read(buf, len).GetValue(), len);
+ ASSERT_TRUE(reader.Read(buf, len).IsOK());
size -= len;
}
if (size > 0) {
- ASSERT_EQ(reader.Read(buf, size).GetValue(), size);
+ ASSERT_TRUE(reader.Read(buf, size).IsOK());
}
}