This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 79bb927  refactor: implement value_shcema_v0 (#722)
79bb927 is described below

commit 79bb9279878e492b37613cdd79c9b41cbf159393
Author: zhao liwei <[email protected]>
AuthorDate: Tue May 25 15:02:38 2021 +0800

    refactor: implement value_shcema_v0 (#722)
---
 src/base/pegasus_value_schema.h                    |  15 ++-
 src/base/test/value_manager_test.cpp               |  55 ++++++++++
 src/base/test/value_schema_test.cpp                | 113 +++++++++++++++++++++
 src/base/value_field.h                             |  29 +++++-
 src/base/value_schema_manager.cpp                  |  37 +++----
 src/base/value_schema_manager.h                    |   6 +-
 src/base/value_schema_v0.cpp                       |  98 ++++++++++++++++++
 .../{value_schema_manager.h => value_schema_v0.h}  |  27 ++---
 8 files changed, 341 insertions(+), 39 deletions(-)

diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 76f6c35..3bd2f9f 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -238,13 +238,24 @@ private:
 
 enum data_version
 {
+    VERSION_0 = 0,
+    VERSION_COUNT,
+    VERSION_MAX = VERSION_0,
     /// TBD(zlw)
 };
 
 struct value_params
 {
-    std::map<value_field_type, std::unique_ptr<value_field>> fields;
-    /// TBD(zlw)
+    value_params(std::string &buf, std::vector<rocksdb::Slice> &slices)
+        : write_buf(buf), write_slices(slices)
+    {
+    }
+
+    std::array<std::unique_ptr<value_field>, FIELD_COUNT> fields;
+    // write_buf and write_slices are transferred from `pegasus_value_generator`, which are used to
+    // prevent data copy
+    std::string &write_buf;
+    std::vector<rocksdb::Slice> &write_slices;
 };
 
 class value_schema
diff --git a/src/base/test/value_manager_test.cpp b/src/base/test/value_manager_test.cpp
new file mode 100644
index 0000000..30ed3c1
--- /dev/null
+++ b/src/base/test/value_manager_test.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_schema_manager.h"
+#include <gtest/gtest.h>
+
+using namespace pegasus;
+
+extern std::string generate_value(value_schema *schema,
+                                  uint32_t expire_ts,
+                                  uint64_t time_tag,
+                                  dsn::string_view user_data);
+
+TEST(value_schema_manager, get_latest_value_schema)
+{
+    auto schema = value_schema_manager::instance().get_latest_value_schema();
+    ASSERT_EQ(data_version::VERSION_MAX, schema->version());
+}
+
+TEST(value_schema_manager, get_value_schema)
+{
+    struct test_case
+    {
+        uint32_t version;
+        bool schema_exist;
+    } tests[] = {
+        {pegasus::data_version::VERSION_0, true}, {pegasus::data_version::VERSION_MAX + 1, false},
+    };
+
+    for (const auto &t : tests) {
+        auto schema = value_schema_manager::instance().get_value_schema(t.version);
+        if (t.schema_exist) {
+            ASSERT_NE(schema, nullptr);
+            ASSERT_EQ(t.version, schema->version());
+        } else {
+            ASSERT_EQ(schema, nullptr);
+        }
+    }
+}
diff --git a/src/base/test/value_schema_test.cpp b/src/base/test/value_schema_test.cpp
new file mode 100644
index 0000000..aa1a40e
--- /dev/null
+++ b/src/base/test/value_schema_test.cpp
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "base/pegasus_value_schema.h"
+#include "base/value_schema_manager.h"
+
+#include <gtest/gtest.h>
+
+using namespace pegasus;
+
+uint32_t extract_expire_ts(value_schema *schema, const std::string &raw_value)
+{
+    auto field = schema->extract_field(raw_value, pegasus::value_field_type::EXPIRE_TIMESTAMP);
+    auto expire_ts_field = static_cast<expire_timestamp_field *>(field.get());
+    return expire_ts_field->expire_ts;
+}
+
+uint64_t extract_time_tag(value_schema *schema, const std::string &raw_value)
+{
+    auto field = schema->extract_field(raw_value, pegasus::value_field_type::TIME_TAG);
+    auto time_field = static_cast<time_tag_field *>(field.get());
+    return time_field->time_tag;
+}
+
+std::string generate_value(value_schema *schema,
+                           uint32_t expire_ts,
+                           uint64_t time_tag,
+                           dsn::string_view user_data)
+{
+    std::string write_buf;
+    std::vector<rocksdb::Slice> write_slices;
+    value_params params{write_buf, write_slices};
+    params.fields[value_field_type::EXPIRE_TIMESTAMP] =
+        dsn::make_unique<expire_timestamp_field>(expire_ts);
+    params.fields[value_field_type::TIME_TAG] = dsn::make_unique<time_tag_field>(time_tag);
+    params.fields[value_field_type::USER_DATA] = dsn::make_unique<user_data_field>(user_data);
+
+    rocksdb::SliceParts sparts = schema->generate_value(params);
+    std::string raw_value;
+    for (int i = 0; i < sparts.num_parts; i++) {
+        raw_value += sparts.parts[i].ToString();
+    }
+    return raw_value;
+}
+
+TEST(value_schema, generate_and_extract)
+{
+    struct test_case
+    {
+        uint32_t data_version;
+        uint32_t expire_ts;
+        uint64_t time_tag;
+        std::string user_data;
+    } tests[] = {
+        {0, 1000, 0, ""},
+        {0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
+        {0, std::numeric_limits<uint32_t>::max(), 0, ""},
+        {0, 0, 0, "a"},
+    };
+
+    for (const auto &t : tests) {
+        auto schema = value_schema_manager::instance().get_value_schema(t.data_version);
+        std::string raw_value = generate_value(schema, t.expire_ts, t.time_tag, t.user_data);
+
+        ASSERT_EQ(t.expire_ts, extract_expire_ts(schema, raw_value));
+        if (t.data_version >= 1) {
+            ASSERT_EQ(t.time_tag, extract_time_tag(schema, raw_value));
+        }
+
+        dsn::blob user_data = schema->extract_user_data(std::move(raw_value));
+        ASSERT_EQ(t.user_data, user_data.to_string());
+    }
+}
+
+TEST(value_schema, update_expire_ts)
+{
+    struct test_case
+    {
+        uint32_t data_version;
+        uint32_t expire_ts;
+        uint32_t update_expire_ts;
+    } tests[] = {
+        {0, 1000, 10086},
+    };
+
+    for (const auto &t : tests) {
+        std::string write_buf;
+        std::vector<rocksdb::Slice> write_slices;
+        auto schema = value_schema_manager::instance().get_value_schema(t.data_version);
+        std::string raw_value = generate_value(schema, t.expire_ts, 0, "");
+
+        std::unique_ptr<value_field> field =
+            dsn::make_unique<expire_timestamp_field>(t.update_expire_ts);
+        schema->update_field(raw_value, std::move(field));
+        ASSERT_EQ(t.update_expire_ts, extract_expire_ts(schema, raw_value));
+    }
+}
diff --git a/src/base/value_field.h b/src/base/value_field.h
index 0f39991..ecb2b76 100644
--- a/src/base/value_field.h
+++ b/src/base/value_field.h
@@ -23,7 +23,10 @@ namespace pegasus {
 
 enum value_field_type
 {
-    /// TBD(zlw)
+    EXPIRE_TIMESTAMP = 0,
+    TIME_TAG,
+    USER_DATA,
+    FIELD_COUNT,
 };
 
 struct value_field
@@ -31,4 +34,28 @@ struct value_field
     virtual ~value_field() = default;
     virtual value_field_type type() = 0;
 };
+
+struct expire_timestamp_field : public value_field
+{
+    explicit expire_timestamp_field(uint32_t timestamp) : expire_ts(timestamp) {}
+    value_field_type type() { return value_field_type::EXPIRE_TIMESTAMP; }
+
+    uint32_t expire_ts;
+};
+
+struct time_tag_field : public value_field
+{
+    explicit time_tag_field(uint64_t tag) : time_tag(tag) {}
+    value_field_type type() { return value_field_type::TIME_TAG; }
+
+    uint64_t time_tag;
+};
+
+struct user_data_field : public value_field
+{
+    explicit user_data_field(dsn::string_view data) : user_data(data) {}
+    value_field_type type() { return value_field_type::USER_DATA; }
+
+    dsn::string_view user_data;
+};
 } // namespace pegasus
diff --git a/src/base/value_schema_manager.cpp b/src/base/value_schema_manager.cpp
index 870e2b8..4caaffe 100644
--- a/src/base/value_schema_manager.cpp
+++ b/src/base/value_schema_manager.cpp
@@ -18,12 +18,21 @@
  */
 
 #include "value_schema_manager.h"
+#include "value_schema_v0.h"
 
 namespace pegasus {
+value_schema_manager::value_schema_manager()
+{
+    /**
+     * If someone wants to add a new data version, he only need to implement the new value schema,
+     * and register it here.
+     */
+    value_schema_manager::instance().register_schema(dsn::make_unique<value_schema_v0>());
+}
 
-void value_schema_manager::register_schema(value_schema *schema)
+void value_schema_manager::register_schema(std::unique_ptr<value_schema> schema)
 {
-    /// TBD(zlw)
+    _schemas[schema->version()] = std::move(schema);
 }
 
 value_schema *value_schema_manager::get_value_schema(uint32_t meta_cf_data_version,
@@ -35,28 +44,14 @@ value_schema *value_schema_manager::get_value_schema(uint32_t meta_cf_data_versi
 
 value_schema *value_schema_manager::get_value_schema(uint32_t version) const
 {
-    /// TBD(zlw)
-    return nullptr;
+    if (version >= _schemas.size()) {
+        return nullptr;
+    }
+    return _schemas[version].get();
 }
 
 value_schema *value_schema_manager::get_latest_value_schema() const
 {
-    /// TBD(zlw)
-    return nullptr;
-}
-
-/**
- * If someone wants to add a new data version, he only need to implement the new value schema,
- * and register it here.
- */
-void register_value_schemas()
-{
-    /// TBD(zlw)
+    return _schemas.rbegin()->get();
 }
-
-struct value_schemas_registerer
-{
-    value_schemas_registerer() { register_value_schemas(); }
-};
-static value_schemas_registerer value_schemas_reg;
 } // namespace pegasus
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_manager.h
index 00a22fb..189d89e 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_manager.h
@@ -27,7 +27,7 @@ namespace pegasus {
 class value_schema_manager : public dsn::utils::singleton<value_schema_manager>
 {
 public:
-    void register_schema(value_schema *schema);
+    void register_schema(std::unique_ptr<value_schema> schema);
     /// using the raw value in rocksdb and data version stored in meta column family to get data
     /// version
     value_schema *get_value_schema(uint32_t meta_cf_data_version, dsn::string_view value) const;
@@ -35,9 +35,9 @@ public:
     value_schema *get_latest_value_schema() const;
 
 private:
-    value_schema_manager() = default;
+    value_schema_manager();
     friend class dsn::utils::singleton<value_schema_manager>;
 
-    std::array<value_schema *, 2> _schemas;
+    std::array<std::unique_ptr<value_schema>, data_version::VERSION_COUNT> _schemas;
 };
 } // namespace pegasus
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v0.cpp
new file mode 100644
index 0000000..220c3b6
--- /dev/null
+++ b/src/base/value_schema_v0.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_schema_v0.h"
+
+#include <dsn/dist/fmt_logging.h>
+#include <dsn/utility/smart_pointers.h>
+
+namespace pegasus {
+std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view value,
+                                                            value_field_type type)
+{
+    std::unique_ptr<value_field> field = nullptr;
+    switch (type) {
+    case value_field_type::EXPIRE_TIMESTAMP:
+        field = extract_timestamp(value);
+        break;
+    default:
+        dassert_f(false, "Unsupported field type: {}", type);
+    }
+    return field;
+}
+
+dsn::blob value_schema_v0::extract_user_data(std::string &&value)
+{
+    auto ret = dsn::blob::create_from_bytes(std::move(value));
+    ret.range(sizeof(uint32_t));
+    return ret;
+}
+
+void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_field> field)
+{
+    auto type = field->type();
+    switch (field->type()) {
+    case value_field_type::EXPIRE_TIMESTAMP:
+        update_expire_ts(value, std::move(field));
+        break;
+    default:
+        dassert_f(false, "Unsupported update field type: {}", type);
+    }
+}
+
+rocksdb::SliceParts value_schema_v0::generate_value(const value_params &params)
+{
+    auto expire_ts_field = static_cast<expire_timestamp_field *>(
+        params.fields[value_field_type::EXPIRE_TIMESTAMP].get());
+    auto data_field =
+        static_cast<user_data_field *>(params.fields[value_field_type::USER_DATA].get());
+    if (dsn_unlikely(expire_ts_field == nullptr || data_field == nullptr)) {
+        dassert_f(false, "USER_DATA or EXPIRE_TIMESTAMP is not provided");
+        return {nullptr, 0};
+    }
+
+    params.write_buf.resize(sizeof(uint32_t));
+    dsn::data_output(params.write_buf).write_u32(expire_ts_field->expire_ts);
+    params.write_slices.clear();
+    params.write_slices.emplace_back(params.write_buf.data(), params.write_buf.size());
+
+    dsn::string_view user_data = data_field->user_data;
+    if (user_data.length() > 0) {
+        params.write_slices.emplace_back(user_data.data(), user_data.length());
+    }
+    return {&params.write_slices[0], static_cast<int>(params.write_slices.size())};
+}
+
+std::unique_ptr<value_field> value_schema_v0::extract_timestamp(dsn::string_view value)
+{
+    uint32_t expire_ts = dsn::data_input(value).read_u32();
+    return dsn::make_unique<expire_timestamp_field>(expire_ts);
+}
+
+void value_schema_v0::update_expire_ts(std::string &value, std::unique_ptr<value_field> field)
+{
+    dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
+    auto expire_field = static_cast<expire_timestamp_field *>(field.get());
+
+    auto new_expire_ts = expire_field->expire_ts;
+    new_expire_ts = dsn::endian::hton(new_expire_ts);
+    memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
+}
+
+} // namespace pegasus
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_v0.h
similarity index 56%
copy from src/base/value_schema_manager.h
copy to src/base/value_schema_v0.h
index 00a22fb..5d7712b 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_v0.h
@@ -20,24 +20,27 @@
 #pragma once
 
 #include "pegasus_value_schema.h"
+
 #include <dsn/utility/singleton.h>
 
 namespace pegasus {
-
-class value_schema_manager : public dsn::utils::singleton<value_schema_manager>
+/**
+ *  rocksdb value: |- expire_ts(4bytes) -|- user value(bytes) -|
+ */
+class value_schema_v0 : public value_schema
 {
 public:
-    void register_schema(value_schema *schema);
-    /// using the raw value in rocksdb and data version stored in meta column family to get data
-    /// version
-    value_schema *get_value_schema(uint32_t meta_cf_data_version, dsn::string_view value) const;
-    value_schema *get_value_schema(uint32_t version) const;
-    value_schema *get_latest_value_schema() const;
+    value_schema_v0() = default;
 
-private:
-    value_schema_manager() = default;
-    friend class dsn::utils::singleton<value_schema_manager>;
+    std::unique_ptr<value_field> extract_field(dsn::string_view value,
+                                               value_field_type type) override;
+    dsn::blob extract_user_data(std::string &&value) override;
+    void update_field(std::string &value, std::unique_ptr<value_field> field) override;
+    rocksdb::SliceParts generate_value(const value_params &params) override;
+    data_version version() const override { return VERSION_0; }
 
-    std::array<value_schema *, 2> _schemas;
+private:
+    std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
+    void update_expire_ts(std::string &value, std::unique_ptr<value_field> field);
 };
 } // namespace pegasus

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to