This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new a618574f RDB: style enhancement for rdb load (#1839)
a618574f is described below

commit a618574f5ff2eb03255da66e4c9910969b4e0cce
Author: mwish <[email protected]>
AuthorDate: Fri Oct 20 17:59:56 2023 +0800

    RDB: style enhancement for rdb load (#1839)
---
 src/common/rdb_stream.cc         | 22 +++++++++++++---------
 src/common/rdb_stream.h          |  6 +++---
 src/storage/rdb.cc               | 24 ++++++++++++++----------
 tests/cppunit/rdb_stream_test.cc | 11 +++++------
 4 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/src/common/rdb_stream.cc b/src/common/rdb_stream.cc
index 83855ea6..15896eeb 100644
--- a/src/common/rdb_stream.cc
+++ b/src/common/rdb_stream.cc
@@ -23,13 +23,13 @@
 #include "fmt/format.h"
 #include "vendor/crc64.h"
 
-StatusOr<size_t> RdbStringStream::Read(char *buf, size_t n) {
+Status RdbStringStream::Read(char *buf, size_t n) {
   if (pos_ + n > input_.size()) {
     return {Status::NotOK, "unexpected EOF"};
   }
   memcpy(buf, input_.data() + pos_, n);
   pos_ += n;
-  return n;
+  return Status::OK();
 }
 
 StatusOr<uint64_t> RdbStringStream::GetCheckSum() const {
@@ -50,20 +50,24 @@ Status RdbFileStream::Open() {
   return Status::OK();
 }
 
-StatusOr<size_t> RdbFileStream::Read(char *buf, size_t len) {
-  size_t n = 0;
+Status RdbFileStream::Read(char *buf, size_t len) {
   while (len) {
     size_t read_bytes = std::min(max_read_chunk_size_, len);
     ifs_.read(buf, static_cast<std::streamsize>(read_bytes));
     if (!ifs_.good()) {
-      return Status(Status::NotOK, fmt::format("read failed: {}:", strerror(errno)));
+      if (!ifs_.eof()) {
+        return {Status::NotOK, fmt::format("read failed: {}:", strerror(errno))};
+      }
+      auto eof_read_bytes = static_cast<size_t>(ifs_.gcount());
+      if (read_bytes != eof_read_bytes) {
+        return {Status::NotOK, fmt::format("read failed: {}:", strerror(errno))};
+      }
     }
     check_sum_ = crc64(check_sum_, reinterpret_cast<const unsigned char *>(buf), read_bytes);
-    buf = (char *)buf + read_bytes;
+    buf = buf + read_bytes;
+    DCHECK(len >= read_bytes);
     len -= read_bytes;
     total_read_bytes_ += read_bytes;
-    n += read_bytes;
   }
-
-  return n;
+  return Status::OK();
 }
diff --git a/src/common/rdb_stream.h b/src/common/rdb_stream.h
index 842d24a7..c67a1b46 100644
--- a/src/common/rdb_stream.h
+++ b/src/common/rdb_stream.h
@@ -32,7 +32,7 @@ class RdbStream {
   RdbStream() = default;
   virtual ~RdbStream() = default;
 
-  virtual StatusOr<size_t> Read(char *buf, size_t len) = 0;
+  virtual Status Read(char *buf, size_t len) = 0;
   virtual StatusOr<uint64_t> GetCheckSum() const = 0;
   StatusOr<uint8_t> ReadByte() {
     uint8_t value = 0;
@@ -51,7 +51,7 @@ class RdbStringStream : public RdbStream {
   RdbStringStream &operator=(const RdbStringStream &) = delete;
   ~RdbStringStream() override = default;
 
-  StatusOr<size_t> Read(char *buf, size_t len) override;
+  Status Read(char *buf, size_t len) override;
   StatusOr<uint64_t> GetCheckSum() const override;
 
  private:
@@ -68,7 +68,7 @@ class RdbFileStream : public RdbStream {
   ~RdbFileStream() override = default;
 
   Status Open();
-  StatusOr<size_t> Read(char *buf, size_t len) override;
+  Status Read(char *buf, size_t len) override;
   StatusOr<uint64_t> GetCheckSum() const override {
     uint64_t crc = check_sum_;
     memrev64ifbe(&crc);
diff --git a/src/storage/rdb.cc b/src/storage/rdb.cc
index c73c172e..bd433a4d 100644
--- a/src/storage/rdb.cc
+++ b/src/storage/rdb.cc
@@ -173,9 +173,13 @@ StatusOr<std::string> RDB::loadEncodedString() {
   }
 
   // Normal string
-  std::vector<char> vec(len);
-  GET_OR_RET(stream_->Read(vec.data(), len));
-  return std::string(vec.data(), len);
+  if (len == 0) {
+    return "";
+  }
+  std::string read_string;
+  read_string.resize(len);
+  GET_OR_RET(stream_->Read(read_string.data(), len));
+  return read_string;
 }
 
 StatusOr<std::vector<std::string>> RDB::LoadListWithQuickList(int type) {
@@ -196,7 +200,7 @@ StatusOr<std::vector<std::string>> RDB::LoadListWithQuickList(int type) {
 
     if (container == QuickListNodeContainerPlain) {
       auto element = GET_OR_RET(loadEncodedString());
-      list.push_back(element);
+      list.push_back(std::move(element));
       continue;
     }
 
@@ -204,11 +208,11 @@ StatusOr<std::vector<std::string>> RDB::LoadListWithQuickList(int type) {
     if (type == RDBTypeListQuickList2) {
       ListPack lp(encoded_string);
       auto elements = GET_OR_RET(lp.Entries());
-      list.insert(list.end(), elements.begin(), elements.end());
+      list.insert(list.end(), std::make_move_iterator(elements.begin()), std::make_move_iterator(elements.end()));
     } else {
       ZipList zip_list(encoded_string);
       auto elements = GET_OR_RET(zip_list.Entries());
-      list.insert(list.end(), elements.begin(), elements.end());
+      list.insert(list.end(), std::make_move_iterator(elements.begin()), std::make_move_iterator(elements.end()));
     }
   }
   return list;
@@ -222,7 +226,7 @@ StatusOr<std::vector<std::string>> RDB::LoadListObject() {
   }
   for (size_t i = 0; i < len; i++) {
     auto element = GET_OR_RET(loadEncodedString());
-    list.push_back(element);
+    list.push_back(std::move(element));
   }
   return list;
 }
@@ -241,7 +245,7 @@ StatusOr<std::vector<std::string>> RDB::LoadSetObject() {
   }
   for (size_t i = 0; i < len; i++) {
     auto element = GET_OR_RET(LoadStringObject());
-    set.push_back(element);
+    set.push_back(std::move(element));
   }
   return set;
 }
@@ -268,7 +272,7 @@ StatusOr<std::map<std::string, std::string>> RDB::LoadHashObject() {
   for (size_t i = 0; i < len; i++) {
     auto field = GET_OR_RET(LoadStringObject());
     auto value = GET_OR_RET(LoadStringObject());
-    hash[field] = value;
+    hash[field] = std::move(value);
   }
   return hash;
 }
@@ -471,7 +475,7 @@ Status RDB::saveRdbObject(int type, const std::string &key, const RedisObjValue
     const auto &member_scores = std::get<std::vector<MemberScore>>(obj);
     redis::ZSet zset_db(storage_, ns_);
     uint64_t count = 0;
-    db_status = zset_db.Add(key, ZAddFlags(0), (redis::ZSet::MemberScores *)&member_scores, &count);
+    db_status = zset_db.Add(key, ZAddFlags(0), const_cast<std::vector<MemberScore> *>(&member_scores), &count);
   } else if (type == RDBTypeHash || type == RDBTypeHashListPack || type == RDBTypeHashZipList ||
              type == RDBTypeHashZipMap) {
     const auto &entries = std::get<std::map<std::string, std::string>>(obj);
diff --git a/tests/cppunit/rdb_stream_test.cc b/tests/cppunit/rdb_stream_test.cc
index c82a04cf..267703b8 100644
--- a/tests/cppunit/rdb_stream_test.cc
+++ b/tests/cppunit/rdb_stream_test.cc
@@ -29,7 +29,6 @@
 TEST(RdbFileStreamOpenTest, FileNotExist) {
   RdbFileStream reader("not_exist.rdb");
   ASSERT_FALSE(reader.Open().IsOK());
-  ;
 }
 
 TEST(RdbFileStreamOpenTest, FileExist) {
@@ -51,18 +50,18 @@ TEST(RdbFileStreamReadTest, ReadRdb) {
   ASSERT_TRUE(reader.Open().IsOK());
 
   char buf[16] = {0};
-  ASSERT_EQ(reader.Read(buf, 5).GetValue(), 5);
+  ASSERT_TRUE(reader.Read(buf, 5).IsOK());
   ASSERT_EQ(strncmp(buf, "REDIS", 5), 0);
   size -= 5;
 
   auto len = static_cast<std::streamsize>(sizeof(buf) / sizeof(buf[0]));
   while (size >= len) {
-    ASSERT_EQ(reader.Read(buf, len).GetValue(), len);
+    ASSERT_TRUE(reader.Read(buf, len).IsOK());
     size -= len;
   }
 
   if (size > 0) {
-    ASSERT_EQ(reader.Read(buf, size).GetValue(), size);
+    ASSERT_TRUE(reader.Read(buf, size).IsOK());
   }
 }
 
@@ -80,11 +79,11 @@ TEST(RdbFileStreamReadTest, ReadRdbLittleChunk) {
   auto len = static_cast<std::streamsize>(sizeof(buf) / sizeof(buf[0]));
 
   while (size >= len) {
-    ASSERT_EQ(reader.Read(buf, len).GetValue(), len);
+    ASSERT_TRUE(reader.Read(buf, len).IsOK());
     size -= len;
   }
 
   if (size > 0) {
-    ASSERT_EQ(reader.Read(buf, size).GetValue(), size);
+    ASSERT_TRUE(reader.Read(buf, size).IsOK());
   }
 }

Reply via email to