This is an automated email from the ASF dual-hosted git repository.

wangyuan pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 694d561  Use PutVarInt instead of PutFixed while encoding stream entry 
value (#791)
694d561 is described below

commit 694d5617bcb873db8d107decd7fe297e4588360b
Author: Yaroslav <[email protected]>
AuthorDate: Sat Aug 27 05:50:49 2022 +0300

    Use PutVarInt instead of PutFixed while encoding stream entry value (#791)
    
    Changes PutFixed32 to PutVarint32 and GetFixed32 to GetVarint32,
    which effectively reduces space usage for small entries.
    
    As with #745, this change makes it impossible to decode entries
    that were encoded with PutFixed32 (an error will be returned).
---
 src/encoding.cc                  | 57 ++++++++++++++++++++++++++++++++++++++++
 src/encoding.h                   |  5 ++++
 src/redis_stream.cc              | 42 ++++++++++++++++++++++-------
 src/redis_stream_base.cc         | 19 ++++++++------
 src/redis_stream_base.h          |  2 +-
 tests/cppunit/t_encoding_test.cc | 22 +++++++++++++++-
 tests/cppunit/t_stream_test.cc   |  9 +++++++
 7 files changed, 137 insertions(+), 19 deletions(-)

diff --git a/src/encoding.cc b/src/encoding.cc
index 1ad3845..01678ac 100644
--- a/src/encoding.cc
+++ b/src/encoding.cc
@@ -271,3 +271,60 @@ double DecodeDouble(const char *ptr) {
   memcpy(&value, &decoded, sizeof(value));
   return value;
 }
+
+char* EncodeVarint32(char *dst, uint32_t v) {
+  // Emit v as a base-128 varint, low 7 bits first; returns one past the last byte written.
+  auto *out = reinterpret_cast<unsigned char*>(dst);
+  while (v >= 0x80) {
+    *out++ = static_cast<unsigned char>(v | 0x80);  // 7 payload bits + continuation flag
+    v >>= 7;
+  }
+  *out++ = static_cast<unsigned char>(v);  // final byte, continuation flag clear
+  return reinterpret_cast<char*>(out);
+}
+
+void PutVarint32(std::string *dst, uint32_t v) {
+  char scratch[5];  // 32 bits at 7 bits per byte never need more than 5 bytes
+  const char *end = EncodeVarint32(scratch, v);
+  dst->append(scratch, static_cast<size_t>(end - scratch));
+}
+
+const char* GetVarint32PtrFallback(const char *p, const char *limit, uint32_t *value) {
+  // Decode up to five varint bytes from [p, limit); nullptr means truncated or over-long input.
+  uint32_t acc = 0;
+  uint32_t shift = 0;
+  while (shift <= 28 && p < limit) {
+    const uint32_t byte = static_cast<unsigned char>(*p++);
+    if ((byte & 0x80) == 0) {
+      // Terminating byte: fold it in and hand back the position after it.
+      *value = acc | (byte << shift);
+      return p;
+    }
+    acc |= (byte & 0x7F) << shift;  // continuation byte contributes its low 7 bits
+    shift += 7;
+  }
+  return nullptr;  // *value is left untouched on failure
+}
+
+const char* GetVarint32Ptr(const char *p, const char *limit, uint32_t *value) {
+  if (p < limit) {
+    const auto lead = static_cast<unsigned char>(*p);
+    if (lead < 0x80) {  // fast path: a single byte with no continuation flag
+      *value = lead;
+      return p + 1;
+    }
+  }
+  return GetVarint32PtrFallback(p, limit, value);  // multi-byte value, or empty input
+}
+
+bool GetVarint32(rocksdb::Slice *input, uint32_t *value) {
+  // Parse one varint from the front of *input; on success the slice is advanced past it.
+  const char* const begin = input->data();
+  const char* const end = begin + input->size();
+  const char* const next = GetVarint32Ptr(begin, end, value);
+  if (next == nullptr) {
+    return false;  // *input is left unchanged
+  }
+  *input = rocksdb::Slice(next, static_cast<size_t>(end - next));
+  return true;
+}
diff --git a/src/encoding.h b/src/encoding.h
index 97287cd..af9ce75 100644
--- a/src/encoding.h
+++ b/src/encoding.h
@@ -43,3 +43,8 @@ uint16_t DecodeFixed16(const char *ptr);
 uint32_t DecodeFixed32(const char *ptr);
 uint64_t DecodeFixed64(const char *ptr);
 double DecodeDouble(const char *ptr);
+char *EncodeVarint32(char *dst, uint32_t v);  // writes v as a varint at dst; returns the end of the output
+void PutVarint32(std::string *dst, uint32_t v);  // appends the varint encoding of v to *dst
+const char* GetVarint32PtrFallback(const char *p, const char *limit, uint32_t 
*value);  // general decoder; nullptr unless [p, limit) starts with a complete varint of at most 5 bytes
+const char* GetVarint32Ptr(const char *p, const char *limit, uint32_t *value);  // one-byte fast path, then the fallback
+bool GetVarint32(rocksdb::Slice *input, uint32_t *value);  // decodes from the front of *input, advancing it on success
diff --git a/src/redis_stream.cc b/src/redis_stream.cc
index 3d5d65e..72c4ea9 100644
--- a/src/redis_stream.cc
+++ b/src/redis_stream.cc
@@ -21,6 +21,8 @@
 #include "redis_stream.h"
 
 #include <memory>
+#include <utility>
+#include <vector>
 
 #include <glog/logging.h>
 #include <rocksdb/status.h>
@@ -322,13 +324,19 @@ rocksdb::Status Stream::range(const std::string &ns_key, 
const StreamMetadata &m
       return rocksdb::Status::OK();
     }
 
-    std::string value;
-    auto s = db_->Get(rocksdb::ReadOptions(), stream_cf_handle_, start_key, 
&value);
+    std::string entry_value;
+    auto s = db_->Get(rocksdb::ReadOptions(), stream_cf_handle_, start_key, 
&entry_value);
     if (!s.ok()) {
       return s.IsNotFound() ? rocksdb::Status::OK() : s;
     }
 
-    entries->emplace_back(options.start.ToString(), 
DecodeRawStreamEntryValue(value));
+    std::vector<std::string> values;
+    auto rv = DecodeRawStreamEntryValue(entry_value, &values);
+    if (!rv.IsOK()) {
+      return rocksdb::Status::InvalidArgument(rv.Msg());
+    }
+
+    entries->emplace_back(options.start.ToString(), std::move(values));
     return rocksdb::Status::OK();
   }
 
@@ -368,8 +376,13 @@ rocksdb::Status Stream::range(const std::string &ns_key, 
const StreamMetadata &m
       break;
     }
 
-    entries->emplace_back(entryIDFromInternalKey(iter->key()).ToString(),
-                          DecodeRawStreamEntryValue(iter->value().ToString()));
+    std::vector<std::string> values;
+    auto rv = DecodeRawStreamEntryValue(iter->value().ToString(), &values);
+    if (!rv.IsOK()) {
+      return rocksdb::Status::InvalidArgument(rv.Msg());
+    }
+
+    entries->emplace_back(entryIDFromInternalKey(iter->key()).ToString(), 
std::move(values));
 
     if (options.with_count && entries->size() == options.count) {
       break;
@@ -429,16 +442,27 @@ rocksdb::Status Stream::GetStreamInfo(const 
rocksdb::Slice &stream_name, bool fu
     if (!s.ok()) {
       return s;
     }
-    info->first_entry = Util::MakeUnique<StreamEntry>(
-          metadata.first_entry_id.ToString(), 
DecodeRawStreamEntryValue(first_value));
+
+    std::vector<std::string> values;
+    auto rv = DecodeRawStreamEntryValue(first_value, &values);
+    if (!rv.IsOK()) {
+      return rocksdb::Status::InvalidArgument(rv.Msg());
+    }
+
+    info->first_entry = 
Util::MakeUnique<StreamEntry>(metadata.first_entry_id.ToString(), 
std::move(values));
 
     std::string last_value;
     s = getEntryRawValue(ns_key, metadata, metadata.last_entry_id, 
&last_value);
     if (!s.ok()) {
       return s;
     }
-    info->last_entry = Util::MakeUnique<StreamEntry>(
-          metadata.last_entry_id.ToString(), 
DecodeRawStreamEntryValue(last_value));
+
+    rv = DecodeRawStreamEntryValue(last_value, &values);
+    if (!rv.IsOK()) {
+      return rocksdb::Status::InvalidArgument(rv.Msg());
+    }
+
+    info->last_entry = 
Util::MakeUnique<StreamEntry>(metadata.last_entry_id.ToString(), 
std::move(values));
   }
 
   return rocksdb::Status::OK();
diff --git a/src/redis_stream_base.cc b/src/redis_stream_base.cc
index 4af68fe..78b356c 100644
--- a/src/redis_stream_base.cc
+++ b/src/redis_stream_base.cc
@@ -26,6 +26,7 @@ namespace Redis {
 
 const char* kErrLastEntryIdReached = "last possible entry id reached";
 const char* kErrInvalidEntryIdSpecified = "Invalid stream ID specified as 
stream command argument";
+const char* kErrDecodingStreamEntryValueFailure = "failed to decode stream 
entry value";
 
 rocksdb::Status IncrementStreamEntryID(StreamEntryID *id) {
   if (id->seq == UINT64_MAX) {
@@ -99,7 +100,7 @@ Status ParseNewStreamEntryID(const std::string &input, 
NewStreamEntryID *id) {
     return Status(Status::RedisParseErr, kErrInvalidEntryIdSpecified);
   }
 
-  return Status();
+  return Status::OK();
 }
 
 Status ParseRangeStart(const std::string &input, StreamEntryID *id) {
@@ -123,30 +124,32 @@ Status ParseRangeEnd(const std::string &input, 
StreamEntryID *id) {
     return Status(Status::RedisParseErr, kErrInvalidEntryIdSpecified);
   }
 
-  return Status();
+  return Status::OK();
 }
 
 std::string EncodeStreamEntryValue(const std::vector<std::string> &args) {
   std::string dst;
   for (auto const &v : args) {
-    PutFixed32(&dst, v.size());
+    PutVarint32(&dst, v.size());  // varint length prefix: one byte for fields shorter than 128 bytes
     dst.append(v);
   }
   return dst;
 }
 
-std::vector<std::string> DecodeRawStreamEntryValue(const std::string &value) {
-  std::vector<std::string> result;
+Status DecodeRawStreamEntryValue(const std::string &value, std::vector<std::string> *result) {
+  result->clear();  // *result receives the length-prefixed fields parsed out of value
   rocksdb::Slice s(value);
 
   while (!s.empty()) {
     uint32_t len;
-    GetFixed32(&s, &len);
-    result.emplace_back(s.data(), len);
+    if (!GetVarint32(&s, &len) || len > s.size()) {  // bad prefix, or the length overruns the remaining bytes
+      return Status(Status::RedisParseErr, kErrDecodingStreamEntryValueFailure);
+    }
+    result->emplace_back(s.data(), len);  // safe: len was checked against s.size() above
     s.remove_prefix(len);
   }
 
-  return result;
+  return Status::OK();  // reached only when every field was decoded in full
 }
 
 }  // namespace Redis
diff --git a/src/redis_stream_base.h b/src/redis_stream_base.h
index 76208ca..39c3d09 100644
--- a/src/redis_stream_base.h
+++ b/src/redis_stream_base.h
@@ -145,7 +145,7 @@ Status ParseNewStreamEntryID(const std::string &input, 
NewStreamEntryID *id);
 Status ParseRangeStart(const std::string &input, StreamEntryID *id);
 Status ParseRangeEnd(const std::string &input, StreamEntryID *id);
 std::string EncodeStreamEntryValue(const std::vector<std::string> &args);
-std::vector<std::string> DecodeRawStreamEntryValue(const std::string &value);
+Status DecodeRawStreamEntryValue(const std::string &value, 
std::vector<std::string> *result);
 
 
 }  // namespace Redis
diff --git a/tests/cppunit/t_encoding_test.cc b/tests/cppunit/t_encoding_test.cc
index 79624ff..3deaa21 100644
--- a/tests/cppunit/t_encoding_test.cc
+++ b/tests/cppunit/t_encoding_test.cc
@@ -21,7 +21,13 @@
 #include <gtest/gtest.h>
 #include "encoding.h"
 
+#include <cstdint>
 #include <limits>
+#include <string>
+#include <vector>
+
+#include <rocksdb/slice.h>
+
 TEST(Util, EncodeAndDecodeDouble) {
   std::vector<double> values = {-1234, -100.1234, -1.2345, 0, 1.2345, 
100.1234, 1234};
   std::string prev_bytes;
@@ -35,4 +41,18 @@ TEST(Util, EncodeAndDecodeDouble) {
     prev_bytes.assign(bytes);
     ASSERT_EQ(value, got);
   }
-}
\ No newline at end of file
+}
+
+TEST(Util, EncodeAndDecodeInt32AsVarint32) {
+  std::vector<uint32_t> values = {200, 65000, 16700000, 4294000000};
+  std::vector<size_t> encoded_sizes = {2, 3, 4, 5};  // expected varint length for each value above
+  for (size_t i = 0; i < values.size(); ++i) {
+    std::string buf;
+    PutVarint32(&buf, values[i]);
+    EXPECT_EQ(buf.size(), encoded_sizes[i]);
+    uint32_t result = 0;
+    rocksdb::Slice s(buf);
+    ASSERT_TRUE(GetVarint32(&s, &result));  // a failed decode must fail here, not be compared below
+    ASSERT_EQ(result, values[i]);
+  }
+}
diff --git a/tests/cppunit/t_stream_test.cc b/tests/cppunit/t_stream_test.cc
index 927e5b1..63ab312 100644
--- a/tests/cppunit/t_stream_test.cc
+++ b/tests/cppunit/t_stream_test.cc
@@ -55,6 +55,15 @@ protected:
   Redis::Stream *stream;
 };
 
+TEST_F(RedisStreamTest, EncodeDecodeEntryValue) {  // round trip through the encode/decode pair
+  std::vector<std::string> values = {"day", "first", "month", "eleventh", 
"epoch", "fairly-very-old-one"};
+  auto encoded = Redis::EncodeStreamEntryValue(values);
+  std::vector<std::string> decoded;
+  auto s = Redis::DecodeRawStreamEntryValue(encoded, &decoded);
+  EXPECT_TRUE(s.IsOK());
+  checkStreamEntryValues(decoded, values);  // presumably asserts decoded == values; helper is defined outside this hunk
+}
+
 TEST_F(RedisStreamTest, AddEntryToNonExistingStreamWithNomkstreamOption) {
   Redis::StreamAddOptions options;
   options.nomkstream = true;

Reply via email to