This is an automated email from the ASF dual-hosted git repository.
wangyuan pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 694d561 Use PutVarInt instead of PutFixed while encoding stream entry
value (#791)
694d561 is described below
commit 694d5617bcb873db8d107decd7fe297e4588360b
Author: Yaroslav <[email protected]>
AuthorDate: Sat Aug 27 05:50:49 2022 +0300
Use PutVarInt instead of PutFixed while encoding stream entry value (#791)
Changes PutFixed32 to PutVarint32 and GetFixed32 to GetVarint32,
which effectively reduces space usage for small entries.
As with #745, this change makes it impossible to decode entries
that were encoded with PutFixed32 (an error will be returned).
---
src/encoding.cc | 57 ++++++++++++++++++++++++++++++++++++++++
src/encoding.h | 5 ++++
src/redis_stream.cc | 42 ++++++++++++++++++++++-------
src/redis_stream_base.cc | 19 ++++++++------
src/redis_stream_base.h | 2 +-
tests/cppunit/t_encoding_test.cc | 22 +++++++++++++++-
tests/cppunit/t_stream_test.cc | 9 +++++++
7 files changed, 137 insertions(+), 19 deletions(-)
diff --git a/src/encoding.cc b/src/encoding.cc
index 1ad3845..01678ac 100644
--- a/src/encoding.cc
+++ b/src/encoding.cc
@@ -271,3 +271,60 @@ double DecodeDouble(const char *ptr) {
memcpy(&value, &decoded, sizeof(value));
return value;
}
+
+char* EncodeVarint32(char *dst, uint32_t v) {
+ // Operate on characters as unsigneds
+ unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
+ do {
+ *ptr = 0x80 | v;
+ v >>= 7, ++ptr;
+ } while (v != 0);
+ *(ptr - 1) &= 0x7F;
+ return reinterpret_cast<char*>(ptr);
+}
+
+void PutVarint32(std::string *dst, uint32_t v) {
+ char buf[5];
+ char* ptr = EncodeVarint32(buf, v);
+ dst->append(buf, static_cast<size_t>(ptr - buf));
+}
+
+const char* GetVarint32PtrFallback(const char *p, const char *limit, uint32_t
*value) {
+ uint32_t result = 0;
+ for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
+ uint32_t byte = static_cast<unsigned char>(*p);
+ p++;
+ if (byte & 0x80) {
+ // More bytes are present
+ result |= ((byte & 0x7F) << shift);
+ } else {
+ result |= (byte << shift);
+ *value = result;
+ return p;
+ }
+ }
+ return nullptr;
+}
+
+const char* GetVarint32Ptr(const char *p, const char *limit, uint32_t *value) {
+ if (p < limit) {
+ uint32_t result = static_cast<unsigned char>(*p);
+ if ((result & 0x80) == 0) {
+ *value = result;
+ return p + 1;
+ }
+ }
+ return GetVarint32PtrFallback(p, limit, value);
+}
+
+bool GetVarint32(rocksdb::Slice *input, uint32_t *value) {
+ const char* p = input->data();
+ const char* limit = p + input->size();
+ const char* q = GetVarint32Ptr(p, limit, value);
+ if (q == nullptr) {
+ return false;
+ } else {
+ *input = rocksdb::Slice(q, static_cast<size_t>(limit - q));
+ return true;
+ }
+}
diff --git a/src/encoding.h b/src/encoding.h
index 97287cd..af9ce75 100644
--- a/src/encoding.h
+++ b/src/encoding.h
@@ -43,3 +43,8 @@ uint16_t DecodeFixed16(const char *ptr);
uint32_t DecodeFixed32(const char *ptr);
uint64_t DecodeFixed64(const char *ptr);
double DecodeDouble(const char *ptr);
+char *EncodeVarint32(char *dst, uint32_t v);
+void PutVarint32(std::string *dst, uint32_t v);
+const char* GetVarint32PtrFallback(const char *p, const char *limit, uint32_t
*value);
+const char* GetVarint32Ptr(const char *p, const char *limit, uint32_t *value);
+bool GetVarint32(rocksdb::Slice *input, uint32_t *value);
diff --git a/src/redis_stream.cc b/src/redis_stream.cc
index 3d5d65e..72c4ea9 100644
--- a/src/redis_stream.cc
+++ b/src/redis_stream.cc
@@ -21,6 +21,8 @@
#include "redis_stream.h"
#include <memory>
+#include <utility>
+#include <vector>
#include <glog/logging.h>
#include <rocksdb/status.h>
@@ -322,13 +324,19 @@ rocksdb::Status Stream::range(const std::string &ns_key,
const StreamMetadata &m
return rocksdb::Status::OK();
}
- std::string value;
- auto s = db_->Get(rocksdb::ReadOptions(), stream_cf_handle_, start_key,
&value);
+ std::string entry_value;
+ auto s = db_->Get(rocksdb::ReadOptions(), stream_cf_handle_, start_key,
&entry_value);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
- entries->emplace_back(options.start.ToString(),
DecodeRawStreamEntryValue(value));
+ std::vector<std::string> values;
+ auto rv = DecodeRawStreamEntryValue(entry_value, &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
+ }
+
+ entries->emplace_back(options.start.ToString(), std::move(values));
return rocksdb::Status::OK();
}
@@ -368,8 +376,13 @@ rocksdb::Status Stream::range(const std::string &ns_key,
const StreamMetadata &m
break;
}
- entries->emplace_back(entryIDFromInternalKey(iter->key()).ToString(),
- DecodeRawStreamEntryValue(iter->value().ToString()));
+ std::vector<std::string> values;
+ auto rv = DecodeRawStreamEntryValue(iter->value().ToString(), &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
+ }
+
+ entries->emplace_back(entryIDFromInternalKey(iter->key()).ToString(),
std::move(values));
if (options.with_count && entries->size() == options.count) {
break;
@@ -429,16 +442,27 @@ rocksdb::Status Stream::GetStreamInfo(const
rocksdb::Slice &stream_name, bool fu
if (!s.ok()) {
return s;
}
- info->first_entry = Util::MakeUnique<StreamEntry>(
- metadata.first_entry_id.ToString(),
DecodeRawStreamEntryValue(first_value));
+
+ std::vector<std::string> values;
+ auto rv = DecodeRawStreamEntryValue(first_value, &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
+ }
+
+ info->first_entry =
Util::MakeUnique<StreamEntry>(metadata.first_entry_id.ToString(),
std::move(values));
std::string last_value;
s = getEntryRawValue(ns_key, metadata, metadata.last_entry_id,
&last_value);
if (!s.ok()) {
return s;
}
- info->last_entry = Util::MakeUnique<StreamEntry>(
- metadata.last_entry_id.ToString(),
DecodeRawStreamEntryValue(last_value));
+
+ rv = DecodeRawStreamEntryValue(last_value, &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
+ }
+
+ info->last_entry =
Util::MakeUnique<StreamEntry>(metadata.last_entry_id.ToString(),
std::move(values));
}
return rocksdb::Status::OK();
diff --git a/src/redis_stream_base.cc b/src/redis_stream_base.cc
index 4af68fe..78b356c 100644
--- a/src/redis_stream_base.cc
+++ b/src/redis_stream_base.cc
@@ -26,6 +26,7 @@ namespace Redis {
const char* kErrLastEntryIdReached = "last possible entry id reached";
const char* kErrInvalidEntryIdSpecified = "Invalid stream ID specified as
stream command argument";
+const char* kErrDecodingStreamEntryValueFailure = "failed to decode stream
entry value";
rocksdb::Status IncrementStreamEntryID(StreamEntryID *id) {
if (id->seq == UINT64_MAX) {
@@ -99,7 +100,7 @@ Status ParseNewStreamEntryID(const std::string &input,
NewStreamEntryID *id) {
return Status(Status::RedisParseErr, kErrInvalidEntryIdSpecified);
}
- return Status();
+ return Status::OK();
}
Status ParseRangeStart(const std::string &input, StreamEntryID *id) {
@@ -123,30 +124,32 @@ Status ParseRangeEnd(const std::string &input,
StreamEntryID *id) {
return Status(Status::RedisParseErr, kErrInvalidEntryIdSpecified);
}
- return Status();
+ return Status::OK();
}
std::string EncodeStreamEntryValue(const std::vector<std::string> &args) {
std::string dst;
for (auto const &v : args) {
- PutFixed32(&dst, v.size());
+ PutVarint32(&dst, v.size());
dst.append(v);
}
return dst;
}
-std::vector<std::string> DecodeRawStreamEntryValue(const std::string &value) {
- std::vector<std::string> result;
+Status DecodeRawStreamEntryValue(const std::string &value,
std::vector<std::string> *result) {
+ result->clear();
rocksdb::Slice s(value);
while (!s.empty()) {
uint32_t len;
- GetFixed32(&s, &len);
- result.emplace_back(s.data(), len);
+ if (!GetVarint32(&s, &len)) {
+ return Status(Status::RedisParseErr,
kErrDecodingStreamEntryValueFailure);
+ }
+ result->emplace_back(s.data(), len);
s.remove_prefix(len);
}
- return result;
+ return Status::OK();
}
} // namespace Redis
diff --git a/src/redis_stream_base.h b/src/redis_stream_base.h
index 76208ca..39c3d09 100644
--- a/src/redis_stream_base.h
+++ b/src/redis_stream_base.h
@@ -145,7 +145,7 @@ Status ParseNewStreamEntryID(const std::string &input,
NewStreamEntryID *id);
Status ParseRangeStart(const std::string &input, StreamEntryID *id);
Status ParseRangeEnd(const std::string &input, StreamEntryID *id);
std::string EncodeStreamEntryValue(const std::vector<std::string> &args);
-std::vector<std::string> DecodeRawStreamEntryValue(const std::string &value);
+Status DecodeRawStreamEntryValue(const std::string &value,
std::vector<std::string> *result);
} // namespace Redis
diff --git a/tests/cppunit/t_encoding_test.cc b/tests/cppunit/t_encoding_test.cc
index 79624ff..3deaa21 100644
--- a/tests/cppunit/t_encoding_test.cc
+++ b/tests/cppunit/t_encoding_test.cc
@@ -21,7 +21,13 @@
#include <gtest/gtest.h>
#include "encoding.h"
+#include <cstdint>
#include <limits>
+#include <string>
+#include <vector>
+
+#include <rocksdb/slice.h>
+
TEST(Util, EncodeAndDecodeDouble) {
std::vector<double> values = {-1234, -100.1234, -1.2345, 0, 1.2345,
100.1234, 1234};
std::string prev_bytes;
@@ -35,4 +41,18 @@ TEST(Util, EncodeAndDecodeDouble) {
prev_bytes.assign(bytes);
ASSERT_EQ(value, got);
}
-}
\ No newline at end of file
+}
+
+TEST(Util, EncodeAndDecodeInt32AsVarint32) {
+ std::vector<uint32_t> values = {200, 65000, 16700000, 4294000000};
+ std::vector<size_t> encoded_sizes = {2, 3, 4, 5};
+ for (size_t i = 0; i < values.size(); ++i) {
+ std::string buf;
+ PutVarint32(&buf, values[i]);
+ EXPECT_EQ(buf.size(), encoded_sizes[i]);
+ uint32_t result;
+ rocksdb::Slice s(buf);
+ GetVarint32(&s, &result);
+ ASSERT_EQ(result, values[i]);
+ }
+}
diff --git a/tests/cppunit/t_stream_test.cc b/tests/cppunit/t_stream_test.cc
index 927e5b1..63ab312 100644
--- a/tests/cppunit/t_stream_test.cc
+++ b/tests/cppunit/t_stream_test.cc
@@ -55,6 +55,15 @@ protected:
Redis::Stream *stream;
};
+TEST_F(RedisStreamTest, EncodeDecodeEntryValue) {
+ std::vector<std::string> values = {"day", "first", "month", "eleventh",
"epoch", "fairly-very-old-one"};
+ auto encoded = Redis::EncodeStreamEntryValue(values);
+ std::vector<std::string> decoded;
+ auto s = Redis::DecodeRawStreamEntryValue(encoded, &decoded);
+ EXPECT_TRUE(s.IsOK());
+ checkStreamEntryValues(decoded, values);
+}
+
TEST_F(RedisStreamTest, AddEntryToNonExistingStreamWithNomkstreamOption) {
Redis::StreamAddOptions options;
options.nomkstream = true;