This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 79bb927 refactor: implement value_shcema_v0 (#722)
79bb927 is described below
commit 79bb9279878e492b37613cdd79c9b41cbf159393
Author: zhao liwei <[email protected]>
AuthorDate: Tue May 25 15:02:38 2021 +0800
refactor: implement value_shcema_v0 (#722)
---
src/base/pegasus_value_schema.h | 15 ++-
src/base/test/value_manager_test.cpp | 55 ++++++++++
src/base/test/value_schema_test.cpp | 113 +++++++++++++++++++++
src/base/value_field.h | 29 +++++-
src/base/value_schema_manager.cpp | 37 +++----
src/base/value_schema_manager.h | 6 +-
src/base/value_schema_v0.cpp | 98 ++++++++++++++++++
.../{value_schema_manager.h => value_schema_v0.h} | 27 ++---
8 files changed, 341 insertions(+), 39 deletions(-)
diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 76f6c35..3bd2f9f 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -238,13 +238,24 @@ private:
enum data_version
{
+ VERSION_0 = 0,
+ VERSION_COUNT,
+ VERSION_MAX = VERSION_0,
/// TBD(zlw)
};
struct value_params
{
- std::map<value_field_type, std::unique_ptr<value_field>> fields;
- /// TBD(zlw)
+ value_params(std::string &buf, std::vector<rocksdb::Slice> &slices)
+ : write_buf(buf), write_slices(slices)
+ {
+ }
+
+ std::array<std::unique_ptr<value_field>, FIELD_COUNT> fields;
+ // write_buf and write_slices are transferred from `pegasus_value_generator`, which are used to
+ // prevent data copy
+ std::string &write_buf;
+ std::vector<rocksdb::Slice> &write_slices;
};
class value_schema
diff --git a/src/base/test/value_manager_test.cpp
b/src/base/test/value_manager_test.cpp
new file mode 100644
index 0000000..30ed3c1
--- /dev/null
+++ b/src/base/test/value_manager_test.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_schema_manager.h"
+#include <gtest/gtest.h>
+
+using namespace pegasus;
+
+extern std::string generate_value(value_schema *schema,
+ uint32_t expire_ts,
+ uint64_t time_tag,
+ dsn::string_view user_data);
+
+TEST(value_schema_manager, get_latest_value_schema)
+{
+ auto schema = value_schema_manager::instance().get_latest_value_schema();
+ ASSERT_EQ(data_version::VERSION_MAX, schema->version());
+}
+
+TEST(value_schema_manager, get_value_schema)
+{
+ struct test_case
+ {
+ uint32_t version;
+ bool schema_exist;
+ } tests[] = {
+ {pegasus::data_version::VERSION_0, true},
{pegasus::data_version::VERSION_MAX + 1, false},
+ };
+
+ for (const auto &t : tests) {
+ auto schema =
value_schema_manager::instance().get_value_schema(t.version);
+ if (t.schema_exist) {
+ ASSERT_NE(schema, nullptr);
+ ASSERT_EQ(t.version, schema->version());
+ } else {
+ ASSERT_EQ(schema, nullptr);
+ }
+ }
+}
diff --git a/src/base/test/value_schema_test.cpp
b/src/base/test/value_schema_test.cpp
new file mode 100644
index 0000000..aa1a40e
--- /dev/null
+++ b/src/base/test/value_schema_test.cpp
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "base/pegasus_value_schema.h"
+#include "base/value_schema_manager.h"
+
+#include <gtest/gtest.h>
+
+using namespace pegasus;
+
+uint32_t extract_expire_ts(value_schema *schema, const std::string &raw_value)
+{
+ auto field = schema->extract_field(raw_value,
pegasus::value_field_type::EXPIRE_TIMESTAMP);
+ auto expire_ts_field = static_cast<expire_timestamp_field *>(field.get());
+ return expire_ts_field->expire_ts;
+}
+
+uint64_t extract_time_tag(value_schema *schema, const std::string &raw_value)
+{
+ auto field = schema->extract_field(raw_value,
pegasus::value_field_type::TIME_TAG);
+ auto time_field = static_cast<time_tag_field *>(field.get());
+ return time_field->time_tag;
+}
+
+std::string generate_value(value_schema *schema,
+ uint32_t expire_ts,
+ uint64_t time_tag,
+ dsn::string_view user_data)
+{
+ std::string write_buf;
+ std::vector<rocksdb::Slice> write_slices;
+ value_params params{write_buf, write_slices};
+ params.fields[value_field_type::EXPIRE_TIMESTAMP] =
+ dsn::make_unique<expire_timestamp_field>(expire_ts);
+ params.fields[value_field_type::TIME_TAG] =
dsn::make_unique<time_tag_field>(time_tag);
+ params.fields[value_field_type::USER_DATA] =
dsn::make_unique<user_data_field>(user_data);
+
+ rocksdb::SliceParts sparts = schema->generate_value(params);
+ std::string raw_value;
+ for (int i = 0; i < sparts.num_parts; i++) {
+ raw_value += sparts.parts[i].ToString();
+ }
+ return raw_value;
+}
+
+TEST(value_schema, generate_and_extract)
+{
+ struct test_case
+ {
+ uint32_t data_version;
+ uint32_t expire_ts;
+ uint64_t time_tag;
+ std::string user_data;
+ } tests[] = {
+ {0, 1000, 0, ""},
+ {0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
+ {0, std::numeric_limits<uint32_t>::max(), 0, ""},
+ {0, 0, 0, "a"},
+ };
+
+ for (const auto &t : tests) {
+ auto schema =
value_schema_manager::instance().get_value_schema(t.data_version);
+ std::string raw_value = generate_value(schema, t.expire_ts,
t.time_tag, t.user_data);
+
+ ASSERT_EQ(t.expire_ts, extract_expire_ts(schema, raw_value));
+ if (t.data_version >= 1) {
+ ASSERT_EQ(t.time_tag, extract_time_tag(schema, raw_value));
+ }
+
+ dsn::blob user_data = schema->extract_user_data(std::move(raw_value));
+ ASSERT_EQ(t.user_data, user_data.to_string());
+ }
+}
+
+TEST(value_schema, update_expire_ts)
+{
+ struct test_case
+ {
+ uint32_t data_version;
+ uint32_t expire_ts;
+ uint32_t update_expire_ts;
+ } tests[] = {
+ {0, 1000, 10086},
+ };
+
+ for (const auto &t : tests) {
+ std::string write_buf;
+ std::vector<rocksdb::Slice> write_slices;
+ auto schema =
value_schema_manager::instance().get_value_schema(t.data_version);
+ std::string raw_value = generate_value(schema, t.expire_ts, 0, "");
+
+ std::unique_ptr<value_field> field =
+ dsn::make_unique<expire_timestamp_field>(t.update_expire_ts);
+ schema->update_field(raw_value, std::move(field));
+ ASSERT_EQ(t.update_expire_ts, extract_expire_ts(schema, raw_value));
+ }
+}
diff --git a/src/base/value_field.h b/src/base/value_field.h
index 0f39991..ecb2b76 100644
--- a/src/base/value_field.h
+++ b/src/base/value_field.h
@@ -23,7 +23,10 @@ namespace pegasus {
enum value_field_type
{
- /// TBD(zlw)
+ EXPIRE_TIMESTAMP = 0,
+ TIME_TAG,
+ USER_DATA,
+ FIELD_COUNT,
};
struct value_field
@@ -31,4 +34,28 @@ struct value_field
virtual ~value_field() = default;
virtual value_field_type type() = 0;
};
+
+struct expire_timestamp_field : public value_field
+{
+ explicit expire_timestamp_field(uint32_t timestamp) : expire_ts(timestamp)
{}
+ value_field_type type() { return value_field_type::EXPIRE_TIMESTAMP; }
+
+ uint32_t expire_ts;
+};
+
+struct time_tag_field : public value_field
+{
+ explicit time_tag_field(uint64_t tag) : time_tag(tag) {}
+ value_field_type type() { return value_field_type::TIME_TAG; }
+
+ uint64_t time_tag;
+};
+
+struct user_data_field : public value_field
+{
+ explicit user_data_field(dsn::string_view data) : user_data(data) {}
+ value_field_type type() { return value_field_type::USER_DATA; }
+
+ dsn::string_view user_data;
+};
} // namespace pegasus
diff --git a/src/base/value_schema_manager.cpp
b/src/base/value_schema_manager.cpp
index 870e2b8..4caaffe 100644
--- a/src/base/value_schema_manager.cpp
+++ b/src/base/value_schema_manager.cpp
@@ -18,12 +18,21 @@
*/
#include "value_schema_manager.h"
+#include "value_schema_v0.h"
namespace pegasus {
+value_schema_manager::value_schema_manager()
+{
+ /**
+ * If someone wants to add a new data version, he only need to implement the new value schema,
+ * and register it here.
+ */
+
value_schema_manager::instance().register_schema(dsn::make_unique<value_schema_v0>());
+}
-void value_schema_manager::register_schema(value_schema *schema)
+void value_schema_manager::register_schema(std::unique_ptr<value_schema>
schema)
{
- /// TBD(zlw)
+ _schemas[schema->version()] = std::move(schema);
}
value_schema *value_schema_manager::get_value_schema(uint32_t
meta_cf_data_version,
@@ -35,28 +44,14 @@ value_schema
*value_schema_manager::get_value_schema(uint32_t meta_cf_data_versi
value_schema *value_schema_manager::get_value_schema(uint32_t version) const
{
- /// TBD(zlw)
- return nullptr;
+ if (version >= _schemas.size()) {
+ return nullptr;
+ }
+ return _schemas[version].get();
}
value_schema *value_schema_manager::get_latest_value_schema() const
{
- /// TBD(zlw)
- return nullptr;
-}
-
-/**
- * If someone wants to add a new data version, he only need to implement the new value schema,
- * and register it here.
- */
-void register_value_schemas()
-{
- /// TBD(zlw)
+ return _schemas.rbegin()->get();
}
-
-struct value_schemas_registerer
-{
- value_schemas_registerer() { register_value_schemas(); }
-};
-static value_schemas_registerer value_schemas_reg;
} // namespace pegasus
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_manager.h
index 00a22fb..189d89e 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_manager.h
@@ -27,7 +27,7 @@ namespace pegasus {
class value_schema_manager : public dsn::utils::singleton<value_schema_manager>
{
public:
- void register_schema(value_schema *schema);
+ void register_schema(std::unique_ptr<value_schema> schema);
/// using the raw value in rocksdb and data version stored in meta column family to get data
/// version
value_schema *get_value_schema(uint32_t meta_cf_data_version,
dsn::string_view value) const;
@@ -35,9 +35,9 @@ public:
value_schema *get_latest_value_schema() const;
private:
- value_schema_manager() = default;
+ value_schema_manager();
friend class dsn::utils::singleton<value_schema_manager>;
- std::array<value_schema *, 2> _schemas;
+ std::array<std::unique_ptr<value_schema>, data_version::VERSION_COUNT>
_schemas;
};
} // namespace pegasus
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v0.cpp
new file mode 100644
index 0000000..220c3b6
--- /dev/null
+++ b/src/base/value_schema_v0.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_schema_v0.h"
+
+#include <dsn/dist/fmt_logging.h>
+#include <dsn/utility/smart_pointers.h>
+
+namespace pegasus {
+std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view
value,
+ value_field_type
type)
+{
+ std::unique_ptr<value_field> field = nullptr;
+ switch (type) {
+ case value_field_type::EXPIRE_TIMESTAMP:
+ field = extract_timestamp(value);
+ break;
+ default:
+ dassert_f(false, "Unsupported field type: {}", type);
+ }
+ return field;
+}
+
+dsn::blob value_schema_v0::extract_user_data(std::string &&value)
+{
+ auto ret = dsn::blob::create_from_bytes(std::move(value));
+ ret.range(sizeof(uint32_t));
+ return ret;
+}
+
+void value_schema_v0::update_field(std::string &value,
std::unique_ptr<value_field> field)
+{
+ auto type = field->type();
+ switch (field->type()) {
+ case value_field_type::EXPIRE_TIMESTAMP:
+ update_expire_ts(value, std::move(field));
+ break;
+ default:
+ dassert_f(false, "Unsupported update field type: {}", type);
+ }
+}
+
+rocksdb::SliceParts value_schema_v0::generate_value(const value_params &params)
+{
+ auto expire_ts_field = static_cast<expire_timestamp_field *>(
+ params.fields[value_field_type::EXPIRE_TIMESTAMP].get());
+ auto data_field =
+ static_cast<user_data_field
*>(params.fields[value_field_type::USER_DATA].get());
+ if (dsn_unlikely(expire_ts_field == nullptr || data_field == nullptr)) {
+ dassert_f(false, "USER_DATA or EXPIRE_TIMESTAMP is not provided");
+ return {nullptr, 0};
+ }
+
+ params.write_buf.resize(sizeof(uint32_t));
+ dsn::data_output(params.write_buf).write_u32(expire_ts_field->expire_ts);
+ params.write_slices.clear();
+ params.write_slices.emplace_back(params.write_buf.data(),
params.write_buf.size());
+
+ dsn::string_view user_data = data_field->user_data;
+ if (user_data.length() > 0) {
+ params.write_slices.emplace_back(user_data.data(), user_data.length());
+ }
+ return {&params.write_slices[0],
static_cast<int>(params.write_slices.size())};
+}
+
+std::unique_ptr<value_field>
value_schema_v0::extract_timestamp(dsn::string_view value)
+{
+ uint32_t expire_ts = dsn::data_input(value).read_u32();
+ return dsn::make_unique<expire_timestamp_field>(expire_ts);
+}
+
+void value_schema_v0::update_expire_ts(std::string &value,
std::unique_ptr<value_field> field)
+{
+ dassert_f(value.length() >= sizeof(uint32_t), "value must include
'expire_ts' header");
+ auto expire_field = static_cast<expire_timestamp_field *>(field.get());
+
+ auto new_expire_ts = expire_field->expire_ts;
+ new_expire_ts = dsn::endian::hton(new_expire_ts);
+ memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
+}
+
+} // namespace pegasus
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_v0.h
similarity index 56%
copy from src/base/value_schema_manager.h
copy to src/base/value_schema_v0.h
index 00a22fb..5d7712b 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_v0.h
@@ -20,24 +20,27 @@
#pragma once
#include "pegasus_value_schema.h"
+
#include <dsn/utility/singleton.h>
namespace pegasus {
-
-class value_schema_manager : public dsn::utils::singleton<value_schema_manager>
+/**
+ * rocksdb value: |- expire_ts(4bytes) -|- user value(bytes) -|
+ */
+class value_schema_v0 : public value_schema
{
public:
- void register_schema(value_schema *schema);
- /// using the raw value in rocksdb and data version stored in meta column family to get data
- /// version
- value_schema *get_value_schema(uint32_t meta_cf_data_version,
dsn::string_view value) const;
- value_schema *get_value_schema(uint32_t version) const;
- value_schema *get_latest_value_schema() const;
+ value_schema_v0() = default;
-private:
- value_schema_manager() = default;
- friend class dsn::utils::singleton<value_schema_manager>;
+ std::unique_ptr<value_field> extract_field(dsn::string_view value,
+ value_field_type type) override;
+ dsn::blob extract_user_data(std::string &&value) override;
+ void update_field(std::string &value, std::unique_ptr<value_field> field)
override;
+ rocksdb::SliceParts generate_value(const value_params &params) override;
+ data_version version() const override { return VERSION_0; }
- std::array<value_schema *, 2> _schemas;
+private:
+ std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
+ void update_expire_ts(std::string &value, std::unique_ptr<value_field>
field);
};
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]