This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 6dae647 refactor: use rocksdb_wrapper::write_batch_put to reimplement
check_and_set (#656)
6dae647 is described below
commit 6dae647b69a51a312310039ac91be0905d4169e5
Author: zhao liwei <[email protected]>
AuthorDate: Wed Dec 30 18:21:37 2020 +0800
refactor: use rocksdb_wrapper::write_batch_put to reimplement check_and_set
(#656)
---
rdsn | 2 +-
src/server/meta_store.h | 1 +
src/server/pegasus_write_service_impl.h | 23 +++--
src/server/rocksdb_wrapper.cpp | 115 ++++++++++++++++++++++-
src/server/rocksdb_wrapper.h | 33 +++++++
src/server/test/rocksdb_wrapper_test.cpp | 153 ++++++++++++++++++++++++++++---
6 files changed, 302 insertions(+), 25 deletions(-)
diff --git a/rdsn b/rdsn
index b92ba6d..f27decf 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit b92ba6d5b3a9a15370b85f82814d94a746ace132
+Subproject commit f27decf2e7015b4b75e2e40b53ad6b33fb752ecd
diff --git a/src/server/meta_store.h b/src/server/meta_store.h
index a94dcd0..fb67a53 100644
--- a/src/server/meta_store.h
+++ b/src/server/meta_store.h
@@ -74,6 +74,7 @@ private:
std::string *value);
friend class pegasus_write_service;
+ friend class rocksdb_wrapper;
// Keys of meta data wrote into meta column family.
static const std::string DATA_VERSION;
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 6187ebd..ecdf4f3 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -31,6 +31,7 @@
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
#include <gtest/gtest_prod.h>
+#include <dsn/utility/defer.h>
namespace pegasus {
namespace server {
@@ -338,22 +339,24 @@ public:
} else {
set_key = check_key;
}
- resp.error = db_write_batch_put(decree,
- set_key,
- update.set_value,
- static_cast<uint32_t>(update.set_expire_ts_seconds));
+ resp.error = _rocksdb_wrapper->write_batch_put(
+ decree,
+ set_key,
+ update.set_value,
+ static_cast<uint32_t>(update.set_expire_ts_seconds));
} else {
// check not passed, write empty record to update rocksdb's last flushed decree
- resp.error = db_write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
+ resp.error = _rocksdb_wrapper->write_batch_put(
+ decree, dsn::string_view(), dsn::string_view(), 0);
}
+
+ auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
if (resp.error) {
- clear_up_batch_states(decree, resp.error);
return resp.error;
}
- resp.error = db_write(decree);
+ resp.error = _rocksdb_wrapper->write(decree);
if (resp.error) {
- clear_up_batch_states(decree, resp.error);
return resp.error;
}
@@ -363,7 +366,6 @@ public:
invalid_argument ? rocksdb::Status::kInvalidArgument :
rocksdb::Status::kTryAgain;
}
- clear_up_batch_states(decree, resp.error);
return 0;
}
@@ -550,10 +552,13 @@ public:
void set_default_ttl(uint32_t ttl)
{
+ // TODO(zlw): remove these lines after the refactor is done
if (_default_ttl != ttl) {
_default_ttl = ttl;
ddebug_replica("update _default_ttl to {}.", ttl);
}
+
+ _rocksdb_wrapper->set_default_ttl(ttl);
}
private:
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index e7e8bb5..45dd6a1 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -21,6 +21,7 @@
#include <rocksdb/db.h>
#include "pegasus_write_service_impl.h"
+#include "base/pegasus_value_schema.h"
namespace pegasus {
namespace server {
@@ -31,9 +32,17 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
_rd_opts(server->_data_cf_rd_opts),
+ _meta_cf(server->_meta_cf),
_pegasus_data_version(server->_pegasus_data_version),
- _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+ _pfc_recent_expire_count(server->_pfc_recent_expire_count),
+ _default_ttl(0)
{
+ _write_batch = dsn::make_unique<rocksdb::WriteBatch>();
+ _value_generator = dsn::make_unique<pegasus_value_generator>();
+
+ _wt_opts = dsn::make_unique<rocksdb::WriteOptions>();
+ // disable write ahead logging as replication handles logging instead now
+ _wt_opts->disableWAL = true;
}
int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
@@ -66,5 +75,109 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
return s.code();
}
+int rocksdb_wrapper::write_batch_put(int64_t decree,
+ dsn::string_view raw_key,
+ dsn::string_view value,
+ uint32_t expire_sec)
+{
+ return write_batch_put_ctx(db_write_context::empty(decree), raw_key, value, expire_sec);
+}
+
+int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
+ dsn::string_view raw_key,
+ dsn::string_view value,
+ uint32_t expire_sec)
+{
+ FAIL_POINT_INJECT_F("db_write_batch_put",
+ [](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });
+
+ uint64_t new_timetag = ctx.remote_timetag;
+ if (!ctx.is_duplicated_write()) { // local write
+ new_timetag = generate_timetag(ctx.timestamp, get_cluster_id_if_exists(), false);
+ }
+
+ if (ctx.verify_timetag && // needs read-before-write
+ _pegasus_data_version >= 1 && // data version 0 doesn't support timetag.
+ !raw_key.empty()) { // not an empty write
+
+ db_get_context get_ctx;
+ int err = get(raw_key, &get_ctx);
+ if (dsn_unlikely(err != 0)) {
+ return err;
+ }
+ // if record exists and is not expired.
+ if (get_ctx.found && !get_ctx.expired) {
+ uint64_t local_timetag =
+ pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value);
+
+ if (local_timetag >= new_timetag) {
+ // ignore this stale update with lower timetag,
+ // and write an empty record instead
+ raw_key = value = dsn::string_view();
+ }
+ }
+ }
+
+ rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
+ rocksdb::SliceParts skey_parts(&skey, 1);
+ rocksdb::SliceParts svalue = _value_generator->generate_value(
+ _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
+ rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
+ if (dsn_unlikely(!s.ok())) {
+ ::dsn::blob hash_key, sort_key;
+ pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
+ derror_rocksdb("WriteBatchPut",
+ s.ToString(),
+ "decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}",
+ ctx.decree,
+ utils::c_escape_string(hash_key),
+ utils::c_escape_string(sort_key),
+ expire_sec);
+ }
+ return s.code();
+}
+
+int rocksdb_wrapper::write(int64_t decree)
+{
+ dassert(_write_batch->Count() != 0, "the number of updates in the batch is 0");
+
+ FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });
+
+ rocksdb::Status status =
+ _write_batch->Put(_meta_cf, meta_store::LAST_FLUSHED_DECREE, std::to_string(decree));
+ if (dsn_unlikely(!status.ok())) {
+ derror_rocksdb("Write",
+ status.ToString(),
+ "put decree of meta cf into batch error, decree: {}",
+ decree);
+ return status.code();
+ }
+
+ status = _db->Write(*_wt_opts, _write_batch.get());
+ if (dsn_unlikely(!status.ok())) {
+ derror_rocksdb("Write", status.ToString(), "write rocksdb error, decree: {}", decree);
+ }
+ return status.code();
+}
+
+void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); }
+
+void rocksdb_wrapper::set_default_ttl(uint32_t ttl)
+{
+ if (_default_ttl != ttl) {
+ _default_ttl = ttl;
+ ddebug_replica("update _default_ttl to {}", ttl);
+ }
+}
+
+uint32_t rocksdb_wrapper::db_expire_ts(uint32_t expire_ts)
+{
+ // use '_default_ttl' when ttl is not set for this write operation.
+ if (_default_ttl != 0 && expire_ts == 0) {
+ return utils::epoch_now() + _default_ttl;
+ }
+
+ return expire_ts;
+}
} // namespace server
} // namespace pegasus
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index 07f49a4..351f6e5 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -20,10 +20,14 @@
#pragma once
#include <dsn/dist/replication/replica_base.h>
+#include <gtest/gtest_prod.h>
namespace rocksdb {
class DB;
class ReadOptions;
+class WriteBatch;
+class ColumnFamilyHandle;
+class WriteOptions;
} // namespace rocksdb
namespace dsn {
@@ -31,8 +35,11 @@ class perf_counter_wrapper;
} // namespace dsn
namespace pegasus {
+class pegasus_value_generator;
+
namespace server {
struct db_get_context;
+struct db_write_context;
class pegasus_server_impl;
class rocksdb_wrapper : public dsn::replication::replica_base
@@ -46,11 +53,37 @@ public:
/// \result ctx.found=false if record is not found. Still 0 is returned.
int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);
+ int write_batch_put(int64_t decree,
+ dsn::string_view raw_key,
+ dsn::string_view value,
+ uint32_t expire_sec);
+ int write_batch_put_ctx(const db_write_context &ctx,
+ dsn::string_view raw_key,
+ dsn::string_view value,
+ uint32_t expire_sec);
+ int write(int64_t decree);
+ void clear_up_write_batch();
+
+ void set_default_ttl(uint32_t ttl);
+
private:
+ uint32_t db_expire_ts(uint32_t expire_ts);
+
rocksdb::DB *_db;
rocksdb::ReadOptions &_rd_opts;
+ std::unique_ptr<pegasus_value_generator> _value_generator;
+ std::unique_ptr<rocksdb::WriteBatch> _write_batch;
+ std::unique_ptr<rocksdb::WriteOptions> _wt_opts;
+ rocksdb::ColumnFamilyHandle *_meta_cf;
+
const uint32_t _pegasus_data_version;
dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+ volatile uint32_t _default_ttl;
+
+ friend class rocksdb_wrapper_test;
+ FRIEND_TEST(rocksdb_wrapper_test, put_verify_timetag);
+ FRIEND_TEST(rocksdb_wrapper_test, verify_timetag_compatible_with_version_0);
+ FRIEND_TEST(rocksdb_wrapper_test, get);
};
} // namespace server
} // namespace pegasus
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index 6931db5..313f522 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -27,7 +27,6 @@ class rocksdb_wrapper_test : public pegasus_server_test_base
{
protected:
std::unique_ptr<pegasus_server_write> _server_write;
- pegasus_write_service::impl *_write_impl{nullptr};
rocksdb_wrapper *_rocksdb_wrapper{nullptr};
dsn::blob _raw_key;
@@ -36,23 +35,45 @@ public:
{
start();
_server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
- _write_impl = _server_write->_write_svc->_impl.get();
- _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
+ _rocksdb_wrapper = _server_write->_write_svc->_impl->_rocksdb_wrapper.get();
pegasus::pegasus_generate_key(
_raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key"));
}
- void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t expire_ts_seconds)
+ void single_set(db_write_context write_ctx,
+ dsn::blob raw_key,
+ dsn::string_view user_value,
+ int32_t expire_ts_seconds)
{
- dsn::apps::update_request put;
- put.key = raw_key;
- put.value = user_value;
- put.expire_ts_seconds = expire_ts_seconds;
- db_write_context write_ctx;
- dsn::apps::update_response put_resp;
- _write_impl->batch_put(write_ctx, put, put_resp);
- ASSERT_EQ(_write_impl->batch_commit(0), 0);
+ ASSERT_EQ(_rocksdb_wrapper->write_batch_put_ctx(
+ write_ctx, raw_key, user_value, expire_ts_seconds),
+ 0);
+ ASSERT_EQ(_rocksdb_wrapper->write(0), 0);
+ _rocksdb_wrapper->clear_up_write_batch();
+ }
+
+ // start with duplicating.
+ void set_app_duplicating()
+ {
+ _server->stop(false);
+ dsn::replication::destroy_replica(_replica);
+
+ dsn::app_info app_info;
+ app_info.app_type = "pegasus";
+ app_info.duplicating = true;
+ _replica =
+ dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false);
+ _server = dsn::make_unique<pegasus_server_impl>(_replica);
+
+ SetUp();
+ }
+
+ uint64_t read_timestamp_from(dsn::string_view raw_value)
+ {
+ uint64_t local_timetag =
+ pegasus_extract_timetag(_rocksdb_wrapper->_pegasus_data_version, raw_value);
+ return extract_timestamp_from_timetag(local_timetag);
}
};
@@ -65,8 +86,10 @@ TEST_F(rocksdb_wrapper_test, get)
// expired
int32_t expired_ts = utils::epoch_now();
+ db_write_context write_ctx;
+ std::string value = "abc";
+ single_set(write_ctx, _raw_key, value, expired_ts);
db_get_context get_ctx2;
- single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
_rocksdb_wrapper->get(_raw_key, &get_ctx2);
ASSERT_TRUE(get_ctx2.found);
ASSERT_TRUE(get_ctx2.expired);
@@ -75,11 +98,113 @@ TEST_F(rocksdb_wrapper_test, get)
// found
expired_ts = INT32_MAX;
db_get_context get_ctx3;
- single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
+ single_set(write_ctx, _raw_key, value, expired_ts);
_rocksdb_wrapper->get(_raw_key, &get_ctx3);
ASSERT_TRUE(get_ctx2.found);
ASSERT_FALSE(get_ctx3.expired);
ASSERT_EQ(get_ctx3.expire_ts, expired_ts);
+ dsn::blob user_value;
+ pegasus_extract_user_data(
+ _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx3.raw_value), user_value);
+ ASSERT_EQ(user_value, value);
+}
+
+TEST_F(rocksdb_wrapper_test, put_verify_timetag)
+{
+ set_app_duplicating();
+
+ /// insert timestamp 10
+ int64_t decree = 10;
+ uint64_t timestamp = 10;
+ std::string value = "value_10";
+ auto ctx = db_write_context::create(decree, timestamp);
+ single_set(ctx, _raw_key, value, 0);
+
+ db_get_context get_ctx1;
+ _rocksdb_wrapper->get(_raw_key, &get_ctx1);
+ ASSERT_TRUE(get_ctx1.found);
+ ASSERT_FALSE(get_ctx1.expired);
+ ASSERT_EQ(read_timestamp_from(get_ctx1.raw_value), timestamp);
+ dsn::blob user_value;
+ pegasus_extract_user_data(
+ _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx1.raw_value), user_value);
+ ASSERT_EQ(user_value, value);
+
+ /// insert timestamp 15, which overwrites the previous record
+ timestamp = 15;
+ value = "value_15";
+ ctx = db_write_context::create(decree, timestamp);
+ single_set(ctx, _raw_key, value, 0);
+
+ db_get_context get_ctx2;
+ _rocksdb_wrapper->get(_raw_key, &get_ctx2);
+ ASSERT_TRUE(get_ctx2.found);
+ ASSERT_FALSE(get_ctx2.expired);
+ ASSERT_EQ(read_timestamp_from(get_ctx2.raw_value), timestamp);
+ pegasus_extract_user_data(
+ _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx2.raw_value), user_value);
+ ASSERT_EQ(user_value, value);
+
+ /// insert timestamp 15 from remote, which will overwrite the previous record,
+ /// since its cluster id is larger (current cluster_id=1)
+ timestamp = 15;
+ value = "value_15_new";
+ ctx.remote_timetag = pegasus::generate_timetag(timestamp, 2, false);
+ ctx.verify_timetag = true;
+ single_set(ctx, _raw_key, value, 0);
+
+ db_get_context get_ctx3;
+ _rocksdb_wrapper->get(_raw_key, &get_ctx3);
+ ASSERT_TRUE(get_ctx3.found);
+ ASSERT_FALSE(get_ctx3.expired);
+ ASSERT_EQ(read_timestamp_from(get_ctx3.raw_value), timestamp);
+ pegasus_extract_user_data(
+ _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx3.raw_value), user_value);
+ ASSERT_EQ(user_value, value);
+
+ /// write retry
+ single_set(ctx, _raw_key, value, 0);
+
+ /// insert timestamp 16 from local, which will overwrite the remote record,
+ /// since its timestamp is larger
+ timestamp = 16;
+ value = "value_16";
+ ctx = db_write_context::create(decree, timestamp);
+ single_set(ctx, _raw_key, value, 0);
+
+ db_get_context get_ctx4;
+ _rocksdb_wrapper->get(_raw_key, &get_ctx4);
+ ASSERT_TRUE(get_ctx4.found);
+ ASSERT_FALSE(get_ctx4.expired);
+ ASSERT_EQ(read_timestamp_from(get_ctx4.raw_value), timestamp);
+ pegasus_extract_user_data(
+ _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx4.raw_value), user_value);
+ ASSERT_EQ(user_value, value);
+
+ // write retry
+ single_set(ctx, _raw_key, value, 0);
+}
+
+// verify timetag on data version v0
+TEST_F(rocksdb_wrapper_test, verify_timetag_compatible_with_version_0)
+{
+ const_cast<uint32_t &>(_rocksdb_wrapper->_pegasus_data_version) = 0; // old version
+
+ /// write data with data version 0
+ std::string value = "value";
+ int64_t decree = 10;
+ uint64_t timestamp = 10;
+ auto ctx = db_write_context::create_duplicate(decree, timestamp, true);
+ single_set(ctx, _raw_key, value, 0);
+
+ db_get_context get_ctx;
+ _rocksdb_wrapper->get(_raw_key, &get_ctx);
+ ASSERT_TRUE(get_ctx.found);
+ ASSERT_FALSE(get_ctx.expired);
+ dsn::blob user_value;
+ pegasus_extract_user_data(
+ _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx.raw_value), user_value);
+ ASSERT_EQ(user_value, value);
}
} // namespace server
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]