This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dae647  refactor: use rocksdb_wrapper::write_batch_put to reimplement check_and_set (#656)
6dae647 is described below

commit 6dae647b69a51a312310039ac91be0905d4169e5
Author: zhao liwei <[email protected]>
AuthorDate: Wed Dec 30 18:21:37 2020 +0800

    refactor: use rocksdb_wrapper::write_batch_put to reimplement check_and_set (#656)
---
 rdsn                                     |   2 +-
 src/server/meta_store.h                  |   1 +
 src/server/pegasus_write_service_impl.h  |  23 +++--
 src/server/rocksdb_wrapper.cpp           | 115 ++++++++++++++++++++++-
 src/server/rocksdb_wrapper.h             |  33 +++++++
 src/server/test/rocksdb_wrapper_test.cpp | 153 ++++++++++++++++++++++++++++---
 6 files changed, 302 insertions(+), 25 deletions(-)

diff --git a/rdsn b/rdsn
index b92ba6d..f27decf 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit b92ba6d5b3a9a15370b85f82814d94a746ace132
+Subproject commit f27decf2e7015b4b75e2e40b53ad6b33fb752ecd
diff --git a/src/server/meta_store.h b/src/server/meta_store.h
index a94dcd0..fb67a53 100644
--- a/src/server/meta_store.h
+++ b/src/server/meta_store.h
@@ -74,6 +74,7 @@ private:
                                                            std::string *value);
 
     friend class pegasus_write_service;
+    friend class rocksdb_wrapper; // rocksdb_wrapper::write() uses meta_store::LAST_FLUSHED_DECREE
 
     // Keys of meta data wrote into meta column family.
     static const std::string DATA_VERSION;
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 6187ebd..ecdf4f3 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -31,6 +31,7 @@
 #include <dsn/utility/filesystem.h>
 #include <dsn/utility/string_conv.h>
 #include <gtest/gtest_prod.h>
+#include <dsn/utility/defer.h>
 
 namespace pegasus {
 namespace server {
@@ -338,22 +339,24 @@ public:
             } else {
                 set_key = check_key;
             }
-            resp.error = db_write_batch_put(decree,
-                                            set_key,
-                                            update.set_value,
-                                            static_cast<uint32_t>(update.set_expire_ts_seconds));
+            resp.error = _rocksdb_wrapper->write_batch_put(
+                decree,
+                set_key,
+                update.set_value,
+                static_cast<uint32_t>(update.set_expire_ts_seconds));
         } else {
             // check not passed, write empty record to update rocksdb's last flushed decree
-            resp.error = db_write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
+            resp.error = _rocksdb_wrapper->write_batch_put(
+                decree, dsn::string_view(), dsn::string_view(), 0);
         }
+
+        auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
         if (resp.error) {
-            clear_up_batch_states(decree, resp.error);
             return resp.error;
         }
 
-        resp.error = db_write(decree);
+        resp.error = _rocksdb_wrapper->write(decree);
         if (resp.error) {
-            clear_up_batch_states(decree, resp.error);
             return resp.error;
         }
 
@@ -363,7 +366,6 @@ public:
                 invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
             }
 
-        clear_up_batch_states(decree, resp.error);
         return 0;
     }
 
@@ -550,10 +552,13 @@ public:
 
     void set_default_ttl(uint32_t ttl)
     {
+        // TODO(zlw): drop this local copy once the refactor is done; _rocksdb_wrapper keeps its own copy, synced below
         if (_default_ttl != ttl) {
             _default_ttl = ttl;
             ddebug_replica("update _default_ttl to {}.", ttl);
         }
+
+        _rocksdb_wrapper->set_default_ttl(ttl);
     }
 
 private:
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index e7e8bb5..45dd6a1 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -21,6 +21,7 @@
 
 #include <rocksdb/db.h>
 #include "pegasus_write_service_impl.h"
+#include "base/pegasus_value_schema.h"
 
 namespace pegasus {
 namespace server {
@@ -31,9 +32,17 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
     : replica_base(server),
       _db(server->_db),
       _rd_opts(server->_data_cf_rd_opts),
+      _meta_cf(server->_meta_cf),
       _pegasus_data_version(server->_pegasus_data_version),
-      _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+      _pfc_recent_expire_count(server->_pfc_recent_expire_count),
+      _default_ttl(0)
 {
+    _write_batch = dsn::make_unique<rocksdb::WriteBatch>();
+    _value_generator = dsn::make_unique<pegasus_value_generator>();
+
+    _wt_opts = dsn::make_unique<rocksdb::WriteOptions>();
+    // disable write ahead logging as replication handles logging instead now
+    _wt_opts->disableWAL = true;
 }
 
 int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
@@ -66,5 +75,109 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
     return s.code();
 }
 
+int rocksdb_wrapper::write_batch_put(int64_t decree,
+                                     dsn::string_view raw_key,
+                                     dsn::string_view value,
+                                     uint32_t expire_sec)
+{
+    return write_batch_put_ctx(db_write_context::empty(decree), raw_key, value, expire_sec);
+}
+
+int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
+                                         dsn::string_view raw_key,
+                                         dsn::string_view value,
+                                         uint32_t expire_sec)
+{
+    FAIL_POINT_INJECT_F("db_write_batch_put",
+                        [](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });
+
+    uint64_t new_timetag = ctx.remote_timetag;
+    if (!ctx.is_duplicated_write()) { // local write
+        new_timetag = generate_timetag(ctx.timestamp, get_cluster_id_if_exists(), false);
+    }
+
+    if (ctx.verify_timetag &&         // needs read-before-write
+        _pegasus_data_version >= 1 && // data version 0 doesn't support timetag.
+        !raw_key.empty()) {           // not an empty write
+
+        db_get_context get_ctx;
+        int err = get(raw_key, &get_ctx);
+        if (dsn_unlikely(err != 0)) {
+            return err;
+        }
+        // if record exists and is not expired.
+        if (get_ctx.found && !get_ctx.expired) {
+            uint64_t local_timetag =
+                pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value);
+
+            if (local_timetag >= new_timetag) {
+                // ignore this stale update with lower timetag,
+                // and write an empty record instead
+                raw_key = value = dsn::string_view();
+            }
+        }
+    }
+
+    rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
+    rocksdb::SliceParts skey_parts(&skey, 1);
+    rocksdb::SliceParts svalue = _value_generator->generate_value(
+        _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
+    rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
+    if (dsn_unlikely(!s.ok())) {
+        ::dsn::blob hash_key, sort_key;
+        pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
+        derror_rocksdb("WriteBatchPut",
+                       s.ToString(),
+                       "decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}",
+                       ctx.decree,
+                       utils::c_escape_string(hash_key),
+                       utils::c_escape_string(sort_key),
+                       expire_sec);
+    }
+    return s.code();
+}
+
+int rocksdb_wrapper::write(int64_t decree)
+{
+    dassert(_write_batch->Count() != 0, "the number of updates in the batch is 0");
+
+    FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });
+
+    rocksdb::Status status =
+        _write_batch->Put(_meta_cf, meta_store::LAST_FLUSHED_DECREE, std::to_string(decree));
+    if (dsn_unlikely(!status.ok())) {
+        derror_rocksdb("Write",
+                       status.ToString(),
+                       "put decree of meta cf into batch error, decree: {}",
+                       decree);
+        return status.code();
+    }
+
+    status = _db->Write(*_wt_opts, _write_batch.get());
+    if (dsn_unlikely(!status.ok())) {
+        derror_rocksdb("Write", status.ToString(), "write rocksdb error, decree: {}", decree);
+    }
+    return status.code();
+}
+
+void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); } // drops every pending update, incl. the LAST_FLUSHED_DECREE put added by write()
+
+void rocksdb_wrapper::set_default_ttl(uint32_t ttl)
+{
+    if (_default_ttl != ttl) {
+        _default_ttl = ttl;
+        ddebug_replica("update _default_ttl to {}", ttl);
+    }
+}
+
+uint32_t rocksdb_wrapper::db_expire_ts(uint32_t expire_ts)
+{
+    // expire_ts == 0 means this write sets no TTL: use now + _default_ttl if a default is configured
+    if (_default_ttl != 0 && expire_ts == 0) {
+        return utils::epoch_now() + _default_ttl;
+    }
+
+    return expire_ts;
+}
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index 07f49a4..351f6e5 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -20,10 +20,14 @@
 #pragma once
 
 #include <dsn/dist/replication/replica_base.h>
+#include <gtest/gtest_prod.h>
 
 namespace rocksdb {
 class DB;
 class ReadOptions;
+class WriteBatch;
+class ColumnFamilyHandle;
+class WriteOptions;
 } // namespace rocksdb
 
 namespace dsn {
@@ -31,8 +35,11 @@ class perf_counter_wrapper;
 } // namespace dsn
 
 namespace pegasus {
+class pegasus_value_generator;
+
 namespace server {
 struct db_get_context;
+struct db_write_context;
 class pegasus_server_impl;
 
 class rocksdb_wrapper : public dsn::replication::replica_base
@@ -46,11 +53,37 @@ public:
     /// \result ctx.found=false if record is not found. Still 0 is returned.
     int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);
 
+    int write_batch_put(int64_t decree,
+                        dsn::string_view raw_key,
+                        dsn::string_view value,
+                        uint32_t expire_sec);
+    int write_batch_put_ctx(const db_write_context &ctx,
+                            dsn::string_view raw_key,
+                            dsn::string_view value,
+                            uint32_t expire_sec);
+    int write(int64_t decree);
+    void clear_up_write_batch();
+
+    void set_default_ttl(uint32_t ttl);
+
 private:
+    uint32_t db_expire_ts(uint32_t expire_ts);
+
     rocksdb::DB *_db;
     rocksdb::ReadOptions &_rd_opts;
+    std::unique_ptr<pegasus_value_generator> _value_generator;
+    std::unique_ptr<rocksdb::WriteBatch> _write_batch;
+    std::unique_ptr<rocksdb::WriteOptions> _wt_opts;
+    rocksdb::ColumnFamilyHandle *_meta_cf;
+
     const uint32_t _pegasus_data_version;
     dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+    volatile uint32_t _default_ttl;
+
+    friend class rocksdb_wrapper_test;
+    FRIEND_TEST(rocksdb_wrapper_test, put_verify_timetag);
+    FRIEND_TEST(rocksdb_wrapper_test, verify_timetag_compatible_with_version_0);
+    FRIEND_TEST(rocksdb_wrapper_test, get);
 };
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index 6931db5..313f522 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -27,7 +27,6 @@ class rocksdb_wrapper_test : public pegasus_server_test_base
 {
 protected:
     std::unique_ptr<pegasus_server_write> _server_write;
-    pegasus_write_service::impl *_write_impl{nullptr};
     rocksdb_wrapper *_rocksdb_wrapper{nullptr};
     dsn::blob _raw_key;
 
@@ -36,23 +35,45 @@ public:
     {
         start();
         _server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
-        _write_impl = _server_write->_write_svc->_impl.get();
-        _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
+        _rocksdb_wrapper = _server_write->_write_svc->_impl->_rocksdb_wrapper.get();
 
         pegasus::pegasus_generate_key(
             _raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key"));
     }
 
-    void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t expire_ts_seconds)
+    void single_set(db_write_context write_ctx,
+                    dsn::blob raw_key,
+                    dsn::string_view user_value,
+                    int32_t expire_ts_seconds)
     {
-        dsn::apps::update_request put;
-        put.key = raw_key;
-        put.value = user_value;
-        put.expire_ts_seconds = expire_ts_seconds;
-        db_write_context write_ctx;
-        dsn::apps::update_response put_resp;
-        _write_impl->batch_put(write_ctx, put, put_resp);
-        ASSERT_EQ(_write_impl->batch_commit(0), 0);
+        ASSERT_EQ(_rocksdb_wrapper->write_batch_put_ctx(
+                      write_ctx, raw_key, user_value, expire_ts_seconds),
+                  0);
+        ASSERT_EQ(_rocksdb_wrapper->write(0), 0);
+        _rocksdb_wrapper->clear_up_write_batch();
+    }
+
+    // start with duplicating.
+    void set_app_duplicating()
+    {
+        _server->stop(false);
+        dsn::replication::destroy_replica(_replica);
+
+        dsn::app_info app_info;
+        app_info.app_type = "pegasus";
+        app_info.duplicating = true;
+        _replica =
+            dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false);
+        _server = dsn::make_unique<pegasus_server_impl>(_replica);
+
+        SetUp();
+    }
+
+    uint64_t read_timestamp_from(dsn::string_view raw_value)
+    {
+        uint64_t local_timetag =
+            pegasus_extract_timetag(_rocksdb_wrapper->_pegasus_data_version, raw_value);
+        return extract_timestamp_from_timetag(local_timetag);
     }
 };
 
@@ -65,8 +86,10 @@ TEST_F(rocksdb_wrapper_test, get)
 
     // expired
     int32_t expired_ts = utils::epoch_now();
+    db_write_context write_ctx;
+    std::string value = "abc";
+    single_set(write_ctx, _raw_key, value, expired_ts);
     db_get_context get_ctx2;
-    single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
     _rocksdb_wrapper->get(_raw_key, &get_ctx2);
     ASSERT_TRUE(get_ctx2.found);
     ASSERT_TRUE(get_ctx2.expired);
@@ -75,11 +98,113 @@ TEST_F(rocksdb_wrapper_test, get)
     // found
     expired_ts = INT32_MAX;
     db_get_context get_ctx3;
-    single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
+    single_set(write_ctx, _raw_key, value, expired_ts);
     _rocksdb_wrapper->get(_raw_key, &get_ctx3);
-    ASSERT_TRUE(get_ctx2.found);
+    ASSERT_TRUE(get_ctx3.found);
     ASSERT_FALSE(get_ctx3.expired);
     ASSERT_EQ(get_ctx3.expire_ts, expired_ts);
+    dsn::blob user_value;
+    pegasus_extract_user_data(
+        _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx3.raw_value), user_value);
+    ASSERT_EQ(user_value, value);
+}
+
+TEST_F(rocksdb_wrapper_test, put_verify_timetag)
+{
+    set_app_duplicating();
+
+    /// insert timestamp 10
+    int64_t decree = 10;
+    uint64_t timestamp = 10;
+    std::string value = "value_10";
+    auto ctx = db_write_context::create(decree, timestamp);
+    single_set(ctx, _raw_key, value, 0);
+
+    db_get_context get_ctx1;
+    _rocksdb_wrapper->get(_raw_key, &get_ctx1);
+    ASSERT_TRUE(get_ctx1.found);
+    ASSERT_FALSE(get_ctx1.expired);
+    ASSERT_EQ(read_timestamp_from(get_ctx1.raw_value), timestamp);
+    dsn::blob user_value;
+    pegasus_extract_user_data(
+        _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx1.raw_value), user_value);
+    ASSERT_EQ(user_value, value);
+
+    /// insert timestamp 15, which overwrites the previous record
+    timestamp = 15;
+    value = "value_15";
+    ctx = db_write_context::create(decree, timestamp);
+    single_set(ctx, _raw_key, value, 0);
+
+    db_get_context get_ctx2;
+    _rocksdb_wrapper->get(_raw_key, &get_ctx2);
+    ASSERT_TRUE(get_ctx2.found);
+    ASSERT_FALSE(get_ctx2.expired);
+    ASSERT_EQ(read_timestamp_from(get_ctx2.raw_value), timestamp);
+    pegasus_extract_user_data(
+        _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx2.raw_value), user_value);
+    ASSERT_EQ(user_value, value);
+
+    /// insert timestamp 15 from remote, which will overwrite the previous record,
+    /// since its cluster id is larger (current cluster_id=1)
+    timestamp = 15;
+    value = "value_15_new";
+    ctx.remote_timetag = pegasus::generate_timetag(timestamp, 2, false);
+    ctx.verify_timetag = true;
+    single_set(ctx, _raw_key, value, 0);
+
+    db_get_context get_ctx3;
+    _rocksdb_wrapper->get(_raw_key, &get_ctx3);
+    ASSERT_TRUE(get_ctx3.found);
+    ASSERT_FALSE(get_ctx3.expired);
+    ASSERT_EQ(read_timestamp_from(get_ctx3.raw_value), timestamp);
+    pegasus_extract_user_data(
+        _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx3.raw_value), user_value);
+    ASSERT_EQ(user_value, value);
+
+    /// write retry
+    single_set(ctx, _raw_key, value, 0);
+
+    /// insert timestamp 16 from local, which will overwrite the remote record,
+    /// since its timestamp is larger
+    timestamp = 16;
+    value = "value_16";
+    ctx = db_write_context::create(decree, timestamp);
+    single_set(ctx, _raw_key, value, 0);
+
+    db_get_context get_ctx4;
+    _rocksdb_wrapper->get(_raw_key, &get_ctx4);
+    ASSERT_TRUE(get_ctx4.found);
+    ASSERT_FALSE(get_ctx4.expired);
+    ASSERT_EQ(read_timestamp_from(get_ctx4.raw_value), timestamp);
+    pegasus_extract_user_data(
+        _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx4.raw_value), user_value);
+    ASSERT_EQ(user_value, value);
+
+    // write retry
+    single_set(ctx, _raw_key, value, 0);
+}
+
+// verify timetag on data version v0
+TEST_F(rocksdb_wrapper_test, verify_timetag_compatible_with_version_0)
+{
+    const_cast<uint32_t &>(_rocksdb_wrapper->_pegasus_data_version) = 0; // old version (test-only: writing a const member via const_cast is formally UB)
+
+    /// write data with data version 0
+    std::string value = "value";
+    int64_t decree = 10;
+    uint64_t timestamp = 10;
+    auto ctx = db_write_context::create_duplicate(decree, timestamp, true);
+    single_set(ctx, _raw_key, value, 0);
+
+    db_get_context get_ctx;
+    _rocksdb_wrapper->get(_raw_key, &get_ctx);
+    ASSERT_TRUE(get_ctx.found);
+    ASSERT_FALSE(get_ctx.expired);
+    dsn::blob user_value;
+    pegasus_extract_user_data(
+        _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx.raw_value), user_value);
+    ASSERT_EQ(user_value, value);
 }
 } // namespace server
 } // namespace pegasus


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to